From 903e3495e6d0dc736980c4d1734b9790d497ac0a Mon Sep 17 00:00:00 2001 From: glowz <24327181@qq.com> Date: Fri, 21 Feb 2025 08:04:44 +0800 Subject: [PATCH] first commit --- README.md | 0 WXBizMsgCrypt.py | 283 +++++++++++++++++++++++++++++ ai_service.py | 452 ++++++++++++++++++++++++++++++++++++++++++++++ callback1.py | 219 ++++++++++++++++++++++ ierror.py | 20 ++ keyword_config.py | 23 +++ 6 files changed, 997 insertions(+) create mode 100644 README.md create mode 100644 WXBizMsgCrypt.py create mode 100644 ai_service.py create mode 100644 callback1.py create mode 100644 ierror.py create mode 100644 keyword_config.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/WXBizMsgCrypt.py b/WXBizMsgCrypt.py new file mode 100644 index 0000000..1943b72 --- /dev/null +++ b/WXBizMsgCrypt.py @@ -0,0 +1,283 @@ +#!/usr/bin/env python +#-*- encoding:utf-8 -*- + +""" 对企业微信发送给企业后台的消息加解密示例代码. +@copyright: Copyright (c) 1998-2014 Tencent Inc. + +""" +# ------------------------------------------------------------------------ + +import base64 +import string +import random +import hashlib +import time +import struct +from Crypto.Cipher import AES +import xml.etree.cElementTree as ET +import sys +import socket +import ierror + +""" + + + +关于Crypto.Cipher模块,ImportError: No module named 'Crypto'解决方案 +请到官方网站 https://www.dlitz.net/software/pycrypto/ 下载pycrypto。 +下载后,按照README中的“Installation”小节的提示进行pycrypto安装。 +""" +class FormatException(Exception): + pass + +def throw_exception(message, exception_class=FormatException): + """my define raise exception function""" + raise exception_class(message) + +class SHA1: + def getSHA1(self, token, timestamp, nonce, encrypt): + try: + # 确保所有输入都是字符串类型 + token = str(token) + timestamp = str(timestamp) + nonce = str(nonce) + encrypt = str(encrypt) + + sortlist = [token, timestamp, nonce, encrypt] + sortlist.sort() + + # 将列表转换为字符串并编码 + str_to_hash = "".join(sortlist).encode('utf-8') + + sha = hashlib.sha1() + sha.update(str_to_hash) + return ierror.WXBizMsgCrypt_OK, sha.hexdigest() + except Exception as e: + print(f"[ERROR] SHA1计算失败: {str(e)}") + return ierror.WXBizMsgCrypt_ComputeSignature_Error, None + + +class XMLParse: + """提供提取消息格式中的密文及生成回复消息格式的接口""" + + # xml消息模板 + AES_TEXT_RESPONSE_TEMPLATE = """ + + +%s + +""" + + def extract(self, xmltext): + """提取出xml数据包中的加密消息 + @param xmltext: 待提取的xml字符串 + @return: 提取出的加密消息字符串 + """ + try: + xml_tree = ET.fromstring(xmltext) + encrypt = xml_tree.find("Encrypt") + return ierror.WXBizMsgCrypt_OK, encrypt.text + except Exception as e: + print (e) + return ierror.WXBizMsgCrypt_ParseXml_Error,None + + def generate(self, encrypt, signature, timestamp, nonce): + """生成xml消息 + @param encrypt: 加密后的消息密文 + @param signature: 安全签名 + @param timestamp: 时间戳 + @param nonce: 随机字符串 + @return: 生成的xml字符串 + """ + return self.AES_TEXT_RESPONSE_TEMPLATE % ( + encrypt, + signature, + timestamp, + nonce + ) + + +class PKCS7Encoder(): + """提供基于PKCS7算法的加解密接口""" + + block_size = 32 + def encode(self, text): + if isinstance(text, str): + text = text.encode('utf-8') + + # 计算需要填充的位数 + amount_to_pad = self.block_size - (len(text) % self.block_size) + if amount_to_pad == 0: + amount_to_pad = self.block_size + + # 填充 + pad_chr = chr(amount_to_pad).encode('utf-8') + padding = pad_chr * amount_to_pad + + return text + padding + + def decode(self, decrypted): + """删除解密后明文的补位字符 + @param decrypted: 解密后的明文(bytes类型) + @return: 删除补位字符后的明文 + """ + if isinstance(decrypted, bytes): + pad = decrypted[-1] + else: + pad = ord(decrypted[-1]) + + if pad < 1 or pad > 32: + pad = 0 + + return decrypted[:-pad] + + +class Prpcrypt(object): + """提供接收和推送给企业微信消息的加解密接口""" + + def __init__(self,key): + + #self.key = base64.b64decode(key+"=") + self.key = key + # 设置加解密模式为AES的CBC模式 + self.mode = AES.MODE_CBC + + + def encrypt(self, text, receiveid): + try: + # 统一处理输入为bytes + text = text.encode('utf-8') if isinstance(text, str) else text + receiveid = receiveid.encode('utf-8') if isinstance(receiveid, str) else receiveid + + # 生成16位随机字符串 + random_str = self.get_random_str().encode('utf-8') + + # 打包文本长度 + text_len = struct.pack("I", socket.htonl(len(text))) + + # 拼接内容 + content = b''.join([random_str, text_len, text, receiveid]) + + # PKCS7填充 + pkcs7 = PKCS7Encoder() + padding_text = pkcs7.encode(content) + + # AES加密 + cryptor = AES.new(self.key, self.mode, self.key[:16]) + ciphertext = cryptor.encrypt(padding_text) + + # Base64编码 + return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext) + + except Exception as e: + print(f"[ERROR] 加密失败: {str(e)}") + return ierror.WXBizMsgCrypt_EncryptAES_Error, None + + def decrypt(self,text,receiveid): + try: + cryptor = AES.new(self.key,self.mode,self.key[:16]) + plain_text = cryptor.decrypt(base64.b64decode(text)) + + # 获取补位值 + pad = plain_text[-1] + + # 去除补位字符 + content = plain_text[16:-pad] + + xml_len = socket.ntohl(struct.unpack("I",content[ : 4])[0]) + xml_content = content[4 : xml_len+4] + from_receiveid = content[xml_len+4:] + except Exception as e: + print(f"[ERROR] 解密失败: {str(e)}") + return ierror.WXBizMsgCrypt_IllegalBuffer,None + + if from_receiveid != receiveid.encode('utf-8'): + return ierror.WXBizMsgCrypt_ValidateCorpid_Error,None + + return 0,xml_content.decode('utf-8') + + def get_random_str(self): + """ 随机生成16位字符串 + @return: 16位字符串 + """ + rule = string.ascii_letters + string.digits # 修改这里 + str = random.sample(rule, 16) + return "".join(str) + +class WXBizMsgCrypt(object): + #构造函数 + def __init__(self,sToken,sEncodingAESKey,sReceiveId): + try: + self.key = base64.b64decode(sEncodingAESKey+"=") + assert len(self.key) == 32 + except: + throw_exception("[error]: EncodingAESKey unvalid !", FormatException) + # return ierror.WXBizMsgCrypt_IllegalAesKey,None + self.m_sToken = sToken + self.m_sReceiveId = sReceiveId + + #验证URL + #@param sMsgSignature: 签名串,对应URL参数的msg_signature + #@param sTimeStamp: 时间戳,对应URL参数的timestamp + #@param sNonce: 随机串,对应URL参数的nonce + #@param sEchoStr: 随机串,对应URL参数的echostr + #@param sReplyEchoStr: 解密之后的echostr,当return返回0时有效 + #@return:成功0,失败返回对应的错误码 + + def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr): + sha1 = SHA1() + ret,signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, sEchoStr) + if ret != 0: + return ret, None + if not signature == sMsgSignature: + return ierror.WXBizMsgCrypt_ValidateSignature_Error, None + pc = Prpcrypt(self.key) + ret,sReplyEchoStr = pc.decrypt(sEchoStr,self.m_sReceiveId) + return ret,sReplyEchoStr + + def EncryptMsg(self, sReplyMsg, sNonce, timestamp = None): + #将企业回复用户的消息加密打包 + #@param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串 + #@param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间 + #@param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce + #sEncryptMsg: 加密后的可以直接回复用户的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串, + #return:成功0,sEncryptMsg,失败返回对应的错误码None + pc = Prpcrypt(self.key) + ret,encrypt = pc.encrypt(sReplyMsg, self.m_sReceiveId) + if ret != 0: + return ret,None + if timestamp is None: + timestamp = str(int(time.time())) + # 生成安全签名 + sha1 = SHA1() + ret, signature = sha1.getSHA1(self.m_sToken, timestamp, sNonce, encrypt.decode('utf-8')) + if ret != 0: + return ret, None + + xmlParse = XMLParse() + return ret, xmlParse.generate(encrypt.decode('utf-8'), signature, timestamp, sNonce) + + def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce): + # 检验消息的真实性,并且获取解密后的明文 + # @param sMsgSignature: 签名串,对应URL参数的msg_signature + # @param sTimeStamp: 时间戳,对应URL参数的timestamp + # @param sNonce: 随机串,对应URL参数的nonce + # @param sPostData: 密文,对应POST请求的数据 + # xml_content: 解密后的原文,当return返回0时有效 + # @return: 成功0,失败返回对应的错误码 + # 验证安全签名 + xmlParse = XMLParse() + ret,encrypt = xmlParse.extract(sPostData) + if ret != 0: + return ret, None + sha1 = SHA1() + ret,signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, encrypt) + if ret != 0: + return ret, None + if not signature == sMsgSignature: + return ierror.WXBizMsgCrypt_ValidateSignature_Error, None + pc = Prpcrypt(self.key) + ret,xml_content = pc.decrypt(encrypt,self.m_sReceiveId) + return ret,xml_content + + diff --git a/ai_service.py b/ai_service.py new file mode 100644 index 0000000..40df5e7 --- /dev/null +++ b/ai_service.py @@ -0,0 +1,452 @@ +# ai_service.py 优化版本 +import requests +import logging +from typing import Dict, Optional +from functools import lru_cache +from config import OLLAMA_MODEL, OPENAI_API_KEY, OPENAI_MODEL,OPENAI_BASE_URL # 确保config.py中有这些配置 +import time +# 配置日志 +logging.basicConfig( + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + level=logging.INFO +) +logger = logging.getLogger(__name__) + +class AIService: + """AI服务抽象基类""" + def generate_response(self, prompt: str) -> str: + """ + 生成AI回复 + :param prompt: 用户输入的提示文本 + :return: 生成的回复文本 + """ + raise NotImplementedError + + +class OllamaService(AIService): + """Ollama本地模型服务实现""" + def __init__( + self, + endpoint: str = "http://localhost:11434/api/generate", + model: str = OLLAMA_MODEL, + timeout: int = 10 + ): + self.endpoint = endpoint + self.default_model = model + self.timeout = timeout + + @lru_cache(maxsize=100) + def generate_response(self, prompt: str) -> str: + try: + response = requests.post( + self.endpoint, + json={ + 'model': self.default_model, + 'prompt': prompt, + 'stream': False + }, + timeout=self.timeout + ) + response.raise_for_status() + + result = response.json() + return result.get('response', '收到您的消息') + + except requests.exceptions.ConnectionError: + logger.error("无法连接Ollama服务,请检查服务状态") + return "本地模型服务未启动" + + except requests.exceptions.Timeout: + logger.warning("Ollama请求超时") + return "响应超时,请简化问题" + + except Exception as e: + logger.error(f"Ollama处理异常: {str(e)}", exc_info=True) + return "本地模型服务异常" + + + +class DifyService(AIService): + """Dify API客户端封装""" + + def __init__( + self, + api_key: str, + base_url: str = "http://localhost/v1", + timeout: int = 100, + default_user: str = "system" + ): + """ + :param api_key: 应用API密钥 + :param base_url: API基础地址 (默认: http://localhost/v1) + :param timeout: 请求超时时间 (秒) + :param default_user: 默认用户标识 + """ + self._validate_config(api_key, base_url) + + self.api_key = api_key + self.base_url = base_url.rstrip('/') + self.timeout = timeout + self.default_user = default_user + self.logger = logging.getLogger(self.__class__.__name__) + + self.session = requests.Session() + self.headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json" + } + + def _validate_config(self, api_key: str, base_url: str): + """配置校验""" + if not api_key.startswith('app-'): + raise ValueError("Invalid API key format") + if not base_url.startswith(('http://', 'https://')): + raise ValueError("Invalid base URL protocol") + + @lru_cache(maxsize=100) + def generate_response( + self, + query: str, + response_mode: str = "blocking", + conversation_id: Optional[str] = None, + user: Optional[str] = None, + **additional_inputs + ) -> str: + """ + 生成对话响应 + :param query: 用户查询内容 + :param response_mode: 响应模式 (blocking/streaming) + :param conversation_id: 会话ID (为空时创建新会话) + :param user: 用户标识 (默认使用初始化参数""" + + try: + response = requests.post( + f"{self.base_url}/chat-messages", + headers=self.headers, + json={ + "inputs": {}, + "query": query, + "response_mode": "blocking", + "conversation_id": "", + "user": "abc-123" + }, + timeout=self.timeout + ) + response.raise_for_status() + + #response.json()["answer"] + return response.json()["answer"] + + except requests.exceptions.ConnectionError: + logger.error("无法连接dify服务,请检查服务状态") + return "本地模型服务未启动" + + except requests.exceptions.Timeout: + logger.warning("dify请求超时") + return "响应超时,请简化问题" + + except Exception as e: + logger.error(f"dify处理异常: {str(e)}", exc_info=True) + return "本地模型服务异常" + + +class OpenAIService(AIService): + """OpenAI官方接口服务实现""" + def __init__( + self, + api_key: str = OPENAI_API_KEY, + model: str = OPENAI_MODEL, + base_url: str = OPENAI_BASE_URL, + timeout: int = 15, + temperature: float = 0.7, + max_conversation_length: int = 10, + max_time_gap: int = 30 + ): + self._validate_config(api_key, model) + + self.api_key = api_key + self.default_model = model + self.base_url = base_url + self.timeout = timeout + self.temperature = temperature + self.headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json" + } + # 新增会话管理相关属性 + self.conversation_history = {} + self.max_conversation_length = max_conversation_length + self.max_time_gap = max_time_gap + self.system_prompt = '''你是路桥设计院智能助手。你的使命是尽可能地用详尽的、温暖的、友善的话语帮助企业员工,在各种方面提供帮助和支持。无论我需要什么帮助或建议,你都会尽力提供详尽信息。''' + + def _validate_config(self, api_key: str, model: str) -> None: + """ + 验证OpenAI配置参数 + :param api_key: OpenAI API密钥 + :param model: 模型名称 + :raises ValueError: 当配置参数无效时抛出 + """ + if not api_key: + raise ValueError("OpenAI API密钥不能为空") + + if not model: + raise ValueError("模型名称不能为空") + + if not isinstance(api_key, str) or not isinstance(model, str): + raise ValueError("API密钥和模型名称必须是字符串类型") + + # 可选:验证API密钥格式 + # if not api_key.startswith('sk-'): + # raise ValueError("无效的OpenAI API密钥格式") + + def _manage_conversation_history(self, user_id: str, message: str): + """管理会话历史""" + current_timestamp = int(time.time()) + + # 检查会话是否超时 + if (user_id in self.conversation_history and + current_timestamp - self.conversation_history[user_id]["last_timestamp"] >= self.max_time_gap * 60): + del self.conversation_history[user_id] + + # 初始化或更新会话历史 + if user_id not in self.conversation_history: + self.conversation_history[user_id] = { + "messages": [], + "last_timestamp": current_timestamp + } + else: + self.conversation_history[user_id]["last_timestamp"] = current_timestamp + + # 限制会话历史长度 + if len(self.conversation_history[user_id]["messages"]) > self.max_conversation_length: + self.conversation_history[user_id]["messages"] = ( + self.conversation_history[user_id]["messages"][-self.max_conversation_length:] + ) + + # 添加新消息 + self.conversation_history[user_id]["messages"].append({ + "role": "user", + "content": message + }) + + def generate_response(self, prompt: str, user_id: str = "default_user") -> str: + """ + 生成带有会话历史的回复 + :param prompt: 用户输入的提示文本 + :param user_id: 用户标识符 + :return: 生成的回复文本 + """ + try: + self._manage_conversation_history(user_id, prompt) + + # 构建完整的消息历史 + messages = [{"role": "system", "content": self.system_prompt}] + messages.extend(self.conversation_history[user_id]["messages"]) + + response = requests.post( + f"{self.base_url}/chat/completions", + headers=self.headers, + json={ + "model": self.default_model, + "messages": messages, + "temperature": self.temperature + }, + timeout=self.timeout + ) + response.raise_for_status() + + result = response.json() + if 'choices' not in result: + logger.error(f"OpenAI响应格式异常: {result}") + return "响应解析失败" + + response_text = result['choices'][0]['message']['content'] + + # 保存助手的回复到会话历史 + self.conversation_history[user_id]["messages"].append({ + "role": "assistant", + "content": response_text + }) + + return response_text + + except Exception as e: + logger.error(f"OpenAI处理异常: {str(e)}", exc_info=True) + return "服务暂时不可用" + +class FastGptService(AIService): + """FastGPT API客户端封装""" + + def __init__( + self, + api_key: str, + base_url: str = "http://localhost:3000/api/v1", + timeout: int = 30, + max_conversation_length: int = 10, + max_time_gap: int = 30 + ): + """ + 初始化FastGPT服务 + :param api_key: FastGPT API密钥 + :param base_url: API基础地址 + :param timeout: 请求超时时间(秒) + :param max_conversation_length: 最大会话长度 + :param max_time_gap: 会话超时时间(分钟) + """ + self._validate_config(api_key, base_url) + + self.api_key = api_key + self.base_url = base_url.rstrip('/') + self.timeout = timeout + self.headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json" + } + + # 会话管理相关属性 + self.conversation_history = {} + self.max_conversation_length = max_conversation_length + self.max_time_gap = max_time_gap + + def _validate_config(self, api_key: str, base_url: str): + """配置校验""" + if not api_key: + raise ValueError("FastGPT API密钥不能为空") + if not base_url.startswith(('http://', 'https://')): + raise ValueError("无效的基础URL协议") + + def _manage_conversation_history(self, user_id: str, message: str): + """管理会话历史""" + current_timestamp = int(time.time()) + + # 检查会话是否超时 + if (user_id in self.conversation_history and + current_timestamp - self.conversation_history[user_id]["last_timestamp"] >= self.max_time_gap * 60): + del self.conversation_history[user_id] + + # 初始化或更新会话历史 + if user_id not in self.conversation_history: + self.conversation_history[user_id] = { + "chat_id": f"chat_{user_id}_{current_timestamp}", + "messages": [], + "last_timestamp": current_timestamp + } + else: + self.conversation_history[user_id]["last_timestamp"] = current_timestamp + + # 限制会话历史长度 + if len(self.conversation_history[user_id]["messages"]) >= self.max_conversation_length: + self.conversation_history[user_id]["messages"] = ( + self.conversation_history[user_id]["messages"][-self.max_conversation_length:] + ) + + # 添加新消息 + msg_id = f"msg_{current_timestamp}" + self.conversation_history[user_id]["messages"].append({ + "role": "user", + "content": message + }) + return msg_id + + def generate_response( + self, + prompt: str, + user_id: str = "default_user", + variables: dict = None, + detail: bool = False + ) -> str: + """ + 生成回复 + :param prompt: 用户输入 + :param user_id: 用户标识 + :param variables: 模块变量 + :param detail: 是否返回详细信息 + :return: 生成的回复文本 + """ + try: + msg_id = self._manage_conversation_history(user_id, prompt) + chat_info = self.conversation_history.get(user_id, {}) + + payload = { + "chatId": chat_info.get("chat_id"), + "stream": False, + "detail": detail, + "responseChatItemId": msg_id, + "variables": variables or {}, + "messages": [{"role": "user", "content": prompt}] + } + + response = requests.post( + f"{self.base_url}/chat/completions", + headers=self.headers, + json=payload, + timeout=self.timeout + ) + response.raise_for_status() + + result = response.json() + + # 处理响应 + if detail: + response_text = result.get("responseData", {}).get("content", "响应解析失败") + else: + response_text = result.get("content", "响应解析失败") + + # 保存助手回复到会话历史 + if user_id in self.conversation_history: + self.conversation_history[user_id]["messages"].append({ + "role": "assistant", + "content": response_text + }) + + return response_text + + except requests.exceptions.ConnectionError: + logger.error("无法连接FastGPT服务") + return "服务连接失败" + + except requests.exceptions.Timeout: + logger.warning("FastGPT请求超时") + return "响应超时" + + except Exception as e: + logger.error(f"FastGPT处理异常: {str(e)}", exc_info=True) + return "服务暂时不可用" + +class HybridAIService(AIService): + """混合AI服务(故障转移模式)""" + def __init__(self, services: list[AIService]): + self.services = services + + def generate_response(self, prompt: str) -> str: + for service in self.services: + try: + return service.generate_response(prompt) + except Exception as e: + logger.warning(f"{type(service).__name__} 服务失败: {str(e)}") + continue + return "所有AI服务不可用" + +class MessageHandler: + """智能消息处理器""" + def __init__(self, keyword_config: Dict, ai_service: AIService): + """ + :param keyword_config: 关键词配置字典 + :param ai_service: AI服务实例 + """ + self.keyword_config = keyword_config + self.ai_service = ai_service + + def get_reply(self, content: str) -> str: + # 优先全匹配关键词 + for rule in self.keyword_config.values(): + if any(kw == content.strip() for kw in rule['keywords']): + return rule['reply'] + + # 其次模糊匹配 + for rule in self.keyword_config.values(): + if any(kw in content for kw in rule['keywords']): + return rule['reply'] + + # 无匹配时调用AI + return self.ai_service.generate_response(content) \ No newline at end of file diff --git a/callback1.py b/callback1.py new file mode 100644 index 0000000..1d8b4d6 --- /dev/null +++ b/callback1.py @@ -0,0 +1,219 @@ +from config import AgentId,Secret,corpid,token,encodingAESKey +#import xml.etree.ElementTree as ET +from flask import Flask,request +from WXBizMsgCrypt import WXBizMsgCrypt +import base64,hashlib +from Crypto.Cipher import AES +import time +from xml.etree import ElementTree +from keyword_config import KEYWORD_REPLIES +from ai_service import OllamaService,MessageHandler,DifyService +from typing import Dict +from config import OPENAI_API_KEY, OPENAI_MODEL, OPENAI_BASE_URL,DIFY_API_KEY,DIFY_BASE_URL +from config import FASTAPI_BASE_URL, FASTAPI_API_KEY +import re + +from ai_service import OpenAIService,FastGptService +app =Flask(__name__) +wxcpt = WXBizMsgCrypt(token,encodingAESKey,corpid) +#ai_service = OllamaService() +ai_service = OpenAIService(OPENAI_API_KEY, OPENAI_MODEL, OPENAI_BASE_URL) +#ai_service = DifyService(DIFY_API_KEY,DIFY_BASE_URL) +#ai_service = FastGptService(FASTAPI_API_KEY, FASTAPI_BASE_URL) + +# 检查base64编码后数据位数是否正确 +def check_base64_len(base64_str): + len_remainder = 4 - (len(base64_str) % 4) + if len_remainder == 0: + return base64_str + else: + for temp in range(0,len_remainder): + base64_str = base64_str + "=" + return base64_str +# 解密并提取消息正文 +def msg_base64_decrypt(ciphertext_base64,key_base64): + # 处理密文、密钥和iv + ciphertext_bytes = base64.b64decode(check_base64_len(ciphertext_base64)) + key_bytes = base64.b64decode(check_base64_len(key_base64)) + iv_bytes = key_bytes[:16] + + # 解密 + decr = AES.new(key_bytes,AES.MODE_CBC,iv_bytes) + plaintext_bytes = decr.decrypt(ciphertext_bytes) + + # 截取数据,判断消息正文字节数 + msg_len_bytes = plaintext_bytes[16:20] + msg_len = int.from_bytes(msg_len_bytes,byteorder='big', signed=False) + + # 根据消息正文字节数截取消息正文,并转为字符串格式 + msg_bytes = plaintext_bytes[20:20+msg_len] + msg = str(msg_bytes,encoding='utf-8') + + return msg +# 消息体签名校验 +def check_msg_signature(msg_signature,token,timestamp,nonce,echostr): + # 使用sort()从小到大排序[].sort()是在原地址改值的,所以如果使用li_s = li.sort(),li_s是空的,li的值变为排序后的值] + li = [token,timestamp,nonce,echostr] + li.sort() + # 将排序结果拼接 + li_str = li[0]+li[1]+li[2]+li[3] + + # 计算SHA-1值 + sha1 = hashlib.sha1() + # update()要指定加密字符串字符代码,不然要报错: + # "Unicode-objects must be encoded before hashing" + sha1.update(li_str.encode("utf8")) + sha1_result = sha1.hexdigest() + + # 比较并返回比较结果 + if sha1_result == msg_signature: + return True + else: + return False +@app.route('/hello', methods=['GET']) +def hello(): + return "Hello, Flask!" + +@app.route('/', methods=['GET', 'POST']) +def reply(): + try: + # 处理GET请求(验证URL)部分保持不变 + if request.method == 'GET': + msg_signature = request.args.to_dict().get("msg_signature") + timestamp = request.args.to_dict().get("timestamp") + nonce = request.args.to_dict().get("nonce") + echostr = request.args.to_dict().get("echostr") + print(msg_signature,timestamp,nonce,echostr) + # 获取消息体签名校验结果 + check_result = check_msg_signature(msg_signature,token,timestamp,nonce,echostr) + if check_result: + decrypt_result = msg_base64_decrypt(echostr,encodingAESKey) + print("通过") + return decrypt_result + else: + return "" + + # 处理POST请求(消息处理) + elif request.method == 'POST': + try: + # 获取并解析原始数据部分保持不变 + raw_data = request.get_data() + if isinstance(raw_data, bytes): + raw_data = raw_data.decode('utf-8') + + # 获取参数验证部分保持不变 + msg_signature = request.args.get('msg_signature', '') + timestamp = str(request.args.get('timestamp', '')) + nonce = request.args.get('nonce', '') + + if not all([msg_signature, timestamp, nonce]): + return "缺少必要参数", 400 + + # 解密消息部分保持不变 + ret, xml_content = wxcpt.DecryptMsg( + raw_data, + msg_signature, + timestamp, + nonce + ) + + if ret != 0: + print(f"[ERROR] 解密失败,错误码: {ret}") + return "消息解密失败", 500 + + # 解析XML + xml_tree = ElementTree.fromstring(xml_content) + + def get_text(element): + if element is None: + return '' + text = element.text + if text is None: + return '' + return text.decode('utf-8') if isinstance(text, bytes) else str(text) + + # 获取用户ID和消息内容 + from_user_name = get_text(xml_tree.find('FromUserName')) + msg_content = get_text(xml_tree.find('Content')) + + # 处理刷新对话的关键词 + refresh_keywords = ["new", "refresh", "00", "restart", "刷新", "新话题", "退下", "结束", "over"] + if msg_content.strip().lower() in refresh_keywords: + if hasattr(ai_service, 'conversation_history') and from_user_name in ai_service.conversation_history: + del ai_service.conversation_history[from_user_name] + re_text = "会话已重置" + else: + # 使用用户ID生成回复 + msg_handler = MessageHandler(KEYWORD_REPLIES, ai_service) + re_text = msg_handler.get_reply(msg_content) + if isinstance(ai_service, OpenAIService): + re_text = ai_service.generate_response(msg_content, user_id=from_user_name) + + # 处理回复文本 + re_text = process_text(re_text) + + # 构造回复消息 + reply_dict = { + 'ToUserName': from_user_name, + 'FromUserName': get_text(xml_tree.find('ToUserName')), + 'CreateTime': str(int(time.time())), + 'MsgType': 'text', + 'Content': re_text + } + + # 构造XML回复并加密 + reply_msg = ResponseMessage(reply_dict).xml + + if isinstance(reply_msg, bytes): + reply_msg = reply_msg.decode('utf-8') + + ret, encrypt_xml = wxcpt.EncryptMsg(reply_msg, nonce, timestamp) + + if ret != 0: + print(f"[ERROR] 加密回复消息失败,错误码: {ret}") + return "加密回复消息失败", 500 + + return encrypt_xml + + except Exception as e: + print(f"[ERROR] 处理消息时发生异常: {str(e)}") + return "服务器内部错误", 500 + + except Exception as e: + print(f"处理异常: {str(e)}") + return "服务器错误", 500 + + return "success" + +# ResponseMessage类修改 +class ResponseMessage(object): + def __init__(self, dict_data): + self.dict_data = {k: str(v) for k, v in dict_data.items()} + + @property + def xml(self): + xml = "" + for k, v in self.dict_data.items(): + xml += f"<{k}>" + xml += "" + return xml + + +def process_text(text): + # 1. 把所有的 "\n" 替换为 "__lineFeed__" + text = text.replace('\n', '__lineFeed__') + + # 2. 删除所有换行符(包括 \r 等) + text = re.sub(r'[^\S]+', '', text) + + # 3. 把所有的 "__lineFeed__" 替换回 "\n" + text = text.replace('__lineFeed__', '\n') + # 4. 删除多余的换行符 + text = re.sub(r'\n+', '\n', text) + return text.strip() + + + + +if __name__ == '__main__': + app.run(host='0.0.0.0', port=5000, debug=True) diff --git a/ierror.py b/ierror.py new file mode 100644 index 0000000..6678fec --- /dev/null +++ b/ierror.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +######################################################################### +# Author: jonyqin +# Created Time: Thu 11 Sep 2014 01:53:58 PM CST +# File Name: ierror.py +# Description:定义错误码含义 +######################################################################### +WXBizMsgCrypt_OK = 0 +WXBizMsgCrypt_ValidateSignature_Error = -40001 +WXBizMsgCrypt_ParseXml_Error = -40002 +WXBizMsgCrypt_ComputeSignature_Error = -40003 +WXBizMsgCrypt_IllegalAesKey = -40004 +WXBizMsgCrypt_ValidateCorpid_Error = -40005 +WXBizMsgCrypt_EncryptAES_Error = -40006 +WXBizMsgCrypt_DecryptAES_Error = -40007 +WXBizMsgCrypt_IllegalBuffer = -40008 +WXBizMsgCrypt_EncodeBase64_Error = -40009 +WXBizMsgCrypt_DecodeBase64_Error = -40010 +WXBizMsgCrypt_GenReturnXml_Error = -40011 diff --git a/keyword_config.py b/keyword_config.py new file mode 100644 index 0000000..7fb4d11 --- /dev/null +++ b/keyword_config.py @@ -0,0 +1,23 @@ +KEYWORD_REPLIES = { + 'invoice': { + 'keywords': ['开票', '发票'], + 'reply': '名 称:厦门路桥勘察设计院有限公司\n纳税人识别号:91350200MAD0EHH44J\n电 话:0592-5828192\n地 址:厦门市湖里区槟城道289号701室\n开户行及账号:中国农业银行厦门市分行营业部40379001040059157' + }, + 'price': { + 'keywords': ['价格', '多少钱'], + 'reply': '请联系销售人员获取价格信息' + }, + 'help': { + 'keywords': ['帮助', '使用说明'], + 'reply': '您可以咨询以下内容:\n1. 开票信息\n2. 价格查询\n3. 使用说明' + }, + 'test': { + 'keywords': ['test', '测试'], + #'reply': '高空作业有以下规定:\n\n1. 人员必须正确佩戴安全帽、安全带等劳动防护用品,并经过培训持证上岗,否则不准进入作业现场。\n2. 酒后、过度疲劳或患有高血压、心脏病等疾病的人员不准进行高空作业。\n3. 临边洞口、沟槽、坑、屋面周边等边沿未设置临边防护或未采取安全防护措施的,不准进行高空作业。\n4. 未固定或无防护设施的构件及管道上不准作业或通行。\n5. 各类操作平台、载人装置未确认稳定可靠,周边未设置临边防护的,不准上人作业。\n6. 直梯、人字梯、伸缩梯防滑动措施不牢靠、架体不稳定的,不准上人作业。\n\n这些规定旨在确保高空作业的安全性,防止事故发生。' + 'reply': '''高空作业有以下规定 +:\n\n1. 人员必须正确佩戴安全帽、安全带等劳动防护用品,并经过培训持证上岗,否则不准进入作业现场。\n2. 酒后、过度疲劳或患有高血压、心脏病等疾病的人员不准进行高空作业。\n3. 临边洞口、沟槽、坑、屋面周边等边沿未设置临边防护或未采取安全防护措施的,不准进行高空作业。\n4. 未固定或无 +防护设施的构件及管道上不准作业或通行。\n5. 各类操作平台、载人装置未确认稳定可靠,周边未设置临边防护的,不准上人作业。\n6. 直梯、人字梯、伸缩梯防滑动措施不牢靠、架体不稳定的,不准上人作业。\n\n这些规定旨在确保高空作业的安全性,防止事故发生。''' + +# 删除所有空白字符(包括空格、\t、\r),但保留换行符 \n + }, +} \ No newline at end of file