鼠鼠本来想做一个微信自动回复的脚本 最后发现自己还是实力太浅 之后把代码给AI 修改过 结果越修越乱 有没有大佬 可以帮忙完善一下 用的 API 是星火大语言模型 感谢各位大佬了
import re
import time
import asyncio
import json
import websockets
from uiautomation import WindowControl
from sparkai.llm.llm import ChatSparkLLM, ChunkPrintHandler
from sparkai.core.messages import ChatMessage
# 星火认知大模型的 URL 和凭证信息
SPARKAI_URL = 'wss://spark-api.xf-yun.com/v4.0/chat'
SPARKAI_APP_ID = ''
SPARKAI_API_SECRET = ''
SPARKAI_API_KEY = ''
SPARKAI_DOMAIN = '4.0Ultra'
# 初始化星火模型
spark = ChatSparkLLM(
spark_api_url=SPARKAI_URL,
spark_app_id=SPARKAI_APP_ID,
spark_api_key=SPARKAI_API_KEY,
spark_api_secret=SPARKAI_API_SECRET,
spark_llm_domain=SPARKAI_DOMAIN,
streaming=False,
)
# 存储每个用户的对话历史
conversation_history = {}
# 特定群聊的名称
target_group_name = "群聊" # 替换为目标群聊的名称
# 绑定微信主窗口
wx = WindowControl(Name='微信', ClassName="WeChatMainWndForPC")
wx.SwitchToThisWindow()
# 寻找会话控件绑定
hw = wx.ListControl(Name='会话')
we = hw.TextControl(searchDepth=4)
def request_url(nickname, message):
if nickname not in conversation_history:
conversation_history[nickname] = [
{
'role': 'system',
'content': '你现在正在通过微信与用户交流'
},
{
'role': 'system',
'content': '你叫"你爹",一个智能助手'
}
]
# 追加用户消息到对话历史
conversation_history[nickname].append({
'role': 'user',
'content': message
})
# 准备发送的消息
messages = [ChatMessage(role="user", content=message)]
handler = ChunkPrintHandler()
try:
# 调用星火模型
response = spark.generate([messages], callbacks=[handler])
print("API 响应:", response) # 调试信息
# 将AI回复追加到对话历史中
if response and hasattr(response, 'text'):
conversation_history[nickname].append({
'role': 'assistant',
'content': response.text # 提取文本内容
})
return response.text # 返回文本内容
else:
print("未收到有效的响应")
return None
except Exception as e:
print(f"调用API时出现问题: {str(e)}")
return None
def check_wechat_messages(hw):
while not we.Exists(0):
time.sleep(0.5)
bbb = hw.GetChildren()
for chatMsg in bbb:
print("检测到会话:", chatMsg.Name) # 输出所有会话的名称
# 处理私聊消息
if "条新消息" in chatMsg.Name:
match = re.match(r'(.+?)(\d+)条新消息', chatMsg.Name)
if match:
nickname = match.group(1).strip()
last_msg = chatMsg.Name # 获取最后的消息内容
print(f"来自 {nickname} 的私聊消息:", last_msg)
# 处理私聊消息
response = request_url(nickname, last_msg)
print("回复内容:", response) # 调试信息
if response:
wx.SwitchToThisWindow()
wx.SendKeys(response.replace('{br}', '{Shift}{Enter}'), waitTime=0)
wx.SendKeys('{Enter}', waitTime=0)
# 检查是否是目标群聊
if target_group_name in chatMsg.Name:
process_group_messages(chatMsg)
def process_group_messages(group_chat):
messages = group_chat.GetChildren()
for msg in messages:
print("消息内容:", msg.Name) # 输出消息内容
if "@" in msg.Name: # 检测到@我
match = re.search(r'@(.*?):', msg.Name) # 提取昵称
if match:
nickname = match.group(1).strip()
last_msg = msg.Name # 获取最后的消息内容
print(f"来自 {nickname} 的消息:", last_msg)
# 处理@后的消息内容
response = request_url(nickname, last_msg)
print("回复内容:", response) # 调试信息
if response:
wx.SwitchToThisWindow()
wx.SendKeys(response.replace('{br}', '{Shift}{Enter}'), waitTime=0)
wx.SendKeys('{Enter}', waitTime=0)
if __name__ == "__main__":
i = 1
try:
while True:
print(f"{i}.检测中,等待消息...")
check_wechat_messages(hw=hw)
time.sleep(1)
i += 1
except KeyboardInterrupt:
print("程序退出~")
except Exception as e:
print(f"程序执行出现了问题: {str(e)}")
import re
import time
import asyncio
import json
import websockets
from uiautomation import WindowControl
from sparkai.llm.llm import ChatSparkLLM, ChunkPrintHandler
from sparkai.core.messages import ChatMessage
# 星火认知大模型的 URL 和凭证信息
SPARKAI_URL = 'wss://spark-api.xf-yun.com/v4.0/chat'
SPARKAI_APP_ID = ''
SPARKAI_API_SECRET = ''
SPARKAI_API_KEY = ''
SPARKAI_DOMAIN = '4.0Ultra'
# 初始化星火模型
spark = ChatSparkLLM(
spark_api_url=SPARKAI_URL,
spark_app_id=SPARKAI_APP_ID,
spark_api_key=SPARKAI_API_KEY,
spark_api_secret=SPARKAI_API_SECRET,
spark_llm_domain=SPARKAI_DOMAIN,
streaming=False,
)
# 存储每个用户的对话历史
conversation_history = {}
# 特定群聊的名称
target_group_name = "群聊" # 替换为目标群聊的名称
# 绑定微信主窗口
wx = WindowControl(Name='微信', ClassName="WeChatMainWndForPC")
wx.SwitchToThisWindow()
# 寻找会话控件绑定
hw = wx.ListControl(Name='会话')
we = hw.TextControl(searchDepth=4)
def request_url(nickname, message):
if nickname not in conversation_history:
conversation_history[nickname] = [
{
'role': 'system',
'content': '你现在正在通过微信与用户交流'
},
{
'role': 'system',
'content': '你叫"你爹",一个智能助手'
}
]
# 追加用户消息到对话历史
conversation_history[nickname].append({
'role': 'user',
'content': message
})
# 准备发送的消息
messages = [ChatMessage(role="user", content=message)]
handler = ChunkPrintHandler()
try:
# 调用星火模型
response = spark.generate([messages], callbacks=[handler])
print("API 响应:", response) # 调试信息
# 将AI回复追加到对话历史中
if response and hasattr(response, 'text'):
conversation_history[nickname].append({
'role': 'assistant',
'content': response.text # 提取文本内容
})
return response.text # 返回文本内容
else:
print("未收到有效的响应")
return None
except Exception as e:
print(f"调用API时出现问题: {str(e)}")
return None
def check_wechat_messages(hw):
while not we.Exists(0):
time.sleep(0.5)
bbb = hw.GetChildren()
for chatMsg in bbb:
print("检测到会话:", chatMsg.Name) # 输出所有会话的名称
# 处理私聊消息
if "条新消息" in chatMsg.Name:
match = re.match(r'(.+?)(\d+)条新消息', chatMsg.Name)
if match:
nickname = match.group(1).strip()
last_msg = chatMsg.Name # 获取最后的消息内容
print(f"来自 {nickname} 的私聊消息:", last_msg)
# 处理私聊消息
response = request_url(nickname, last_msg)
print("回复内容:", response) # 调试信息
if response:
wx.SwitchToThisWindow()
wx.SendKeys(response.replace('{br}', '{Shift}{Enter}'), waitTime=0)
wx.SendKeys('{Enter}', waitTime=0)
# 检查是否是目标群聊
if target_group_name in chatMsg.Name:
process_group_messages(chatMsg)
def process_group_messages(group_chat):
messages = group_chat.GetChildren()
for msg in messages:
print("消息内容:", msg.Name) # 输出消息内容
if "@" in msg.Name: # 检测到@我
match = re.search(r'@(.*?):', msg.Name) # 提取昵称
if match:
nickname = match.group(1).strip()
last_msg = msg.Name # 获取最后的消息内容
print(f"来自 {nickname} 的消息:", last_msg)
# 处理@后的消息内容
response = request_url(nickname, last_msg)
print("回复内容:", response) # 调试信息
if response:
wx.SwitchToThisWindow()
wx.SendKeys(response.replace('{br}', '{Shift}{Enter}'), waitTime=0)
wx.SendKeys('{Enter}', waitTime=0)
if __name__ == "__main__":
i = 1
try:
while True:
print(f"{i}.检测中,等待消息...")
check_wechat_messages(hw=hw)
time.sleep(1)
i += 1
except KeyboardInterrupt:
print("程序退出~")
except Exception as e:
print(f"程序执行出现了问题: {str(e)}")

