commit 903e3495e6d0dc736980c4d1734b9790d497ac0a
Author: glowz <24327181@qq.com>
Date: Fri Feb 21 08:04:44 2025 +0800
first commit
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e69de29
diff --git a/WXBizMsgCrypt.py b/WXBizMsgCrypt.py
new file mode 100644
index 0000000..1943b72
--- /dev/null
+++ b/WXBizMsgCrypt.py
@@ -0,0 +1,283 @@
+#!/usr/bin/env python
+#-*- encoding:utf-8 -*-
+
+""" 对企业微信发送给企业后台的消息加解密示例代码.
+@copyright: Copyright (c) 1998-2014 Tencent Inc.
+
+"""
+# ------------------------------------------------------------------------
+
+import base64
+import string
+import random
+import hashlib
+import time
+import struct
+from Crypto.Cipher import AES
+import xml.etree.cElementTree as ET
+import sys
+import socket
+import ierror
+
+"""
+
+
+
+关于Crypto.Cipher模块,ImportError: No module named 'Crypto'解决方案
+请到官方网站 https://www.dlitz.net/software/pycrypto/ 下载pycrypto。
+下载后,按照README中的“Installation”小节的提示进行pycrypto安装。
+"""
+class FormatException(Exception):
+ pass
+
+def throw_exception(message, exception_class=FormatException):
+ """my define raise exception function"""
+ raise exception_class(message)
+
+class SHA1:
+ def getSHA1(self, token, timestamp, nonce, encrypt):
+ try:
+ # 确保所有输入都是字符串类型
+ token = str(token)
+ timestamp = str(timestamp)
+ nonce = str(nonce)
+ encrypt = str(encrypt)
+
+ sortlist = [token, timestamp, nonce, encrypt]
+ sortlist.sort()
+
+ # 将列表转换为字符串并编码
+ str_to_hash = "".join(sortlist).encode('utf-8')
+
+ sha = hashlib.sha1()
+ sha.update(str_to_hash)
+ return ierror.WXBizMsgCrypt_OK, sha.hexdigest()
+ except Exception as e:
+ print(f"[ERROR] SHA1计算失败: {str(e)}")
+ return ierror.WXBizMsgCrypt_ComputeSignature_Error, None
+
+
+class XMLParse:
+ """提供提取消息格式中的密文及生成回复消息格式的接口"""
+
+ # xml消息模板
+ AES_TEXT_RESPONSE_TEMPLATE = """
+
+
+%s
+
+"""
+
+ def extract(self, xmltext):
+ """提取出xml数据包中的加密消息
+ @param xmltext: 待提取的xml字符串
+ @return: 提取出的加密消息字符串
+ """
+ try:
+ xml_tree = ET.fromstring(xmltext)
+ encrypt = xml_tree.find("Encrypt")
+ return ierror.WXBizMsgCrypt_OK, encrypt.text
+ except Exception as e:
+ print (e)
+ return ierror.WXBizMsgCrypt_ParseXml_Error,None
+
+ def generate(self, encrypt, signature, timestamp, nonce):
+ """生成xml消息
+ @param encrypt: 加密后的消息密文
+ @param signature: 安全签名
+ @param timestamp: 时间戳
+ @param nonce: 随机字符串
+ @return: 生成的xml字符串
+ """
+ return self.AES_TEXT_RESPONSE_TEMPLATE % (
+ encrypt,
+ signature,
+ timestamp,
+ nonce
+ )
+
+
+class PKCS7Encoder():
+ """提供基于PKCS7算法的加解密接口"""
+
+ block_size = 32
+ def encode(self, text):
+ if isinstance(text, str):
+ text = text.encode('utf-8')
+
+ # 计算需要填充的位数
+ amount_to_pad = self.block_size - (len(text) % self.block_size)
+ if amount_to_pad == 0:
+ amount_to_pad = self.block_size
+
+ # 填充
+ pad_chr = chr(amount_to_pad).encode('utf-8')
+ padding = pad_chr * amount_to_pad
+
+ return text + padding
+
+ def decode(self, decrypted):
+ """删除解密后明文的补位字符
+ @param decrypted: 解密后的明文(bytes类型)
+ @return: 删除补位字符后的明文
+ """
+ if isinstance(decrypted, bytes):
+ pad = decrypted[-1]
+ else:
+ pad = ord(decrypted[-1])
+
+ if pad < 1 or pad > 32:
+ pad = 0
+
+ return decrypted[:-pad]
+
+
+class Prpcrypt(object):
+ """提供接收和推送给企业微信消息的加解密接口"""
+
+ def __init__(self,key):
+
+ #self.key = base64.b64decode(key+"=")
+ self.key = key
+ # 设置加解密模式为AES的CBC模式
+ self.mode = AES.MODE_CBC
+
+
+ def encrypt(self, text, receiveid):
+ try:
+ # 统一处理输入为bytes
+ text = text.encode('utf-8') if isinstance(text, str) else text
+ receiveid = receiveid.encode('utf-8') if isinstance(receiveid, str) else receiveid
+
+ # 生成16位随机字符串
+ random_str = self.get_random_str().encode('utf-8')
+
+ # 打包文本长度
+ text_len = struct.pack("I", socket.htonl(len(text)))
+
+ # 拼接内容
+ content = b''.join([random_str, text_len, text, receiveid])
+
+ # PKCS7填充
+ pkcs7 = PKCS7Encoder()
+ padding_text = pkcs7.encode(content)
+
+ # AES加密
+ cryptor = AES.new(self.key, self.mode, self.key[:16])
+ ciphertext = cryptor.encrypt(padding_text)
+
+ # Base64编码
+ return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
+
+ except Exception as e:
+ print(f"[ERROR] 加密失败: {str(e)}")
+ return ierror.WXBizMsgCrypt_EncryptAES_Error, None
+
+ def decrypt(self,text,receiveid):
+ try:
+ cryptor = AES.new(self.key,self.mode,self.key[:16])
+ plain_text = cryptor.decrypt(base64.b64decode(text))
+
+ # 获取补位值
+ pad = plain_text[-1]
+
+ # 去除补位字符
+ content = plain_text[16:-pad]
+
+ xml_len = socket.ntohl(struct.unpack("I",content[ : 4])[0])
+ xml_content = content[4 : xml_len+4]
+ from_receiveid = content[xml_len+4:]
+ except Exception as e:
+ print(f"[ERROR] 解密失败: {str(e)}")
+ return ierror.WXBizMsgCrypt_IllegalBuffer,None
+
+ if from_receiveid != receiveid.encode('utf-8'):
+ return ierror.WXBizMsgCrypt_ValidateCorpid_Error,None
+
+ return 0,xml_content.decode('utf-8')
+
+ def get_random_str(self):
+ """ 随机生成16位字符串
+ @return: 16位字符串
+ """
+ rule = string.ascii_letters + string.digits # 修改这里
+ str = random.sample(rule, 16)
+ return "".join(str)
+
+class WXBizMsgCrypt(object):
+ #构造函数
+ def __init__(self,sToken,sEncodingAESKey,sReceiveId):
+ try:
+ self.key = base64.b64decode(sEncodingAESKey+"=")
+ assert len(self.key) == 32
+ except:
+ throw_exception("[error]: EncodingAESKey unvalid !", FormatException)
+ # return ierror.WXBizMsgCrypt_IllegalAesKey,None
+ self.m_sToken = sToken
+ self.m_sReceiveId = sReceiveId
+
+ #验证URL
+ #@param sMsgSignature: 签名串,对应URL参数的msg_signature
+ #@param sTimeStamp: 时间戳,对应URL参数的timestamp
+ #@param sNonce: 随机串,对应URL参数的nonce
+ #@param sEchoStr: 随机串,对应URL参数的echostr
+ #@param sReplyEchoStr: 解密之后的echostr,当return返回0时有效
+ #@return:成功0,失败返回对应的错误码
+
+ def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr):
+ sha1 = SHA1()
+ ret,signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, sEchoStr)
+ if ret != 0:
+ return ret, None
+ if not signature == sMsgSignature:
+ return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
+ pc = Prpcrypt(self.key)
+ ret,sReplyEchoStr = pc.decrypt(sEchoStr,self.m_sReceiveId)
+ return ret,sReplyEchoStr
+
+ def EncryptMsg(self, sReplyMsg, sNonce, timestamp = None):
+ #将企业回复用户的消息加密打包
+ #@param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串
+ #@param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间
+ #@param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce
+ #sEncryptMsg: 加密后的可以直接回复用户的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串,
+ #return:成功0,sEncryptMsg,失败返回对应的错误码None
+ pc = Prpcrypt(self.key)
+ ret,encrypt = pc.encrypt(sReplyMsg, self.m_sReceiveId)
+ if ret != 0:
+ return ret,None
+ if timestamp is None:
+ timestamp = str(int(time.time()))
+ # 生成安全签名
+ sha1 = SHA1()
+ ret, signature = sha1.getSHA1(self.m_sToken, timestamp, sNonce, encrypt.decode('utf-8'))
+ if ret != 0:
+ return ret, None
+
+ xmlParse = XMLParse()
+ return ret, xmlParse.generate(encrypt.decode('utf-8'), signature, timestamp, sNonce)
+
+ def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):
+ # 检验消息的真实性,并且获取解密后的明文
+ # @param sMsgSignature: 签名串,对应URL参数的msg_signature
+ # @param sTimeStamp: 时间戳,对应URL参数的timestamp
+ # @param sNonce: 随机串,对应URL参数的nonce
+ # @param sPostData: 密文,对应POST请求的数据
+ # xml_content: 解密后的原文,当return返回0时有效
+ # @return: 成功0,失败返回对应的错误码
+ # 验证安全签名
+ xmlParse = XMLParse()
+ ret,encrypt = xmlParse.extract(sPostData)
+ if ret != 0:
+ return ret, None
+ sha1 = SHA1()
+ ret,signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, encrypt)
+ if ret != 0:
+ return ret, None
+ if not signature == sMsgSignature:
+ return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
+ pc = Prpcrypt(self.key)
+ ret,xml_content = pc.decrypt(encrypt,self.m_sReceiveId)
+ return ret,xml_content
+
+
diff --git a/ai_service.py b/ai_service.py
new file mode 100644
index 0000000..40df5e7
--- /dev/null
+++ b/ai_service.py
@@ -0,0 +1,452 @@
+# ai_service.py 优化版本
+import requests
+import logging
+from typing import Dict, Optional
+from functools import lru_cache
+from config import OLLAMA_MODEL, OPENAI_API_KEY, OPENAI_MODEL,OPENAI_BASE_URL # 确保config.py中有这些配置
+import time
+# 配置日志
+logging.basicConfig(
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+ level=logging.INFO
+)
+logger = logging.getLogger(__name__)
+
+class AIService:
+ """AI服务抽象基类"""
+ def generate_response(self, prompt: str) -> str:
+ """
+ 生成AI回复
+ :param prompt: 用户输入的提示文本
+ :return: 生成的回复文本
+ """
+ raise NotImplementedError
+
+
+class OllamaService(AIService):
+ """Ollama本地模型服务实现"""
+ def __init__(
+ self,
+ endpoint: str = "http://localhost:11434/api/generate",
+ model: str = OLLAMA_MODEL,
+ timeout: int = 10
+ ):
+ self.endpoint = endpoint
+ self.default_model = model
+ self.timeout = timeout
+
+ @lru_cache(maxsize=100)
+ def generate_response(self, prompt: str) -> str:
+ try:
+ response = requests.post(
+ self.endpoint,
+ json={
+ 'model': self.default_model,
+ 'prompt': prompt,
+ 'stream': False
+ },
+ timeout=self.timeout
+ )
+ response.raise_for_status()
+
+ result = response.json()
+ return result.get('response', '收到您的消息')
+
+ except requests.exceptions.ConnectionError:
+ logger.error("无法连接Ollama服务,请检查服务状态")
+ return "本地模型服务未启动"
+
+ except requests.exceptions.Timeout:
+ logger.warning("Ollama请求超时")
+ return "响应超时,请简化问题"
+
+ except Exception as e:
+ logger.error(f"Ollama处理异常: {str(e)}", exc_info=True)
+ return "本地模型服务异常"
+
+
+
+class DifyService(AIService):
+ """Dify API客户端封装"""
+
+ def __init__(
+ self,
+ api_key: str,
+ base_url: str = "http://localhost/v1",
+ timeout: int = 100,
+ default_user: str = "system"
+ ):
+ """
+ :param api_key: 应用API密钥
+ :param base_url: API基础地址 (默认: http://localhost/v1)
+ :param timeout: 请求超时时间 (秒)
+ :param default_user: 默认用户标识
+ """
+ self._validate_config(api_key, base_url)
+
+ self.api_key = api_key
+ self.base_url = base_url.rstrip('/')
+ self.timeout = timeout
+ self.default_user = default_user
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ self.session = requests.Session()
+ self.headers = {
+ "Authorization": f"Bearer {self.api_key}",
+ "Content-Type": "application/json"
+ }
+
+ def _validate_config(self, api_key: str, base_url: str):
+ """配置校验"""
+ if not api_key.startswith('app-'):
+ raise ValueError("Invalid API key format")
+ if not base_url.startswith(('http://', 'https://')):
+ raise ValueError("Invalid base URL protocol")
+
+ @lru_cache(maxsize=100)
+ def generate_response(
+ self,
+ query: str,
+ response_mode: str = "blocking",
+ conversation_id: Optional[str] = None,
+ user: Optional[str] = None,
+ **additional_inputs
+ ) -> str:
+ """
+ 生成对话响应
+ :param query: 用户查询内容
+ :param response_mode: 响应模式 (blocking/streaming)
+ :param conversation_id: 会话ID (为空时创建新会话)
+ :param user: 用户标识 (默认使用初始化参数"""
+
+ try:
+ response = requests.post(
+ f"{self.base_url}/chat-messages",
+ headers=self.headers,
+ json={
+ "inputs": {},
+ "query": query,
+ "response_mode": "blocking",
+ "conversation_id": "",
+ "user": "abc-123"
+ },
+ timeout=self.timeout
+ )
+ response.raise_for_status()
+
+ #response.json()["answer"]
+ return response.json()["answer"]
+
+ except requests.exceptions.ConnectionError:
+ logger.error("无法连接dify服务,请检查服务状态")
+ return "本地模型服务未启动"
+
+ except requests.exceptions.Timeout:
+ logger.warning("dify请求超时")
+ return "响应超时,请简化问题"
+
+ except Exception as e:
+ logger.error(f"dify处理异常: {str(e)}", exc_info=True)
+ return "本地模型服务异常"
+
+
+class OpenAIService(AIService):
+ """OpenAI官方接口服务实现"""
+ def __init__(
+ self,
+ api_key: str = OPENAI_API_KEY,
+ model: str = OPENAI_MODEL,
+ base_url: str = OPENAI_BASE_URL,
+ timeout: int = 15,
+ temperature: float = 0.7,
+ max_conversation_length: int = 10,
+ max_time_gap: int = 30
+ ):
+ self._validate_config(api_key, model)
+
+ self.api_key = api_key
+ self.default_model = model
+ self.base_url = base_url
+ self.timeout = timeout
+ self.temperature = temperature
+ self.headers = {
+ "Authorization": f"Bearer {self.api_key}",
+ "Content-Type": "application/json"
+ }
+ # 新增会话管理相关属性
+ self.conversation_history = {}
+ self.max_conversation_length = max_conversation_length
+ self.max_time_gap = max_time_gap
+ self.system_prompt = '''你是路桥设计院智能助手。你的使命是尽可能地用详尽的、温暖的、友善的话语帮助企业员工,在各种方面提供帮助和支持。无论我需要什么帮助或建议,你都会尽力提供详尽信息。'''
+
+ def _validate_config(self, api_key: str, model: str) -> None:
+ """
+ 验证OpenAI配置参数
+ :param api_key: OpenAI API密钥
+ :param model: 模型名称
+ :raises ValueError: 当配置参数无效时抛出
+ """
+ if not api_key:
+ raise ValueError("OpenAI API密钥不能为空")
+
+ if not model:
+ raise ValueError("模型名称不能为空")
+
+ if not isinstance(api_key, str) or not isinstance(model, str):
+ raise ValueError("API密钥和模型名称必须是字符串类型")
+
+ # 可选:验证API密钥格式
+ # if not api_key.startswith('sk-'):
+ # raise ValueError("无效的OpenAI API密钥格式")
+
+ def _manage_conversation_history(self, user_id: str, message: str):
+ """管理会话历史"""
+ current_timestamp = int(time.time())
+
+ # 检查会话是否超时
+ if (user_id in self.conversation_history and
+ current_timestamp - self.conversation_history[user_id]["last_timestamp"] >= self.max_time_gap * 60):
+ del self.conversation_history[user_id]
+
+ # 初始化或更新会话历史
+ if user_id not in self.conversation_history:
+ self.conversation_history[user_id] = {
+ "messages": [],
+ "last_timestamp": current_timestamp
+ }
+ else:
+ self.conversation_history[user_id]["last_timestamp"] = current_timestamp
+
+ # 限制会话历史长度
+ if len(self.conversation_history[user_id]["messages"]) > self.max_conversation_length:
+ self.conversation_history[user_id]["messages"] = (
+ self.conversation_history[user_id]["messages"][-self.max_conversation_length:]
+ )
+
+ # 添加新消息
+ self.conversation_history[user_id]["messages"].append({
+ "role": "user",
+ "content": message
+ })
+
+ def generate_response(self, prompt: str, user_id: str = "default_user") -> str:
+ """
+ 生成带有会话历史的回复
+ :param prompt: 用户输入的提示文本
+ :param user_id: 用户标识符
+ :return: 生成的回复文本
+ """
+ try:
+ self._manage_conversation_history(user_id, prompt)
+
+ # 构建完整的消息历史
+ messages = [{"role": "system", "content": self.system_prompt}]
+ messages.extend(self.conversation_history[user_id]["messages"])
+
+ response = requests.post(
+ f"{self.base_url}/chat/completions",
+ headers=self.headers,
+ json={
+ "model": self.default_model,
+ "messages": messages,
+ "temperature": self.temperature
+ },
+ timeout=self.timeout
+ )
+ response.raise_for_status()
+
+ result = response.json()
+ if 'choices' not in result:
+ logger.error(f"OpenAI响应格式异常: {result}")
+ return "响应解析失败"
+
+ response_text = result['choices'][0]['message']['content']
+
+ # 保存助手的回复到会话历史
+ self.conversation_history[user_id]["messages"].append({
+ "role": "assistant",
+ "content": response_text
+ })
+
+ return response_text
+
+ except Exception as e:
+ logger.error(f"OpenAI处理异常: {str(e)}", exc_info=True)
+ return "服务暂时不可用"
+
+class FastGptService(AIService):
+ """FastGPT API客户端封装"""
+
+ def __init__(
+ self,
+ api_key: str,
+ base_url: str = "http://localhost:3000/api/v1",
+ timeout: int = 30,
+ max_conversation_length: int = 10,
+ max_time_gap: int = 30
+ ):
+ """
+ 初始化FastGPT服务
+ :param api_key: FastGPT API密钥
+ :param base_url: API基础地址
+ :param timeout: 请求超时时间(秒)
+ :param max_conversation_length: 最大会话长度
+ :param max_time_gap: 会话超时时间(分钟)
+ """
+ self._validate_config(api_key, base_url)
+
+ self.api_key = api_key
+ self.base_url = base_url.rstrip('/')
+ self.timeout = timeout
+ self.headers = {
+ "Authorization": f"Bearer {api_key}",
+ "Content-Type": "application/json"
+ }
+
+ # 会话管理相关属性
+ self.conversation_history = {}
+ self.max_conversation_length = max_conversation_length
+ self.max_time_gap = max_time_gap
+
+ def _validate_config(self, api_key: str, base_url: str):
+ """配置校验"""
+ if not api_key:
+ raise ValueError("FastGPT API密钥不能为空")
+ if not base_url.startswith(('http://', 'https://')):
+ raise ValueError("无效的基础URL协议")
+
+ def _manage_conversation_history(self, user_id: str, message: str):
+ """管理会话历史"""
+ current_timestamp = int(time.time())
+
+ # 检查会话是否超时
+ if (user_id in self.conversation_history and
+ current_timestamp - self.conversation_history[user_id]["last_timestamp"] >= self.max_time_gap * 60):
+ del self.conversation_history[user_id]
+
+ # 初始化或更新会话历史
+ if user_id not in self.conversation_history:
+ self.conversation_history[user_id] = {
+ "chat_id": f"chat_{user_id}_{current_timestamp}",
+ "messages": [],
+ "last_timestamp": current_timestamp
+ }
+ else:
+ self.conversation_history[user_id]["last_timestamp"] = current_timestamp
+
+ # 限制会话历史长度
+ if len(self.conversation_history[user_id]["messages"]) >= self.max_conversation_length:
+ self.conversation_history[user_id]["messages"] = (
+ self.conversation_history[user_id]["messages"][-self.max_conversation_length:]
+ )
+
+ # 添加新消息
+ msg_id = f"msg_{current_timestamp}"
+ self.conversation_history[user_id]["messages"].append({
+ "role": "user",
+ "content": message
+ })
+ return msg_id
+
+ def generate_response(
+ self,
+ prompt: str,
+ user_id: str = "default_user",
+ variables: dict = None,
+ detail: bool = False
+ ) -> str:
+ """
+ 生成回复
+ :param prompt: 用户输入
+ :param user_id: 用户标识
+ :param variables: 模块变量
+ :param detail: 是否返回详细信息
+ :return: 生成的回复文本
+ """
+ try:
+ msg_id = self._manage_conversation_history(user_id, prompt)
+ chat_info = self.conversation_history.get(user_id, {})
+
+ payload = {
+ "chatId": chat_info.get("chat_id"),
+ "stream": False,
+ "detail": detail,
+ "responseChatItemId": msg_id,
+ "variables": variables or {},
+ "messages": [{"role": "user", "content": prompt}]
+ }
+
+ response = requests.post(
+ f"{self.base_url}/chat/completions",
+ headers=self.headers,
+ json=payload,
+ timeout=self.timeout
+ )
+ response.raise_for_status()
+
+ result = response.json()
+
+ # 处理响应
+ if detail:
+ response_text = result.get("responseData", {}).get("content", "响应解析失败")
+ else:
+ response_text = result.get("content", "响应解析失败")
+
+ # 保存助手回复到会话历史
+ if user_id in self.conversation_history:
+ self.conversation_history[user_id]["messages"].append({
+ "role": "assistant",
+ "content": response_text
+ })
+
+ return response_text
+
+ except requests.exceptions.ConnectionError:
+ logger.error("无法连接FastGPT服务")
+ return "服务连接失败"
+
+ except requests.exceptions.Timeout:
+ logger.warning("FastGPT请求超时")
+ return "响应超时"
+
+ except Exception as e:
+ logger.error(f"FastGPT处理异常: {str(e)}", exc_info=True)
+ return "服务暂时不可用"
+
+class HybridAIService(AIService):
+ """混合AI服务(故障转移模式)"""
+ def __init__(self, services: list[AIService]):
+ self.services = services
+
+ def generate_response(self, prompt: str) -> str:
+ for service in self.services:
+ try:
+ return service.generate_response(prompt)
+ except Exception as e:
+ logger.warning(f"{type(service).__name__} 服务失败: {str(e)}")
+ continue
+ return "所有AI服务不可用"
+
+class MessageHandler:
+ """智能消息处理器"""
+ def __init__(self, keyword_config: Dict, ai_service: AIService):
+ """
+ :param keyword_config: 关键词配置字典
+ :param ai_service: AI服务实例
+ """
+ self.keyword_config = keyword_config
+ self.ai_service = ai_service
+
+ def get_reply(self, content: str) -> str:
+ # 优先全匹配关键词
+ for rule in self.keyword_config.values():
+ if any(kw == content.strip() for kw in rule['keywords']):
+ return rule['reply']
+
+ # 其次模糊匹配
+ for rule in self.keyword_config.values():
+ if any(kw in content for kw in rule['keywords']):
+ return rule['reply']
+
+ # 无匹配时调用AI
+ return self.ai_service.generate_response(content)
\ No newline at end of file
diff --git a/callback1.py b/callback1.py
new file mode 100644
index 0000000..1d8b4d6
--- /dev/null
+++ b/callback1.py
@@ -0,0 +1,219 @@
+from config import AgentId,Secret,corpid,token,encodingAESKey
+#import xml.etree.ElementTree as ET
+from flask import Flask,request
+from WXBizMsgCrypt import WXBizMsgCrypt
+import base64,hashlib
+from Crypto.Cipher import AES
+import time
+from xml.etree import ElementTree
+from keyword_config import KEYWORD_REPLIES
+from ai_service import OllamaService,MessageHandler,DifyService
+from typing import Dict
+from config import OPENAI_API_KEY, OPENAI_MODEL, OPENAI_BASE_URL,DIFY_API_KEY,DIFY_BASE_URL
+from config import FASTAPI_BASE_URL, FASTAPI_API_KEY
+import re
+
+from ai_service import OpenAIService,FastGptService
+app =Flask(__name__)
+wxcpt = WXBizMsgCrypt(token,encodingAESKey,corpid)
+#ai_service = OllamaService()
+ai_service = OpenAIService(OPENAI_API_KEY, OPENAI_MODEL, OPENAI_BASE_URL)
+#ai_service = DifyService(DIFY_API_KEY,DIFY_BASE_URL)
+#ai_service = FastGptService(FASTAPI_API_KEY, FASTAPI_BASE_URL)
+
+# 检查base64编码后数据位数是否正确
+def check_base64_len(base64_str):
+ len_remainder = 4 - (len(base64_str) % 4)
+ if len_remainder == 0:
+ return base64_str
+ else:
+ for temp in range(0,len_remainder):
+ base64_str = base64_str + "="
+ return base64_str
+# 解密并提取消息正文
+def msg_base64_decrypt(ciphertext_base64,key_base64):
+ # 处理密文、密钥和iv
+ ciphertext_bytes = base64.b64decode(check_base64_len(ciphertext_base64))
+ key_bytes = base64.b64decode(check_base64_len(key_base64))
+ iv_bytes = key_bytes[:16]
+
+ # 解密
+ decr = AES.new(key_bytes,AES.MODE_CBC,iv_bytes)
+ plaintext_bytes = decr.decrypt(ciphertext_bytes)
+
+ # 截取数据,判断消息正文字节数
+ msg_len_bytes = plaintext_bytes[16:20]
+ msg_len = int.from_bytes(msg_len_bytes,byteorder='big', signed=False)
+
+ # 根据消息正文字节数截取消息正文,并转为字符串格式
+ msg_bytes = plaintext_bytes[20:20+msg_len]
+ msg = str(msg_bytes,encoding='utf-8')
+
+ return msg
+# 消息体签名校验
+def check_msg_signature(msg_signature,token,timestamp,nonce,echostr):
+ # 使用sort()从小到大排序[].sort()是在原地址改值的,所以如果使用li_s = li.sort(),li_s是空的,li的值变为排序后的值]
+ li = [token,timestamp,nonce,echostr]
+ li.sort()
+ # 将排序结果拼接
+ li_str = li[0]+li[1]+li[2]+li[3]
+
+ # 计算SHA-1值
+ sha1 = hashlib.sha1()
+ # update()要指定加密字符串字符代码,不然要报错:
+ # "Unicode-objects must be encoded before hashing"
+ sha1.update(li_str.encode("utf8"))
+ sha1_result = sha1.hexdigest()
+
+ # 比较并返回比较结果
+ if sha1_result == msg_signature:
+ return True
+ else:
+ return False
+@app.route('/hello', methods=['GET'])
+def hello():
+ return "Hello, Flask!"
+
+@app.route('/', methods=['GET', 'POST'])
+def reply():
+ try:
+ # 处理GET请求(验证URL)部分保持不变
+ if request.method == 'GET':
+ msg_signature = request.args.to_dict().get("msg_signature")
+ timestamp = request.args.to_dict().get("timestamp")
+ nonce = request.args.to_dict().get("nonce")
+ echostr = request.args.to_dict().get("echostr")
+ print(msg_signature,timestamp,nonce,echostr)
+ # 获取消息体签名校验结果
+ check_result = check_msg_signature(msg_signature,token,timestamp,nonce,echostr)
+ if check_result:
+ decrypt_result = msg_base64_decrypt(echostr,encodingAESKey)
+ print("通过")
+ return decrypt_result
+ else:
+ return ""
+
+ # 处理POST请求(消息处理)
+ elif request.method == 'POST':
+ try:
+ # 获取并解析原始数据部分保持不变
+ raw_data = request.get_data()
+ if isinstance(raw_data, bytes):
+ raw_data = raw_data.decode('utf-8')
+
+ # 获取参数验证部分保持不变
+ msg_signature = request.args.get('msg_signature', '')
+ timestamp = str(request.args.get('timestamp', ''))
+ nonce = request.args.get('nonce', '')
+
+ if not all([msg_signature, timestamp, nonce]):
+ return "缺少必要参数", 400
+
+ # 解密消息部分保持不变
+ ret, xml_content = wxcpt.DecryptMsg(
+ raw_data,
+ msg_signature,
+ timestamp,
+ nonce
+ )
+
+ if ret != 0:
+ print(f"[ERROR] 解密失败,错误码: {ret}")
+ return "消息解密失败", 500
+
+ # 解析XML
+ xml_tree = ElementTree.fromstring(xml_content)
+
+ def get_text(element):
+ if element is None:
+ return ''
+ text = element.text
+ if text is None:
+ return ''
+ return text.decode('utf-8') if isinstance(text, bytes) else str(text)
+
+ # 获取用户ID和消息内容
+ from_user_name = get_text(xml_tree.find('FromUserName'))
+ msg_content = get_text(xml_tree.find('Content'))
+
+ # 处理刷新对话的关键词
+ refresh_keywords = ["new", "refresh", "00", "restart", "刷新", "新话题", "退下", "结束", "over"]
+ if msg_content.strip().lower() in refresh_keywords:
+ if hasattr(ai_service, 'conversation_history') and from_user_name in ai_service.conversation_history:
+ del ai_service.conversation_history[from_user_name]
+ re_text = "会话已重置"
+ else:
+ # 使用用户ID生成回复
+ msg_handler = MessageHandler(KEYWORD_REPLIES, ai_service)
+ re_text = msg_handler.get_reply(msg_content)
+ if isinstance(ai_service, OpenAIService):
+ re_text = ai_service.generate_response(msg_content, user_id=from_user_name)
+
+ # 处理回复文本
+ re_text = process_text(re_text)
+
+ # 构造回复消息
+ reply_dict = {
+ 'ToUserName': from_user_name,
+ 'FromUserName': get_text(xml_tree.find('ToUserName')),
+ 'CreateTime': str(int(time.time())),
+ 'MsgType': 'text',
+ 'Content': re_text
+ }
+
+ # 构造XML回复并加密
+ reply_msg = ResponseMessage(reply_dict).xml
+
+ if isinstance(reply_msg, bytes):
+ reply_msg = reply_msg.decode('utf-8')
+
+ ret, encrypt_xml = wxcpt.EncryptMsg(reply_msg, nonce, timestamp)
+
+ if ret != 0:
+ print(f"[ERROR] 加密回复消息失败,错误码: {ret}")
+ return "加密回复消息失败", 500
+
+ return encrypt_xml
+
+ except Exception as e:
+ print(f"[ERROR] 处理消息时发生异常: {str(e)}")
+ return "服务器内部错误", 500
+
+ except Exception as e:
+ print(f"处理异常: {str(e)}")
+ return "服务器错误", 500
+
+ return "success"
+
+# ResponseMessage类修改
+class ResponseMessage(object):
+ def __init__(self, dict_data):
+ self.dict_data = {k: str(v) for k, v in dict_data.items()}
+
+ @property
+ def xml(self):
+ xml = ""
+ for k, v in self.dict_data.items():
+ xml += f"<{k}>{k}>"
+ xml += ""
+ return xml
+
+
+def process_text(text):
+ # 1. 把所有的 "\n" 替换为 "__lineFeed__"
+ text = text.replace('\n', '__lineFeed__')
+
+ # 2. 删除所有换行符(包括 \r 等)
+ text = re.sub(r'[^\S]+', '', text)
+
+ # 3. 把所有的 "__lineFeed__" 替换回 "\n"
+ text = text.replace('__lineFeed__', '\n')
+ # 4. 删除多余的换行符
+ text = re.sub(r'\n+', '\n', text)
+ return text.strip()
+
+
+
+
+if __name__ == '__main__':
+ app.run(host='0.0.0.0', port=5000, debug=True)
diff --git a/ierror.py b/ierror.py
new file mode 100644
index 0000000..6678fec
--- /dev/null
+++ b/ierror.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#########################################################################
+# Author: jonyqin
+# Created Time: Thu 11 Sep 2014 01:53:58 PM CST
+# File Name: ierror.py
+# Description:定义错误码含义
+#########################################################################
+WXBizMsgCrypt_OK = 0
+WXBizMsgCrypt_ValidateSignature_Error = -40001
+WXBizMsgCrypt_ParseXml_Error = -40002
+WXBizMsgCrypt_ComputeSignature_Error = -40003
+WXBizMsgCrypt_IllegalAesKey = -40004
+WXBizMsgCrypt_ValidateCorpid_Error = -40005
+WXBizMsgCrypt_EncryptAES_Error = -40006
+WXBizMsgCrypt_DecryptAES_Error = -40007
+WXBizMsgCrypt_IllegalBuffer = -40008
+WXBizMsgCrypt_EncodeBase64_Error = -40009
+WXBizMsgCrypt_DecodeBase64_Error = -40010
+WXBizMsgCrypt_GenReturnXml_Error = -40011
diff --git a/keyword_config.py b/keyword_config.py
new file mode 100644
index 0000000..7fb4d11
--- /dev/null
+++ b/keyword_config.py
@@ -0,0 +1,23 @@
+KEYWORD_REPLIES = {
+ 'invoice': {
+ 'keywords': ['开票', '发票'],
+ 'reply': '名 称:厦门路桥勘察设计院有限公司\n纳税人识别号:91350200MAD0EHH44J\n电 话:0592-5828192\n地 址:厦门市湖里区槟城道289号701室\n开户行及账号:中国农业银行厦门市分行营业部40379001040059157'
+ },
+ 'price': {
+ 'keywords': ['价格', '多少钱'],
+ 'reply': '请联系销售人员获取价格信息'
+ },
+ 'help': {
+ 'keywords': ['帮助', '使用说明'],
+ 'reply': '您可以咨询以下内容:\n1. 开票信息\n2. 价格查询\n3. 使用说明'
+ },
+ 'test': {
+ 'keywords': ['test', '测试'],
+ #'reply': '高空作业有以下规定:\n\n1. 人员必须正确佩戴安全帽、安全带等劳动防护用品,并经过培训持证上岗,否则不准进入作业现场。\n2. 酒后、过度疲劳或患有高血压、心脏病等疾病的人员不准进行高空作业。\n3. 临边洞口、沟槽、坑、屋面周边等边沿未设置临边防护或未采取安全防护措施的,不准进行高空作业。\n4. 未固定或无防护设施的构件及管道上不准作业或通行。\n5. 各类操作平台、载人装置未确认稳定可靠,周边未设置临边防护的,不准上人作业。\n6. 直梯、人字梯、伸缩梯防滑动措施不牢靠、架体不稳定的,不准上人作业。\n\n这些规定旨在确保高空作业的安全性,防止事故发生。'
+ 'reply': '''高空作业有以下规定
+:\n\n1. 人员必须正确佩戴安全帽、安全带等劳动防护用品,并经过培训持证上岗,否则不准进入作业现场。\n2. 酒后、过度疲劳或患有高血压、心脏病等疾病的人员不准进行高空作业。\n3. 临边洞口、沟槽、坑、屋面周边等边沿未设置临边防护或未采取安全防护措施的,不准进行高空作业。\n4. 未固定或无
+防护设施的构件及管道上不准作业或通行。\n5. 各类操作平台、载人装置未确认稳定可靠,周边未设置临边防护的,不准上人作业。\n6. 直梯、人字梯、伸缩梯防滑动措施不牢靠、架体不稳定的,不准上人作业。\n\n这些规定旨在确保高空作业的安全性,防止事故发生。'''
+
+# 删除所有空白字符(包括空格、\t、\r),但保留换行符 \n
+ },
+}
\ No newline at end of file