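WeChat Message Intelligent Processing System: Building an LLM Automation Workflow from 0 to 1
This article presents an intelligent WeChat message processing system that automatically captures WeChat group messages and uses large language models to extract structured information. A three-level caching mechanism protects against data loss, a nearest-in-time matching algorithm associates image messages with text messages, and images are uploaded automatically to Alibaba Cloud OSS. The back end runs a two-stage AI pipeline: it first checks whether each image shows a vehicle or a vehicle license (行驶证), then extracts structured customer leads. The system provides message filtering, resume-after-interruption, and exception recovery, is designed to run 24/7, and improves the efficiency of handling customer leads. The main technology stack includes Py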
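Preface
In modern business operations, WeChat groups have become an important channel for customer communication. With a large volume of group messages, efficiently extracting valuable information, processing multimedia content, and converting it into structured data is key to operational efficiency. This article describes a complete WeChat message processing system, covering the full path from message capture to AI processing.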
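1. System Architecture Overview
The system uses a decoupled front-end/back-end architecture. The overall flow is:
WeChat message capture → local cache processing → image upload to OSS → LLM recognition → structured data push
Core components:
- Front end: Python automation script (based on wxautox)
- Storage layer: local TXT files + Alibaba Cloud OSS
- AI processing layer: Coze LLM API (image recognition + information structuring)
- Back-end service: Java Spring Boot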
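2. Automated WeChat Message Capture
2.1 Message Capture Framework
Built on the wxautox library, the script automates the WeChat desktop client. The core code combines multithreading with exception handling for stability; client initialization, for example, is retried on failure: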
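import time

from wxautox import WeChat


def init_wechat(max_retries=10, retry_interval=5):
    """Initialize the WeChat client, retrying on failure."""
    for retry in range(max_retries):
        try:
            print(f"Initializing WeChat client (attempt {retry + 1})...")
            wx = WeChat()
            print("WeChat client initialized")
            return wx
        except Exception as e:
            print(f"WeChat client initialization failed (attempt {retry + 1}): {str(e)}")
            if retry < max_retries - 1:
                time.sleep(retry_interval)
            else:
                # Out of retries: re-raise the last error to the caller
                raise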
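The retry loop above waits a fixed interval between attempts. As a variation, here is a minimal sketch of a generic retry helper with exponential backoff. The names `retry`, `base_delay`, `max_delay`, and the injectable `sleep` parameter are hypothetical and are not part of the original script or of wxautox.

```python
import time


def retry(func, max_retries=5, base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Call func() until it succeeds or max_retries attempts have failed.

    Hypothetical helper: the delay doubles after every failure, capped at max_delay.
    """
    for attempt in range(max_retries):
        try:
            return func()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            # Delays are base, 2*base, 4*base, ... up to max_delay
            sleep(min(base_delay * (2 ** attempt), max_delay))


# Demonstration with a function that fails twice before succeeding
attempts = []


def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("client not ready")
    return "ok"


recorded_delays = []
outcome = retry(flaky, sleep=recorded_delays.append)
```

Passing `sleep` as a parameter keeps the helper testable without real waiting. Under this assumption it could wrap the client constructor as `retry(WeChat)`.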
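2.2 Message Filtering Strategy
To focus on messages with business value, the capture step filters by message type:
# Configuration parameters
save_pic = True     # save images
save_video = False  # do not save videos
save_file = False   # do not save files
save_voice = False  # do not save voice messages
parse_url = False   # do not parse URLs
Design rationale: only potential customer leads that contain vehicle images and contact details matter, so unrelated content is filtered out to reduce the processing load.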
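3. Data Caching and Persistence
3.1 In-Memory + File Dual Caching Strategy
The system uses a three-level caching mechanism, built from an in-memory list plus backup and log files, so that data is not lost:
import threading

# In-memory cache
user_data_list = []             # temporary message storage
list_lock = threading.Lock()    # lock for thread-safe access

# File backups
BACKUP_FILE = 'z_back.txt'      # real-time backup
RECORD_FILE = 'z_record.txt'    # operation log
ERROR_LOG_FILE = 'z_error.log'  # error log

# Batch-processing threshold
BATCH_SIZE = 100                # processing is triggered at 100 messages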
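3.2 Intelligent Recovery Mechanism
On startup the system checks the backup file and resumes from where it stopped:
import json
import os


def load_backup_data():
    """Load data from the backup file."""
    if os.path.exists(BACKUP_FILE):
        try:
            with open(BACKUP_FILE, 'r', encoding='utf-8') as f:
                data = f.read().strip()
                if data:
                    return json.loads(data)
        except Exception as e:
            print(f"Failed to load the backup file: {str(e)}")
    # Missing, empty, or unreadable backup: start with an empty list
    return []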
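Recovery only helps if the backup file is intact when the process dies. The sketch below pairs the loader with an atomic writer, so that a crash in the middle of a write cannot leave a half-written file. The names `save_backup_atomic` and `load_backup` are hypothetical, and writing to a temporary file before swapping it in is a technique added here for illustration, not something the original script does.

```python
import json
import os
import tempfile


def save_backup_atomic(data, path):
    """Write data as JSON to path without ever exposing a partial file."""
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temporary file in the same directory first
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False)
            f.flush()
            os.fsync(f.fileno())
        # os.replace swaps the file in one step: readers see old or new content
        os.replace(tmp_path, path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_backup(path):
    """Return the saved list, or [] if the file is missing, empty, or corrupt."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            text = f.read().strip()
        return json.loads(text) if text else []
    except (OSError, ValueError):
        return []
```

Rewriting the whole file on every message, as the original does, stays simple at a 100-message batch size. The atomic swap only removes the window in which the file is truncated but not yet rewritten.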
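4. Image Processing Flow
4.1 Image Message Integration Algorithm
Because images and text in a WeChat group are often sent as separate messages, we designed a nearest-in-time matching algorithm: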
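def integrate_messages(data):
    """Attach each image message to the nearest text message."""
    image_msgs = []
    text_msgs = []
    # Separate image messages from text messages
    # (an image message's content is the saved .png path)
    for msg in data:
        if '.png' in msg['pythonInfo']:
            image_msgs.append(msg)
        else:
            text_msgs.append(msg)
    # Time matching: find the nearest text message for every image
    for img_msg in image_msgs:
        min_diff = float('inf')
        closest_text_msg = None
        for text_msg in text_msgs:
            # Only match messages from the same chat and the same sender
            if text_msg['who'] == img_msg['who'] and text_msg['sender'] == img_msg['sender']:
                time_diff = abs(text_msg['time'] - img_msg['time'])
                if time_diff < min_diff:
                    min_diff = time_diff
                    closest_text_msg = text_msg
        # Attach the image path to the text message
        if closest_text_msg:
            if 'imgRul' not in closest_text_msg:
                closest_text_msg['imgRul'] = img_msg['pythonInfo']
            else:
                closest_text_msg['imgRul'] += ',' + img_msg['pythonInfo']
    # Return the text messages, now carrying their image paths, in time order
    return sorted(text_msgs, key=lambda x: x['time'])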
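To make the matching rule concrete, here is a small self-contained sketch run on sample data. It is a variation rather than the original function: the `attach_images` name, the `max_gap` window, and the `images` list field are assumptions added for illustration. The window keeps an image from being attached to a text message sent long before or after it.

```python
def attach_images(messages, max_gap=120.0):
    """Attach each image path to the nearest text message from the same sender.

    Hypothetical variant: images farther than max_gap seconds from every
    candidate text message are left unattached.
    """
    images = [m for m in messages if m["pythonInfo"].endswith(".png")]
    # Copy the text messages so the caller's dictionaries are not modified
    texts = [dict(m) for m in messages if not m["pythonInfo"].endswith(".png")]
    for img in images:
        candidates = [
            t for t in texts
            if t["who"] == img["who"]
            and t["sender"] == img["sender"]
            and abs(t["time"] - img["time"]) <= max_gap
        ]
        if not candidates:
            continue  # no text close enough in time
        nearest = min(candidates, key=lambda t: abs(t["time"] - img["time"]))
        nearest.setdefault("images", []).append(img["pythonInfo"])
    return sorted(texts, key=lambda t: t["time"])


sample = [
    {"who": "group-a", "sender": "Li", "pythonInfo": "13800000000 selling a car", "time": 100.0},
    {"who": "group-a", "sender": "Li", "pythonInfo": "C:/pics/1.png", "time": 103.0},
    {"who": "group-a", "sender": "Wang", "pythonInfo": "hello", "time": 104.0},
    {"who": "group-a", "sender": "Li", "pythonInfo": "C:/pics/2.png", "time": 900.0},
]
result = attach_images(sample)
```

In this sample the first image lands on Li's text message three seconds earlier, Wang's message receives nothing, and the second image is skipped because it arrives 800 seconds after the only candidate.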
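4.2 Automated Upload to Cloud Storage
The matched images are uploaded to Alibaba Cloud OSS, yielding a stable URL for each image (reachable without a signature only if the bucket allows public reads):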
from datetime import datetime
import uuid

import oss2


def upload_to_oss(image_path):
    """Upload an image to Alibaba Cloud OSS."""
    auth = oss2.Auth('YOUR_ACCESS_KEY', 'YOUR_SECRET_KEY')
    bucket = oss2.Bucket(auth, 'your-endpoint', 'chelaike')
    # Build a unique file name: timestamp + UUID prefix
    unique_filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_{str(uuid.uuid4())[:8]}.png"
    oss_path = f"wx_clue_pic/{unique_filename}"
    # Upload and return the URL
    bucket.put_object_from_file(oss_path, image_path)
    return f"https://chelaike.oss-cn-beijing.aliyuncs.com/{oss_path}"
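The parts of the upload step that do not touch the network can be separated out and checked on their own. The sketch below shows one way to build the object key and the resulting URL, and to read credentials from the environment instead of the source file. The function names and the environment variable names `OSS_ACCESS_KEY_ID` and `OSS_ACCESS_KEY_SECRET` are assumptions for illustration, not part of the original script.

```python
import os
import uuid
from datetime import datetime


def build_object_key(image_path, now=None, prefix="wx_clue_pic"):
    """Return a unique object key that keeps the source file's extension."""
    now = now or datetime.now()
    ext = os.path.splitext(image_path)[1].lower() or ".png"
    return f"{prefix}/{now.strftime('%Y%m%d%H%M%S')}_{uuid.uuid4().hex[:8]}{ext}"


def public_url(bucket_name, endpoint, object_key):
    """Build the bucket-as-subdomain URL for an object."""
    host = endpoint.replace("https://", "").replace("http://", "").rstrip("/")
    return f"https://{bucket_name}.{host}/{object_key}"


def read_credentials(env=os.environ):
    """Read the access key pair from environment variables (assumed names)."""
    return env["OSS_ACCESS_KEY_ID"], env["OSS_ACCESS_KEY_SECRET"]


key = build_object_key("C:/pics/car.PNG", now=datetime(2025, 5, 19, 14, 36, 50))
url = public_url("chelaike", "https://oss-cn-beijing.aliyuncs.com", key)
```

With these helpers the upload itself reduces to creating the `oss2.Auth` object from `read_credentials()` and calling `bucket.put_object_from_file(key, image_path)`, as in the function above.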
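5. LLM-Based Review System
5.1 Two-Stage AI Processing Flow
The system runs two dedicated models in series:
Stage 1: image review model → decides whether the image shows a vehicle or a vehicle license
Stage 2: information structuring model → extracts structured customer leads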
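The control flow of the two stages can be sketched independently of any model API. In the sketch below the two model calls are passed in as plain functions. `process_lead`, `fake_review`, and `fake_extract` are hypothetical stand-ins, and the prompt format is illustrative only.

```python
def process_lead(text, image_urls, review_image, extract_lead):
    """Run stage 1 on every image, then stage 2 only if some image passed."""
    approved = [url for url in image_urls if review_image(url)]
    if not approved:
        return None  # nothing passed review, so stage 2 is skipped
    prompt = f"{text}\nImage links: {','.join(approved)}"
    return extract_lead(prompt)


# Stand-ins for the two model calls, for demonstration only
def fake_review(url):
    return "car" in url


def fake_extract(prompt):
    return {"raw": prompt}


lead = process_lead(
    "Li: 13800000000",
    ["https://x/car1.png", "https://x/cat.png"],
    fake_review,
    fake_extract,
)
skipped = process_lead("Li: 13800000000", ["https://x/cat.png"], fake_review, fake_extract)
```

Running the cheaper image check first means the structuring model is called only for messages that still carry at least one approved image.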
5.2 Java Back-End Processing
@PostMapping("/insert_user")
public Res insert_user(@RequestBody User user) throws Exception {
    // 1. Concatenate the basic information (prompt text: "Group name: ... Sender: ...")
    String originPythonInfo = "群名称:" + user.getWho() + "。发送者:" + user.getSender();
    // 2. Review each image (vehicle recognition model)
    if (user.getImgRul() != null) {
        String[] imgUrlArray = user.getImgRul().split(",");
        StringBuilder tempTotalUrl = new StringBuilder();
        for (String tempImgUrl : imgUrlArray) {
            // Call the image review model
            String imageModelRes = SmartEntity_3.SmartWork(
                    UUID.randomUUID().toString(),
                    tempImgUrl
            );
            // Parse the model's response
            ImageModelResJson resJson = gson.fromJson(imageModelRes, ImageModelResJson.class);
            // Keep only images that passed review ("是" means "yes")
            if ("是".equals(resJson.aCarOrADrivingLicense)) {
                tempTotalUrl.append(tempImgUrl).append(",");
            }
        }
        // 3. Structure the information (Doubao agent)
        if (tempTotalUrl.length() > 0) {
            originPythonInfo += "。图片链接:" + tempTotalUrl;
            String uploadInfo = SmartEntity_1.SmartWork(
                    UUID.randomUUID().toString(),
                    originPythonInfo
            );
            // 4. Push to the business system
            // (url is the push endpoint, read from configuration in the full source)
            String res = HttpUpload.sendPostRequest(url, uploadInfo);
        }
    }
    return Res.success();
}
5.3 Optimizing LLM API Calls
// Shared HTTP client configuration
OkHttpClient client = new OkHttpClient.Builder()
        .connectTimeout(60, TimeUnit.SECONDS)
        .readTimeout(60, TimeUnit.SECONDS)
        .writeTimeout(60, TimeUnit.SECONDS)
        .callTimeout(60, TimeUnit.SECONDS)
        .build();

// Parse the streaming response
public static String parseResponseBody(String responseBody) {
    String[] lines = responseBody.split("\n");
    StringBuilder resultBuilder = new StringBuilder();
    for (String line : lines) {
        line = line.trim();
        if (line.startsWith("data:")) {
            String dataLine = line.substring(5).trim();
            try {
                JSONObject jsonData = new JSONObject(dataLine);
                if ("answer".equals(jsonData.optString("type"))) {
                    String content = jsonData.getString("content");
                    resultBuilder.append(unescapeJsonString(content));
                }
            } catch (Exception e) {
                // Fallback: match the content field with a regular expression
                Pattern pattern = Pattern.compile("\"content\":\"(.*?)\"", Pattern.DOTALL);
                Matcher matcher = pattern.matcher(dataLine);
                if (matcher.find()) {
                    resultBuilder.append(unescapeJsonString(matcher.group(1)));
                }
            }
        }
    }
    return resultBuilder.toString();
}
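The same parsing idea can be sketched in Python. The sketch assumes the response body is a server-sent-event stream of `event:` and `data:` lines separated by blank lines, and, like the full Java source, keeps only `answer` messages from `conversation.message.completed` events. The function name `parse_sse_answers` and the sample payload are illustrative and are not captured API output.

```python
import json


def parse_sse_answers(body):
    """Collect the content of completed 'answer' messages from an SSE body."""
    answers = []
    event, data_lines = "", []

    def flush():
        # Only completed messages carry the full answer text
        if event == "conversation.message.completed" and data_lines:
            try:
                payload = json.loads("\n".join(data_lines))
            except ValueError:
                return  # skip malformed payloads instead of guessing
            if isinstance(payload, dict) and payload.get("type") == "answer":
                answers.append(payload.get("content", ""))

    for raw in body.splitlines() + [""]:  # the trailing "" flushes the last event
        line = raw.strip()
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        elif not line:
            flush()
            event, data_lines = "", []
    return "".join(answers)


sample_body = (
    'event:conversation.message.delta\n'
    'data:{"type":"answer","content":"par"}\n'
    '\n'
    'event:conversation.message.completed\n'
    'data:{"type":"answer","content":"{\\"phone\\":\\"13800000000\\"}"}\n'
    '\n'
    'event:conversation.message.completed\n'
    'data:{"type":"verbose","content":"ignored"}\n'
)
parsed = parse_sse_answers(sample_body)
```

Because `json.loads` already decodes escape sequences, the sketch needs no separate unescape step. It skips a payload that fails to parse rather than recovering it with a regular expression.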
6. Business Rules and Submission Conditions
The system applies rule-based checks so that only valuable leads are pushed:
import re


def contains_phone_number(text):
    """Return True if the text contains a mainland China mobile number."""
    pattern = r'(?<!\d)(1[3-9]\d{9})(?!\d)'
    return bool(re.search(pattern, text))


# Submission decision
has_phone = contains_phone_number(info) or contains_phone_number(sender)
has_valid_img = img_url and "https://chelaike.oss-cn-beijing.aliyuncs.com/wx_clue_pic" in img_url
if has_phone and has_valid_img:
    # Complete lead: phone number + valid image, submit immediately
    executor.submit(send_http_request, user_data)
elif has_phone and time_diff > 300:
    # Timed-out lead: has a phone number but has waited more than 5 minutes, submit the text only
    executor.submit(send_http_request, user_data)
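The rules can be condensed into one pure function that returns the action for a message, which makes the three outcomes explicit. This is a sketch: `decide`, `PHONE_RE`, `OSS_PREFIX`, and `WAIT_SECONDS` are names introduced here. The `wait` and `drop` outcomes follow the full source, where a message with a phone number but no image is kept for the next batch and a message without a phone number is discarded.

```python
import re

PHONE_RE = re.compile(r"(?<!\d)1[3-9]\d{9}(?!\d)")
OSS_PREFIX = "https://chelaike.oss-cn-beijing.aliyuncs.com/wx_clue_pic"
WAIT_SECONDS = 300


def decide(info, sender, img_url, age_seconds):
    """Return 'submit', 'wait', or 'drop' for one integrated message."""
    has_phone = bool(PHONE_RE.search(info or "") or PHONE_RE.search(sender or ""))
    has_valid_img = bool(img_url) and OSS_PREFIX in img_url
    if not has_phone:
        return "drop"  # no contact number: never submitted
    if has_valid_img or age_seconds > WAIT_SECONDS:
        return "submit"
    return "wait"  # keep for the next batch in case an image arrives


examples = [
    decide("call 13812345678", "Li", OSS_PREFIX + "/a.png", 10),
    decide("call 13812345678", "Li", None, 10),
    decide("call 13812345678", "Li", None, 301),
    decide("hello", "Li", OSS_PREFIX + "/a.png", 999),
    decide("order 1138123456789", "Li", None, 999),
]
```

The last example shows why the pattern uses lookarounds: an 11-digit run inside a longer number is not treated as a phone number.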
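7. System Optimization and Monitoring
7.1 Exception Handling
def write_error_log(error_msg):
    """Write an error entry, with traceback, to the shared error log."""
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"[{timestamp}] {error_msg}\n{traceback.format_exc()}\n{'='*80}\n"
    # file_lock serializes writes from the worker threads
    with file_lock:
        with open(ERROR_LOG_FILE, 'a', encoding='utf-8') as f:
            f.write(log_entry)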
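7.2 Performance Optimization Strategies
- Thread pool management: a fixed-size thread pool avoids resource exhaustion
- Batch processing: messages are processed in batches of 100 to reduce network overhead
- Connection reuse: an HTTP connection pool makes API calls more efficient
- Memory control: processed data is cleared periodically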
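7.3 Monitoring Metrics
# Key monitoring points
- Message processing rate (messages/minute)
- Image upload success rate
- LLM API response time
- Lead conversion rate (valid leads / total messages)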
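The article lists these metrics without showing how they are collected. As one possible approach, the sketch below keeps the four values in process with simple counters. The `PipelineMetrics` class and all of its method names are hypothetical and are not part of the original system.

```python
import time
from collections import deque


class PipelineMetrics:
    """In-process counters for the four monitoring points."""

    def __init__(self, clock=time.time):
        self._clock = clock
        self._message_times = deque()
        self._api_seconds = []
        self.total_messages = 0
        self.leads = 0
        self.uploads_ok = 0
        self.uploads_failed = 0

    def record_message(self):
        self.total_messages += 1
        self._message_times.append(self._clock())

    def record_upload(self, ok):
        if ok:
            self.uploads_ok += 1
        else:
            self.uploads_failed += 1

    def record_api_call(self, seconds):
        self._api_seconds.append(seconds)

    def record_lead(self):
        self.leads += 1

    def snapshot(self):
        now = self._clock()
        # Keep only the timestamps from the last 60 seconds
        while self._message_times and now - self._message_times[0] > 60:
            self._message_times.popleft()
        uploads = self.uploads_ok + self.uploads_failed
        calls = len(self._api_seconds)
        return {
            "messages_per_minute": len(self._message_times),
            "upload_success_rate": self.uploads_ok / uploads if uploads else None,
            "avg_api_seconds": sum(self._api_seconds) / calls if calls else None,
            "lead_conversion_rate": self.leads / self.total_messages if self.total_messages else None,
        }
```

A snapshot could be written to the record file at a fixed interval. A long-running process would also need to bound or reset the API timing list.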
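8. Deployment and Operations
8.1 Environment Requirements
- Python 3.8+ and Java 11+
- Windows (required to run the WeChat desktop client)
- A stable network connection
- An Alibaba Cloud OSS bucket
8.2 Configuration Files
coze.txt - LLM API key
oss_config.json - cloud storage configuration
filter_rules.json - filter rule configuration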
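9. Summary and Outlook
The system automates the collection, processing, and structured output of WeChat messages. Its main strengths are:
- Stable and reliable: three-level caching plus exception recovery, designed for 24/7 operation
- Intelligent recognition: two models work together to extract information accurately
- Efficient processing: batch processing plus asynchronous calls raise throughput
- Easy to extend: the modular design supports fast iteration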
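10. Source Code
Personal WeChat message capture, automatic LLM-based structuring, and intelligent image review, built from 0 to 1
1. Temporary storage and recovery of captured messages: captured data is written to a TXT file. Once 100 messages have accumulated they are processed and cleared; until then they stay in the file, so no data is lost.
2. The functions that run after connecting to WeChat: the script first logs in automatically, then enables automatic image saving and filters out files and videos.
3. Messages are checked for images. Messages with images are merged and deduplicated, each image is attached to the message nearest in time, and the images are then uploaded to Alibaba Cloud.
4. Finally, messages that meet the conditions are sent to the Java back end.
5. The Java back end first concatenates the incoming information, sends each image to the image model to decide whether it is a valid vehicle image, and finally submits the result to the Doubao agent for structuring.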
# -*- coding: utf-8 -*-
import json
import re
import time
import concurrent.futures
import threading
import os
import traceback
import requests
from wxautox import WeChat
import a_step1
import a_step2

# Global list of captured messages
user_data_list = []
# Lock for thread-safe list operations
list_lock = threading.Lock()
# Thread pool executor
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
# Lock for thread-safe file writes
file_lock = threading.Lock()
# Backup file name
BACKUP_FILE = 'z_back.txt'
# Record file name
RECORD_FILE = 'z_record.txt'
# Error log file
ERROR_LOG_FILE = 'z_error.log'


def write_error_log(error_msg):
    """Write an entry to the error log."""
    try:
        with file_lock:
            timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
            with open(ERROR_LOG_FILE, 'a', encoding='utf-8') as f:
                f.write(f"[{timestamp}] {error_msg}\n")
                f.write(traceback.format_exc())
                f.write("\n" + "=" * 80 + "\n")
    except Exception as e:
        print(f"Failed to write the error log: {str(e)}")


def send_http_request(user_data):
    """Send one message to the Java back end (runs in the thread pool)."""
    url = "http://localhost:8080/big/insert_user"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    try:
        response = requests.post(
            url,
            data=json.dumps(user_data),
            headers=headers,
            timeout=3600
        )
        if response.status_code == 200:
            msg = "Request succeeded!"
        else:
            msg = f"Request failed, status code: {response.status_code}"
        print(msg)
    except Exception as e:
        error_msg = f"Error while sending the request: {str(e)}"
        print(error_msg)
        write_error_log(error_msg)


def write_to_record(content):
    """Append a line to the record file."""
    try:
        with file_lock:
            timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
            with open(RECORD_FILE, 'a', encoding='utf-8') as f:
                f.write(f"[{timestamp}] {content}\n")
    except Exception as e:
        print(f"Failed to write the record file: {str(e)}")


def load_backup_data():
    """Load data from the backup file."""
    if os.path.exists(BACKUP_FILE):
        try:
            with open(BACKUP_FILE, 'r', encoding='utf-8') as f:
                data = f.read().strip()
                if data:
                    return json.loads(data)
        except Exception as e:
            print(f"Failed to load the backup file: {str(e)}")
            write_error_log(f"Failed to load the backup file: {str(e)}")
    return []


def save_backup_data(data):
    """Save data to the backup file."""
    try:
        with file_lock:
            with open(BACKUP_FILE, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"Failed to save the backup file: {str(e)}")
        write_error_log(f"Failed to save the backup file: {str(e)}")


def contains_phone_number(text):
    """Return True if the string contains a mainland China mobile number."""
    if not text:
        return False
    pattern = r'(?<!\d)(1[3-9]\d{9})(?!\d)'
    return bool(re.search(pattern, text))


def process_message(who, group_nickname, content):
    """Process a single message."""
    try:
        user_data = {
            "who": who,
            "sender": group_nickname,
            "pythonInfo": content,
            "time": time.time()
        }
        # Store the raw message
        with list_lock:
            # "以下是新消息" is the "new messages below" separator shown by WeChat, not a real message
            if user_data["pythonInfo"] != "以下是新消息":
                user_data_list.append(user_data)
                print(f"Current message count: {len(user_data_list)}")
                save_backup_data(user_data_list)
            if len(user_data_list) >= 100:
                print("Processing message batch..." + "=" * 80)
                # Attach images to their nearest text messages
                finalDataList = a_step1.integrate_messages(user_data_list)
                for item in finalDataList:
                    if "imgRul" in item and item["imgRul"]:
                        png_list = [png.strip() for png in item["imgRul"].split(",") if png.strip().endswith(".png")]
                        temp_str = ""
                        for temp in png_list:
                            try:
                                temp_res = a_step2.upload_to_oss(temp.replace("\\", "/"))
                                # upload_to_oss reports failure in its return value, so check it
                                # instead of appending the string "None" as a URL
                                if temp_res['status'] != 'success':
                                    raise RuntimeError(temp_res['message'])
                                temp_str = temp_str + str(temp_res['url']) + ","
                            except Exception as e:
                                print(f"Image upload failed: {str(e)}")
                                write_error_log(f"Image upload failed: {str(e)}")
                        item["imgRul"] = temp_str
                    else:
                        item["imgRul"] = None
                remaining_items = []
                for item in finalDataList:
                    user_data = {
                        "who": item['who'],
                        "sender": item['sender'],
                        "pythonInfo": item['pythonInfo'],
                        "imgRul": item['imgRul'],
                        "time": item['time']
                    }
                    # Check the submission conditions
                    has_phone = contains_phone_number(item['pythonInfo']) or contains_phone_number(item['sender'])
                    has_valid_img = item['imgRul'] and "https://chelaike.oss-cn-beijing.aliyuncs.com/wx_clue_pic" in \
                                    item['imgRul']
                    if has_phone and has_valid_img:
                        print("Submitting a qualifying message to the server")
                        executor.submit(send_http_request, user_data)
                    else:
                        current_time = time.time()
                        time_diff = current_time - user_data["time"]
                        if has_phone and time_diff > 300:
                            print("More than 5 minutes old, submitting the text message")
                            executor.submit(send_http_request, user_data)
                        elif has_phone:
                            # Keep for the next batch in case an image arrives
                            remaining_items.append(item)
                # After processing, reset the list and update the backup file
                user_data_list.clear()
                user_data_list.extend(remaining_items)
                save_backup_data(user_data_list)
    except Exception as e:
        print(f"Error while processing the message: {str(e)}")
        write_error_log(f"Error while processing the message: {str(e)}")


def init_wechat(max_retries=10, retry_interval=5):
    """Initialize the WeChat client, retrying on failure."""
    for retry in range(max_retries):
        try:
            print(f"Initializing WeChat client (attempt {retry + 1})...")
            wx = WeChat()
            print("WeChat client initialized")
            return wx
        except Exception as e:
            print(f"WeChat client initialization failed (attempt {retry + 1}): {str(e)}")
            write_error_log(f"WeChat client initialization failed: {str(e)}")
            if retry < max_retries - 1:
                time.sleep(retry_interval)
            else:
                raise
    return None


def main_loop():
    """Main loop with error recovery."""
    # Load backed-up data
    backup_data = load_backup_data()
    if backup_data:
        with list_lock:
            user_data_list.extend(backup_data)
        print(f"Restored {len(backup_data)} messages from the backup file")
        write_to_record(f"Restored {len(backup_data)} messages from the backup file")
    wx = None
    wait = 0.01
    save_pic = True
    save_video = False
    save_file = False
    save_voice = False
    parse_url = False
    consecutive_errors = 0
    max_consecutive_errors = 10
    while True:
        try:
            if wx is None:
                wx = init_wechat()
                consecutive_errors = 0  # reset the error counter
            msgs = wx.GetNextNewMessage(save_pic, save_video, save_file, save_voice, parse_url)
            if len(msgs) > 0:
                # msgs is treated as a mapping from chat name to its new messages.
                # Iterating the pairs directly keeps each message tied to its own chat;
                # the earlier separate loop over the keys left who set to the last chat only.
                for who, one_msgs in msgs.items():
                    for msg in one_msgs:
                        content = msg.content
                        try:
                            group_nickname = msg.sender_remark
                        except AttributeError:
                            group_nickname = "车来客"
                        # Process the message
                        process_message(who, group_nickname, content)
            time.sleep(wait)
            consecutive_errors = 0  # reset after a successful iteration
        except LookupError as e:
            consecutive_errors += 1
            error_msg = f"Control lookup failed ({consecutive_errors} consecutive errors): {str(e)}"
            print(error_msg)
            write_error_log(error_msg)
            if consecutive_errors >= max_consecutive_errors:
                print("Too many consecutive errors, reinitializing the WeChat client...")
                wx = None  # reset the WeChat client
                time.sleep(10)  # wait longer before retrying
            else:
                time.sleep(1)  # short pause before retrying
        except KeyboardInterrupt:
            print("Interrupted by the user")
            write_to_record("Interrupted by the user")
            break
        except Exception as e:
            consecutive_errors += 1
            error_msg = f"Unexpected error ({consecutive_errors} consecutive errors): {str(e)}"
            print(error_msg)
            write_error_log(error_msg)
            if consecutive_errors >= max_consecutive_errors:
                print("Too many consecutive errors, restarting the WeChat client...")
                wx = None
                time.sleep(10)
            else:
                time.sleep(1)


def cleanup():
    """Clean up before exit."""
    print("Running cleanup...")
    # Save the current data
    save_backup_data(user_data_list)
    # Shut down the thread pool
    executor.shutdown(wait=True)
    print("Cleanup finished, exiting")


if __name__ == '__main__':
    try:
        main_loop()
    finally:
        cleanup()
# a_step1.py (imported above as a_step1)
import json


def integrate_messages(data):
    # Separate image messages from text messages
    image_msgs = []
    text_msgs = []
    for msg in data:
        if '.png' in msg['pythonInfo']:
            image_msgs.append(msg)
        else:
            text_msgs.append(msg)
    # Sort the text messages by time
    text_msgs_sorted = sorted(text_msgs, key=lambda x: x['time'])
    # For every image message, find the text message nearest in time
    for img_msg in image_msgs:
        min_diff = float('inf')
        closest_text_msg = None
        for text_msg in text_msgs_sorted:
            # who and sender must both match
            if text_msg['who'] == img_msg['who'] and text_msg['sender'] == img_msg['sender']:
                time_diff = abs(text_msg['time'] - img_msg['time'])
                if time_diff < min_diff:
                    min_diff = time_diff
                    closest_text_msg = text_msg
        # Add the image path to the nearest text message
        if closest_text_msg:
            if 'imgRul' not in closest_text_msg:
                closest_text_msg['imgRul'] = img_msg['pythonInfo']
            else:
                # Already has an image path: append, separated by a comma
                closest_text_msg['imgRul'] = (closest_text_msg['imgRul'] or '') + ',' + img_msg['pythonInfo']
    # Return the text messages (now carrying image paths) sorted by time
    return sorted(text_msgs_sorted, key=lambda x: x['time'])


if __name__ == '__main__':
    # Example usage
    user_data_list = []
    finalDataList = integrate_messages(user_data_list)
# a_step2.py (imported above as a_step2)
import os
import oss2
from datetime import datetime
import uuid


def upload_to_oss(image_path):
    # Alibaba Cloud OSS configuration.
    # Credentials are read from environment variables instead of being hard-coded;
    # the variable names are an assumption, adjust them to your deployment.
    auth = oss2.Auth(os.environ['OSS_ACCESS_KEY_ID'], os.environ['OSS_ACCESS_KEY_SECRET'])
    bucket_name = 'chelaike'
    endpoint = 'https://oss-cn-beijing.aliyuncs.com'  # change to match your bucket's region
    # Create the Bucket instance
    bucket = oss2.Bucket(auth, endpoint, bucket_name)
    # Build a unique file name
    file_extension = image_path.split('.')[-1]
    unique_filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_{str(uuid.uuid4())[:8]}.{file_extension}"
    oss_path = f"wx_clue_pic/{unique_filename}"
    try:
        # Upload the file
        bucket.put_object_from_file(oss_path, image_path)
        # Build the access URL
        url = f"https://{bucket_name}.{endpoint.replace('https://', '')}/{oss_path}"
        return {
            'status': 'success',
            'url': url,
            'message': 'File uploaded'
        }
    except Exception as e:
        return {
            'status': 'error',
            'url': None,
            'message': f'File upload failed: {str(e)}'
        }


# Example usage
if __name__ == '__main__':
    # Replace with the path of the image to upload
    image_path = 'C:/Users/xlliu24/Desktop/5.19号/IT01/pythonProject3/wxauto文件/微信图片_20250519143650731564.png'
    result = upload_to_oss(image_path)
    print(result['url'])
package com.black.controller;

import com.alibaba.fastjson.JSON;
import com.black.pojo.*;
import com.black.smart.HttpUpload;
import com.black.smart.SmartEntity_1;
import com.black.smart.SmartEntity_3;
import com.black.util.ReadConfigGson;
import com.black.util.Res;
import com.google.gson.Gson;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.UUID;

@RestController
@RequestMapping("/big")
public class BigController {
    Gson gson = new Gson();

    @PostMapping("/insert_user")
    public Res insert_user(@RequestBody User user) throws Exception {
        // Raw information pushed by the Python script (the prompt text is sent to the agent in Chinese)
        String originPythonInfo = "群名称:" + user.getWho() + "。发送者:" + user.getSender() + "。\n";
        originPythonInfo = originPythonInfo + "发送信息如下:\n" + user.getPythonInfo();
        // Validate the image URLs first
        if (user.getImgRul() != null) {
            String imgUrlArray[] = user.getImgRul().split(",");
            StringBuilder tempTotalUrl = new StringBuilder();
            if (imgUrlArray.length > 0) {
                for (String tempImgUrl : imgUrlArray) {
                    String imageModelRes = SmartEntity_3.SmartWork(UUID.randomUUID().toString(), tempImgUrl);
                    ImageModelResJson imageModelResJson = gson.fromJson(imageModelRes, ImageModelResJson.class);
                    // "是" means "yes"; constant-first equals avoids an NPE when the field is missing
                    if ("是".equals(imageModelResJson.aCarOrADrivingLicense)) {
                        tempTotalUrl.append(tempImgUrl).append(",");
                    }
                }
            }
            if (tempTotalUrl.length() <= 0) {
                System.out.println("No image URL left after filtering, nothing to upload to Quche (趣车)...");
                return Res.success();
            } else {
                // The message text is already part of originPythonInfo, so only the image URLs are appended
                originPythonInfo = originPythonInfo + "。imgRul的值为:" + tempTotalUrl;
                // Send everything to the agent and get the JSON result
                String uploadInfo = SmartEntity_1.SmartWork(UUID.randomUUID().toString(), originPythonInfo);
                System.err.println("Final payload for Quche: " + JSON.toJSON(uploadInfo));
                ReadConfigGson readConfigGson = ReadConfigGson.work("coze.txt"); // read the push host from the config file
                String url = readConfigGson.getHost();
                // Perform the final upload
                String res = HttpUpload.sendPostRequest(url, JSON.toJSON(uploadInfo).toString());
                System.err.println("Upload result: " + res);
            }
        } else {
            System.out.println("No image URL, nothing to upload to Quche (趣车)...");
        }
        return Res.success();
    }

    static class ImageModelResJson {
        String aCarOrADrivingLicense;
    }
}
package com.black.smart;

import com.black.util.ReadConfigGson;
import okhttp3.*;
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SmartEntity_1 {

    public static void main(String[] args) {
        // Call the wrapped method directly from main
        Long startTime = System.currentTimeMillis();
        String chatMessage = "新疆 喀什";
        String result = SmartWork(UUID.randomUUID().toString(), chatMessage);
        System.out.println("Final result: " + result);
        Long endTime = System.currentTimeMillis();
        System.out.println("Elapsed: " + (endTime - startTime) + "ms");
    }

    /**
     * Static method that calls the Coze API.
     *
     * @param userId      user ID
     * @param chatMessage message text sent to the agent
     * @return the parsed API response
     */
    public static String SmartWork(String userId, String chatMessage) {
        // API configuration
        String url = "https://api.coze.cn/v3/chat?";
        ReadConfigGson readConfigGson = ReadConfigGson.work("coze.txt"); // read the token from the config file
        String token = readConfigGson.getToken();
        String botId = "7603685193958178826";
        try {
            // Create the HTTP client
            OkHttpClient client = new OkHttpClient.Builder().connectTimeout(60, TimeUnit.SECONDS) // connect timeout
                    .readTimeout(60, TimeUnit.SECONDS) // read timeout
                    .writeTimeout(60, TimeUnit.SECONDS) // write timeout
                    .callTimeout(60, TimeUnit.SECONDS) // whole-call timeout (OkHttp 4.0+)
                    .build();
            // Build the request body JSON
            JSONObject requestBody = new JSONObject();
            requestBody.put("bot_id", botId);
            requestBody.put("user_id", userId);
            requestBody.put("stream", true);
            // Build the additional_messages array with a single user message
            JSONArray additionalMessages = new JSONArray();
            JSONObject message = new JSONObject();
            message.put("role", "user");
            message.put("type", "question");
            message.put("content_type", "text");
            message.put("content", chatMessage);
            additionalMessages.put(message);
            requestBody.put("additional_messages", additionalMessages);
            requestBody.put("parameters", new JSONObject());
            System.err.println("Request body: " + requestBody.toString());
            // Create the request body
            RequestBody body = RequestBody.create(MediaType.parse("application/json"), requestBody.toString());
            // Build the request
            Request request = new Request.Builder().url(url).post(body).addHeader("Authorization", "Bearer " + token).addHeader("Content-Type", "application/json").build();
            // Send the request and handle the response
            try (Response response = client.newCall(request).execute()) {
                if (response.isSuccessful()) {
                    String responseBody = response.body().string();
                    String result = parseResponseBody(responseBody);
                    return result;
                } else {
                    System.out.println("Request failed: " + response.code() + " - " + response.message());
                    return "Request failed: " + response.code() + " - " + response.message();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
            return "Request error: " + e.getMessage();
        }
    }

    public static String parseResponseBody(String responseBody) {
        // Split the response into lines
        String[] lines = responseBody.split("\n");
        String currentEvent = "";
        String currentData = "";
        boolean isCollectingData = false;
        StringBuilder dataBuilder = new StringBuilder();
        StringBuilder resultBuilder = new StringBuilder();
        for (String line : lines) {
            line = line.trim();
            if (line.startsWith("event:")) {
                // Flush data still pending from the previous event.
                // Pending data lives in dataBuilder; currentData is only set on a blank line.
                if (isCollectingData && dataBuilder.length() > 0) {
                    String content = processCompletedEvent(currentEvent, dataBuilder.toString());
                    if (content != null && !content.isEmpty()) {
                        resultBuilder.append(content);
                    }
                }
                // Start a new event
                currentEvent = line.substring(6).trim();
                currentData = "";
                dataBuilder = new StringBuilder();
                isCollectingData = false;
            } else if (line.startsWith("data:")) {
                String dataLine = line.substring(5).trim();
                dataBuilder.append(dataLine);
                isCollectingData = true;
            } else if (line.isEmpty() && isCollectingData) {
                // A blank line ends one complete event
                currentData = dataBuilder.toString();
                String content = processCompletedEvent(currentEvent, currentData);
                if (content != null && !content.isEmpty()) {
                    resultBuilder.append(content);
                }
                // Reset state
                currentEvent = "";
                currentData = "";
                dataBuilder = new StringBuilder();
                isCollectingData = false;
            } else if (isCollectingData && !line.isEmpty()) {
                // Continuation of multi-line data
                dataBuilder.append(line);
            }
        }
        // Handle the last event if the body did not end with a blank line
        // (String.split also drops trailing empty lines)
        if (isCollectingData && dataBuilder.length() > 0) {
            String content = processCompletedEvent(currentEvent, dataBuilder.toString());
            if (content != null && !content.isEmpty()) {
                resultBuilder.append(content);
            }
        }
        System.out.println("=== Parsing finished ===");
        return resultBuilder.toString();
    }

    private static String processCompletedEvent(String event, String data) {
        if ("conversation.message.completed".equals(event) && !data.isEmpty()) {
            try {
                // Parse with a JSON parser rather than a regular expression
                JSONObject jsonData = new JSONObject(data);
                // Only answer-type messages are of interest
                if (jsonData.has("type") && "answer".equals(jsonData.getString("type"))) {
                    if (jsonData.has("content")) {
                        String content = jsonData.getString("content");
                        // Handle escape sequences that may remain in the content
                        String unescapedContent = unescapeJsonString(content);
                        return unescapedContent;
                    }
                }
            } catch (Exception e) {
                System.err.println("Error while parsing the content field: " + e.getMessage());
                System.err.println("Offending data: " + data);
                e.printStackTrace();
                // If JSON parsing fails, fall back to a regular expression
                System.out.println("Falling back to a regular expression");
                try {
                    Pattern pattern = Pattern.compile("\"content\":\"(.*?)\"", Pattern.DOTALL);
                    Matcher matcher = pattern.matcher(data);
                    if (matcher.find()) {
                        String content = matcher.group(1);
                        System.out.println("Content matched by the regular expression: " + content);
                        return unescapeJsonString(content);
                    }
                } catch (Exception ex) {
                    System.err.println("Regular expression parsing also failed: " + ex.getMessage());
                }
            }
        }
        return "";
    }

    // Handle escape sequences in a JSON string
    private static String unescapeJsonString(String str) {
        if (str == null) return "";
        String unescaped = str.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t").replace("\\\"", "\"").replace("\\\\", "\\");
        return unescaped;
    }
}
package com.black.smart;

import com.black.util.ReadConfigGson;
import okhttp3.*;
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SmartEntity_3 {

    public static void main(String[] args) {
        // Call the wrapped method directly from main
        String result = SmartWork("用户517623533241", "https://chelaike.oss-cn-beijing.aliyuncs.com/media/uJAdnqUlFQyzDDMNsECHMgpXywTHxi0d.png");
        System.out.println("Final result: " + result);
    }

    /**
     * Calls the Coze agent that decides whether an image URL shows a vehicle.
     */
    public static String SmartWork(String userId, String chatMessage) {
        // API configuration
        String url = "https://api.coze.cn/v3/chat?";
        ReadConfigGson readConfigGson = ReadConfigGson.work("coze.txt");
        String token = readConfigGson.getToken();
        // Bot that checks whether an image URL is a vehicle image
        String botId = "7600231789953122367";
        try {
            // Create the HTTP client
            OkHttpClient client = new OkHttpClient.Builder().connectTimeout(60, TimeUnit.SECONDS) // connect timeout
                    .readTimeout(60, TimeUnit.SECONDS) // read timeout
                    .writeTimeout(60, TimeUnit.SECONDS) // write timeout
                    .callTimeout(60, TimeUnit.SECONDS) // whole-call timeout (OkHttp 4.0+)
                    .build();
            // Build the request body JSON
            JSONObject requestBody = new JSONObject();
            requestBody.put("bot_id", botId);
            requestBody.put("user_id", userId);
            requestBody.put("stream", true);
            // Build the additional_messages array with a single user message
            JSONArray additionalMessages = new JSONArray();
            JSONObject message = new JSONObject();
            message.put("role", "user");
            message.put("type", "question");
            message.put("content_type", "text");
            message.put("content", chatMessage);
            additionalMessages.put(message);
            requestBody.put("additional_messages", additionalMessages);
            requestBody.put("parameters", new JSONObject());
            System.err.println("Request body: " + requestBody.toString());
            // Create the request body
            RequestBody body = RequestBody.create(MediaType.parse("application/json"), requestBody.toString());
            // Build the request
            Request request = new Request.Builder().url(url).post(body).addHeader("Authorization", "Bearer " + token).addHeader("Content-Type", "application/json").build();
            // Send the request and handle the response
            try (Response response = client.newCall(request).execute()) {
                if (response.isSuccessful()) {
                    String responseBody = response.body().string();
                    String result = parseResponseBody(responseBody);
                    return result;
                } else {
                    System.out.println("Request failed: " + response.code() + " - " + response.message());
                    return "Request failed: " + response.code() + " - " + response.message();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
            return "Request error: " + e.getMessage();
        }
    }

    public static String parseResponseBody(String responseBody) {
        // Split the response into lines
        String[] lines = responseBody.split("\n");
        String currentEvent = "";
        String currentData = "";
        boolean isCollectingData = false;
        StringBuilder dataBuilder = new StringBuilder();
        StringBuilder resultBuilder = new StringBuilder();
        for (String line : lines) {
            line = line.trim();
            if (line.startsWith("event:")) {
                // Flush data still pending from the previous event.
                // Pending data lives in dataBuilder; currentData is only set on a blank line.
                if (isCollectingData && dataBuilder.length() > 0) {
                    String content = processCompletedEvent(currentEvent, dataBuilder.toString());
                    if (content != null && !content.isEmpty()) {
                        resultBuilder.append(content);
                    }
                }
                // Start a new event
                currentEvent = line.substring(6).trim();
                currentData = "";
                dataBuilder = new StringBuilder();
                isCollectingData = false;
            } else if (line.startsWith("data:")) {
                String dataLine = line.substring(5).trim();
                dataBuilder.append(dataLine);
                isCollectingData = true;
            } else if (line.isEmpty() && isCollectingData) {
                // A blank line ends one complete event
                currentData = dataBuilder.toString();
                String content = processCompletedEvent(currentEvent, currentData);
                if (content != null && !content.isEmpty()) {
                    resultBuilder.append(content);
                }
                // Reset state
                currentEvent = "";
                currentData = "";
                dataBuilder = new StringBuilder();
                isCollectingData = false;
            } else if (isCollectingData && !line.isEmpty()) {
                // Continuation of multi-line data
                dataBuilder.append(line);
            }
        }
        // Handle the last event if the body did not end with a blank line
        // (String.split also drops trailing empty lines)
        if (isCollectingData && dataBuilder.length() > 0) {
            String content = processCompletedEvent(currentEvent, dataBuilder.toString());
            if (content != null && !content.isEmpty()) {
                resultBuilder.append(content);
            }
        }
        System.out.println("=== Parsing finished ===");
        return resultBuilder.toString();
    }

    private static String processCompletedEvent(String event, String data) {
        if ("conversation.message.completed".equals(event) && !data.isEmpty()) {
            try {
                // Parse with a JSON parser rather than a regular expression
                JSONObject jsonData = new JSONObject(data);
                // Only answer-type messages are of interest
                if (jsonData.has("type") && "answer".equals(jsonData.getString("type"))) {
                    if (jsonData.has("content")) {
                        String content = jsonData.getString("content");
                        // Handle escape sequences that may remain in the content
                        String unescapedContent = unescapeJsonString(content);
                        return unescapedContent;
                    }
                }
            } catch (Exception e) {
                System.err.println("Error while parsing the content field: " + e.getMessage());
                System.err.println("Offending data: " + data);
                e.printStackTrace();
                // If JSON parsing fails, fall back to a regular expression
                System.out.println("Falling back to a regular expression");
                try {
                    Pattern pattern = Pattern.compile("\"content\":\"(.*?)\"", Pattern.DOTALL);
                    Matcher matcher = pattern.matcher(data);
                    if (matcher.find()) {
                        String content = matcher.group(1);
                        System.out.println("Content matched by the regular expression: " + content);
                        return unescapeJsonString(content);
                    }
                } catch (Exception ex) {
                    System.err.println("Regular expression parsing also failed: " + ex.getMessage());
                }
            }
        }
        return "";
    }

    // Handle escape sequences in a JSON string
    private static String unescapeJsonString(String str) {
        if (str == null) return "";
        String unescaped = str.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t").replace("\\\"", "\"").replace("\\\\", "\\");
        System.out.println("Before unescaping: " + str);
        System.out.println("After unescaping: " + unescaped);
        return unescaped;
    }
}