wework_api/WXBizMsgCrypt.py
2025-02-21 08:04:44 +08:00

284 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
#-*- encoding:utf-8 -*-
""" 对企业微信发送给企业后台的消息加解密示例代码.
@copyright: Copyright (c) 1998-2014 Tencent Inc.
"""
# ------------------------------------------------------------------------
import base64
import string
import random
import hashlib
import time
import struct
from Crypto.Cipher import AES
import xml.etree.cElementTree as ET
import sys
import socket
import ierror
"""
关于Crypto.Cipher模块ImportError: No module named 'Crypto'解决方案
请到官方网站 https://www.dlitz.net/software/pycrypto/ 下载pycrypto。
下载后按照README中的“Installation”小节的提示进行pycrypto安装。
"""
class FormatException(Exception):
pass
def throw_exception(message, exception_class=FormatException):
"""my define raise exception function"""
raise exception_class(message)
class SHA1:
def getSHA1(self, token, timestamp, nonce, encrypt):
try:
# 确保所有输入都是字符串类型
token = str(token)
timestamp = str(timestamp)
nonce = str(nonce)
encrypt = str(encrypt)
sortlist = [token, timestamp, nonce, encrypt]
sortlist.sort()
# 将列表转换为字符串并编码
str_to_hash = "".join(sortlist).encode('utf-8')
sha = hashlib.sha1()
sha.update(str_to_hash)
return ierror.WXBizMsgCrypt_OK, sha.hexdigest()
except Exception as e:
print(f"[ERROR] SHA1计算失败: {str(e)}")
return ierror.WXBizMsgCrypt_ComputeSignature_Error, None
class XMLParse:
"""提供提取消息格式中的密文及生成回复消息格式的接口"""
# xml消息模板
AES_TEXT_RESPONSE_TEMPLATE = """<xml>
<Encrypt><![CDATA[%s]]></Encrypt>
<MsgSignature><![CDATA[%s]]></MsgSignature>
<TimeStamp>%s</TimeStamp>
<Nonce><![CDATA[%s]]></Nonce>
</xml>"""
def extract(self, xmltext):
"""提取出xml数据包中的加密消息
@param xmltext: 待提取的xml字符串
@return: 提取出的加密消息字符串
"""
try:
xml_tree = ET.fromstring(xmltext)
encrypt = xml_tree.find("Encrypt")
return ierror.WXBizMsgCrypt_OK, encrypt.text
except Exception as e:
print (e)
return ierror.WXBizMsgCrypt_ParseXml_Error,None
def generate(self, encrypt, signature, timestamp, nonce):
"""生成xml消息
@param encrypt: 加密后的消息密文
@param signature: 安全签名
@param timestamp: 时间戳
@param nonce: 随机字符串
@return: 生成的xml字符串
"""
return self.AES_TEXT_RESPONSE_TEMPLATE % (
encrypt,
signature,
timestamp,
nonce
)
class PKCS7Encoder():
"""提供基于PKCS7算法的加解密接口"""
block_size = 32
def encode(self, text):
if isinstance(text, str):
text = text.encode('utf-8')
# 计算需要填充的位数
amount_to_pad = self.block_size - (len(text) % self.block_size)
if amount_to_pad == 0:
amount_to_pad = self.block_size
# 填充
pad_chr = chr(amount_to_pad).encode('utf-8')
padding = pad_chr * amount_to_pad
return text + padding
def decode(self, decrypted):
"""删除解密后明文的补位字符
@param decrypted: 解密后的明文(bytes类型)
@return: 删除补位字符后的明文
"""
if isinstance(decrypted, bytes):
pad = decrypted[-1]
else:
pad = ord(decrypted[-1])
if pad < 1 or pad > 32:
pad = 0
return decrypted[:-pad]
class Prpcrypt(object):
"""提供接收和推送给企业微信消息的加解密接口"""
def __init__(self,key):
#self.key = base64.b64decode(key+"=")
self.key = key
# 设置加解密模式为AES的CBC模式
self.mode = AES.MODE_CBC
def encrypt(self, text, receiveid):
try:
# 统一处理输入为bytes
text = text.encode('utf-8') if isinstance(text, str) else text
receiveid = receiveid.encode('utf-8') if isinstance(receiveid, str) else receiveid
# 生成16位随机字符串
random_str = self.get_random_str().encode('utf-8')
# 打包文本长度
text_len = struct.pack("I", socket.htonl(len(text)))
# 拼接内容
content = b''.join([random_str, text_len, text, receiveid])
# PKCS7填充
pkcs7 = PKCS7Encoder()
padding_text = pkcs7.encode(content)
# AES加密
cryptor = AES.new(self.key, self.mode, self.key[:16])
ciphertext = cryptor.encrypt(padding_text)
# Base64编码
return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
except Exception as e:
print(f"[ERROR] 加密失败: {str(e)}")
return ierror.WXBizMsgCrypt_EncryptAES_Error, None
def decrypt(self,text,receiveid):
try:
cryptor = AES.new(self.key,self.mode,self.key[:16])
plain_text = cryptor.decrypt(base64.b64decode(text))
# 获取补位值
pad = plain_text[-1]
# 去除补位字符
content = plain_text[16:-pad]
xml_len = socket.ntohl(struct.unpack("I",content[ : 4])[0])
xml_content = content[4 : xml_len+4]
from_receiveid = content[xml_len+4:]
except Exception as e:
print(f"[ERROR] 解密失败: {str(e)}")
return ierror.WXBizMsgCrypt_IllegalBuffer,None
if from_receiveid != receiveid.encode('utf-8'):
return ierror.WXBizMsgCrypt_ValidateCorpid_Error,None
return 0,xml_content.decode('utf-8')
def get_random_str(self):
""" 随机生成16位字符串
@return: 16位字符串
"""
rule = string.ascii_letters + string.digits # 修改这里
str = random.sample(rule, 16)
return "".join(str)
class WXBizMsgCrypt(object):
#构造函数
def __init__(self,sToken,sEncodingAESKey,sReceiveId):
try:
self.key = base64.b64decode(sEncodingAESKey+"=")
assert len(self.key) == 32
except:
throw_exception("[error]: EncodingAESKey unvalid !", FormatException)
# return ierror.WXBizMsgCrypt_IllegalAesKey,None
self.m_sToken = sToken
self.m_sReceiveId = sReceiveId
#验证URL
#@param sMsgSignature: 签名串对应URL参数的msg_signature
#@param sTimeStamp: 时间戳对应URL参数的timestamp
#@param sNonce: 随机串对应URL参数的nonce
#@param sEchoStr: 随机串对应URL参数的echostr
#@param sReplyEchoStr: 解密之后的echostr当return返回0时有效
#@return成功0失败返回对应的错误码
def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr):
sha1 = SHA1()
ret,signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, sEchoStr)
if ret != 0:
return ret, None
if not signature == sMsgSignature:
return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
pc = Prpcrypt(self.key)
ret,sReplyEchoStr = pc.decrypt(sEchoStr,self.m_sReceiveId)
return ret,sReplyEchoStr
def EncryptMsg(self, sReplyMsg, sNonce, timestamp = None):
#将企业回复用户的消息加密打包
#@param sReplyMsg: 企业号待回复用户的消息xml格式的字符串
#@param sTimeStamp: 时间戳可以自己生成也可以用URL参数的timestamp,如为None则自动用当前时间
#@param sNonce: 随机串可以自己生成也可以用URL参数的nonce
#sEncryptMsg: 加密后的可以直接回复用户的密文包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串,
#return成功0sEncryptMsg,失败返回对应的错误码None
pc = Prpcrypt(self.key)
ret,encrypt = pc.encrypt(sReplyMsg, self.m_sReceiveId)
if ret != 0:
return ret,None
if timestamp is None:
timestamp = str(int(time.time()))
# 生成安全签名
sha1 = SHA1()
ret, signature = sha1.getSHA1(self.m_sToken, timestamp, sNonce, encrypt.decode('utf-8'))
if ret != 0:
return ret, None
xmlParse = XMLParse()
return ret, xmlParse.generate(encrypt.decode('utf-8'), signature, timestamp, sNonce)
def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):
# 检验消息的真实性,并且获取解密后的明文
# @param sMsgSignature: 签名串对应URL参数的msg_signature
# @param sTimeStamp: 时间戳对应URL参数的timestamp
# @param sNonce: 随机串对应URL参数的nonce
# @param sPostData: 密文对应POST请求的数据
# xml_content: 解密后的原文当return返回0时有效
# @return: 成功0失败返回对应的错误码
# 验证安全签名
xmlParse = XMLParse()
ret,encrypt = xmlParse.extract(sPostData)
if ret != 0:
return ret, None
sha1 = SHA1()
ret,signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, encrypt)
if ret != 0:
return ret, None
if not signature == sMsgSignature:
return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
pc = Prpcrypt(self.key)
ret,xml_content = pc.decrypt(encrypt,self.m_sReceiveId)
return ret,xml_content