Building an Intelligent Conversation System: Chat Topic Management and Summary Generation in Python
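Conversation logs pile up quickly, and keeping them organized and searchable is a real challenge. This article shows how to build a conversation system in Python that groups a conversation into topics, generates a summary for each topic, and manages the conversation history.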
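System overview
The core features of the system are:
- Conversation log management: chat records are persisted in a SQLite database
- Automatic topic identification: related messages are grouped into topics
- Summary generation: each topic gets a concise summary
- Multi-agent collaboration: several AI agents each handle a different task
The following sections walk through each part of the system.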
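Environment setup and dependencies
First make sure Python 3.7+ is installed (recent LangChain releases may require a newer Python version, so check the package requirements), then install the dependencies:
pip install langchain langchain-community langchain-openai
If you serve an OpenAI-compatible model locally through LM Studio, you also need to point the client at the corresponding API endpoint.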
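One way to keep the endpoint configurable is to read it from environment variables. The following is a minimal sketch, not part of the original code: the helper `llm_settings` and the variable names `CHAT_MODEL`, `CHAT_API_KEY`, `CHAT_API_BASE`, and `CHAT_TEMPERATURE` are assumptions made for illustration, and the defaults are the LM Studio values used in the full listing later in this article.

```python
import os


def llm_settings(env=None):
    """Build keyword arguments for the chat model client.

    The environment variable names are hypothetical; the defaults are the
    LM Studio values used in this article's full listing.
    """
    env = os.environ if env is None else env
    return {
        "model": env.get("CHAT_MODEL", "qwen/qwen3-1.7b"),
        "openai_api_key": env.get("CHAT_API_KEY", "lm-studio"),
        "openai_api_base": env.get("CHAT_API_BASE", "http://127.0.0.1:1234/v1"),
        "temperature": float(env.get("CHAT_TEMPERATURE", "0.7")),
    }


# Override only the endpoint; everything else falls back to the defaults
settings = llm_settings({"CHAT_API_BASE": "http://localhost:8000/v1"})
print(settings["openai_api_base"])
```

The resulting dictionary could then be passed to the client as `ChatOpenAI(**llm_settings())`.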
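Project structure
A clear project layout makes the code easier to maintain. Ours looks like this:
chat_system/
├── chat_manager.py # main program
├── requirements.txt # dependency list
└── chat_topics.db # SQLite database (created automatically)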
Core code walkthrough
1. Data model design
We start by defining two core data models: Topic and ChatMessage.
class Topic:
    def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):
        self.topic_id = topic_id
        self.topic = topic
        self.summary = summary
        self.message_ids = message_ids

class ChatMessage:
    def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):
        self.message_id = message_id
        self.role = role
        self.content = content
        self.timestamp = timestamp or datetime.now()
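The same two models can also be written as dataclasses, which removes the hand-written `__init__` methods and gives a dictionary conversion for free. This is an alternative sketch rather than the article's implementation; the empty-list default for `message_ids` is an assumption added here.

```python
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import List


@dataclass
class Topic:
    topic_id: str
    topic: str
    summary: str
    # default_factory gives every instance its own list (assumed default)
    message_ids: List[str] = field(default_factory=list)


@dataclass
class ChatMessage:
    message_id: str
    role: str
    content: str
    # Evaluated when the instance is created, like `timestamp or datetime.now()`
    timestamp: datetime = field(default_factory=datetime.now)


topic = Topic("t1", "Weather", "A topic with 2 messages", ["m1", "m2"])
print(asdict(topic))
```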
2. Database management
Data is persisted in a SQLite database, and all database operations go through the DatabaseManager class.
class DatabaseManager:
    def __init__(self, db_path: str = "chat_topics.db"):
        self.db_path = db_path
        self.init_db()

    def init_db(self):
        """Initialize the database tables."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Create the topics table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS topics (
                topic_id TEXT PRIMARY KEY,
                topic TEXT NOT NULL,
                summary TEXT,
                message_ids TEXT
            )
        """)
        # Create the chat_messages table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS chat_messages (
                message_id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
        conn.close()
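The init_db method above opens and closes the connection by hand, so an exception raised between connect and close would leave the connection open. Below is a minimal sketch of the same pattern with context managers. It assumes an in-memory database, and the helpers `save_message` and `load_messages` are hypothetical names, not part of the original class.

```python
import sqlite3
from contextlib import closing


def save_message(conn, session_id, message_id, role, content):
    """Insert one message; `with conn` commits on success and rolls back on error."""
    with conn:
        conn.execute(
            "INSERT OR IGNORE INTO chat_messages (message_id, session_id, role, content) "
            "VALUES (?, ?, ?, ?)",
            (message_id, session_id, role, content),
        )


def load_messages(conn, session_id):
    """Return the messages of one session in insertion order."""
    rows = conn.execute(
        "SELECT message_id, role, content FROM chat_messages "
        "WHERE session_id = ? ORDER BY rowid",
        (session_id,),
    ).fetchall()
    return [{"message_id": m, "role": r, "content": c} for m, r, c in rows]


# closing() guarantees conn.close() even if one of the statements raises
with closing(sqlite3.connect(":memory:")) as conn:
    conn.execute("""
        CREATE TABLE IF NOT EXISTS chat_messages (
            message_id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            role TEXT NOT NULL,
            content TEXT NOT NULL,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    """)
    save_message(conn, "s1", "m1", "user", "hello")
    save_message(conn, "s1", "m2", "assistant", "hi there")
    save_message(conn, "s1", "m1", "user", "duplicate id, ignored")
    messages = load_messages(conn, "s1")

print(messages)
```

Note that `with conn:` on a sqlite3 connection only manages the transaction; closing the connection is a separate step, which is why `closing()` is used as well.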
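3. Core tool functions
The system provides three core tool functions, responsible for topic selection, history retrieval, and compressing the history into topics.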
def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:
    """Select topics by topic_ids."""
    # Implementation details...

def get_history(session_id: str) -> List[Dict[str, Any]]:
    """Get the chat history of the current session."""
    # Implementation details...

def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Compress the chat history into topics."""
    # Implementation details...
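The summary_only parameter of select_topic_tool accepts either a single bool or a list with one flag per topic. The handling of that parameter can be sketched as a small standalone function; `normalize_flags` is a hypothetical helper name, and the rules (broadcast a bool, truncate a long list, pad a short list with False) follow the full listing later in this article.

```python
from typing import List, Union


def normalize_flags(summary_only: Union[List[bool], bool], count: int) -> List[bool]:
    """Return exactly `count` flags.

    A bool is applied to every topic; a list is truncated to `count`
    entries and padded with False when it is too short.
    """
    if isinstance(summary_only, bool):
        return [summary_only] * count
    if isinstance(summary_only, list):
        flags = [bool(flag) for flag in summary_only[:count]]
        return flags + [False] * (count - len(flags))
    # Any other type falls back to "full detail" for every topic
    return [False] * count


print(normalize_flags(True, 3))                       # [True, True, True]
print(normalize_flags([True], 3))                     # [True, False, False]
print(normalize_flags([True, False, True, True], 2))  # [True, False]
```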
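4. Multi-agent system
The system uses a multi-agent architecture in which each agent has its own task:
- History retrieval agent: retrieves relevant historical topics
- Q&A agent: answers the user's question based on the context
- Topic summary agent: compresses the conversation log into structured topics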
# Create the agent executor
history_retrieval_agent = AgentExecutor(
    agent=history_retrieval_agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True
)
# The other agents are created the same way...
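A ReAct-style agent emits its Action Input as a single string, while two of the tool functions in this article take more than one argument. One possible way to bridge the gap is a wrapper that parses a JSON object from that string and forwards it as keyword arguments. This is a sketch under that assumption: `json_input` and the stand-in `select_topics` are hypothetical names, and whether a given model reliably produces valid JSON has to be checked in practice.

```python
import json
from typing import Any, Callable


def json_input(func: Callable[..., Any]) -> Callable[[str], Any]:
    """Wrap a multi-argument function so it accepts one JSON object string."""
    def wrapper(tool_input: str) -> Any:
        try:
            kwargs = json.loads(tool_input)
        except json.JSONDecodeError:
            # Return a message instead of raising so the agent can retry
            return f"Invalid JSON input: {tool_input!r}"
        if not isinstance(kwargs, dict):
            return "Input must be a JSON object of keyword arguments."
        return func(**kwargs)
    return wrapper


def select_topics(topic_ids, summary_only=False):
    """Stand-in for select_topic_tool, used only for this demonstration."""
    return [{"topic_id": t, "summary_only": summary_only} for t in topic_ids]


tool_func = json_input(select_topics)
print(tool_func('{"topic_ids": ["t1", "t2"], "summary_only": true}'))
```

The wrapped function takes exactly one string, so it could be registered with `Tool.from_function(func=tool_func, ...)`; the tool description should then tell the model which JSON keys to send.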
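5. Main coordinator function
The super_agent function is the system's main entry point. It coordinates the agents as they process the user's input.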
def super_agent(user_input: str, session_id: str):
    """Main function of the super agent."""
    if not session_id:
        session_id = str(uuid.uuid4())
    # Save the user input
    user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)
    db_manager.save_chat_message(session_id, user_message)
    # 1. History retrieval agent
    print("=== History retrieval agent ===")
    # ... retrieve related topics
    # 2. Q&A agent: answer the user's question using the retrieved information
    print("=== Q&A agent ===")
    # ... generate the answer
    # 3. Topic summary agent: process the history
    print("=== Topic summary agent ===")
    # ... compress the conversation into topics
    return {
        "session_id": session_id,
        "qa_response": qa_response,
        "new_topics": new_topic
    }
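The three-step flow of the coordinator can be sketched independently of any LLM by passing each stage in as a plain callable, which also makes the flow easy to test. This is an illustrative skeleton, not the article's implementation: `run_pipeline` and its `retrieve`, `answer`, and `summarize` parameters are hypothetical names.

```python
import uuid
from typing import Callable, Dict, List, Optional


def run_pipeline(
    user_input: str,
    session_id: Optional[str],
    retrieve: Callable[[str], List[dict]],
    answer: Callable[[str, List[dict]], str],
    summarize: Callable[[str], List[dict]],
) -> Dict[str, object]:
    """Run retrieval, question answering, and topic summarization in order."""
    session_id = session_id or str(uuid.uuid4())
    # 1. Retrieve related topics
    topics = retrieve(session_id)
    # 2. Answer the question; fall back to an apology if the stage fails
    try:
        reply = answer(user_input, topics)
    except Exception as exc:
        reply = f"Sorry, I could not generate an answer ({exc})."
    # 3. Compress the conversation into topics
    new_topics = summarize(session_id)
    return {"session_id": session_id, "qa_response": reply, "new_topics": new_topics}


result = run_pipeline(
    "What is the weather like?",
    "session_1",
    retrieve=lambda sid: [{"topic": "weather"}],
    answer=lambda question, topics: f"Answering {question!r} with {len(topics)} topic(s)",
    summarize=lambda sid: [{"topic": "weather chat"}],
)
print(result["qa_response"])
```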
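Features and advantages
1. Modular design
Each component has a clearly defined responsibility, which makes the system easier to maintain and extend.
2. Topic identification
The system is designed to identify the topics in a conversation and group related messages, which makes long conversation logs easier to manage. In the full listing in this article this step is a simplified placeholder that uses the first 20 characters of the first message as the topic title; an LLM call is meant to replace it.
3. Lightweight persistence
SQLite persists the data without the deployment cost of a full database server.
4. Multi-agent collaboration
Each agent handles a single task (retrieval, answering, or summarization), so each step can be developed and tuned separately.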
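Usage example
Here is a simple usage example:
if __name__ == "__main__":
    result = super_agent("Hello, can you tell me what the weather is like?", "session_1")
    print(f"Final result: {result}")
The system processes the user input, retrieves related historical topics, generates an answer, and compresses the current conversation into a new topic.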
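Possible extensions
This basic system can be extended in several directions:
- User authentication: add login and permission management
- Topic visualization: provide a graphical view of how topics relate to each other
- Better summarization: use stronger NLP techniques to produce higher-quality summaries
- Multilingual support: extend the system to handle multiple languages
- Real-time collaboration: let multiple users work with the system at the same time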
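As a starting point for the summarization item above, the placeholder summary could be replaced by a call to a language model. The sketch below keeps the model behind a plain callable so it runs without any LLM. `build_summary_prompt`, `summarize_topic`, and `llm_call` are hypothetical names, the prompt wording is only an example, and `fake_llm` stands in for a real model.

```python
from typing import Callable, Dict, List


def build_summary_prompt(messages: List[Dict[str, str]], max_chars: int = 2000) -> str:
    """Turn a list of {'role', 'content'} messages into a summarization prompt."""
    transcript = "\n".join(f"{m['role']}: {m['content']}" for m in messages)
    return (
        "Summarize the conversation below in one or two sentences.\n\n"
        + transcript[:max_chars]  # crude length cap to stay within the context window
    )


def summarize_topic(messages: List[Dict[str, str]], llm_call: Callable[[str], str]) -> str:
    """Summarize messages with any callable that maps a prompt to text."""
    if not messages:
        return ""
    return llm_call(build_summary_prompt(messages)).strip()


def fake_llm(prompt: str) -> str:
    """Stand-in for a real model call, used only for this demonstration."""
    return "  The user asked about the weather.  "


messages = [
    {"role": "user", "content": "Hello, what is the weather like?"},
    {"role": "assistant", "content": "I cannot check live weather data."},
]
print(summarize_topic(messages, fake_llm))
```

With the article's setup, `llm_call` could be something like `lambda p: llm.invoke(p).content`, assuming the `llm` object defined in the full listing.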
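Summary
This article described the design and implementation of a Python conversation system. Built on the LangChain framework and a SQLite database, it implements conversation log management, topic identification, and summary generation. A multi-agent architecture handles user queries and manages the conversation history.
A system like this can be applied to customer service, personal knowledge management, team collaboration, and similar scenarios, helping users organize and make use of their conversations. I hope the article is a useful reference for building your own conversation system.
Tip: for a real deployment, add error handling, logging, and performance monitoring to keep the system stable and maintainable.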
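As one possible shape for the error handling and logging mentioned in the tip, the sketch below wraps a function in a retry decorator that logs each attempt and its duration. `with_retry` and `flaky_answer` are hypothetical names, and the retry count and delay are arbitrary example values.

```python
import logging
import time
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("chat_system")


def with_retry(attempts: int = 3, delay: float = 0.0):
    """Retry a function up to `attempts` times, logging every outcome."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                start = time.perf_counter()
                try:
                    result = func(*args, **kwargs)
                    logger.info("%s succeeded in %.3fs", func.__name__,
                                time.perf_counter() - start)
                    return result
                except Exception:
                    logger.exception("%s failed (attempt %d/%d)",
                                     func.__name__, attempt, attempts)
                    if attempt == attempts:
                        raise  # give up after the last attempt
                    time.sleep(delay)
        return wrapper
    return decorator


calls = {"count": 0}


@with_retry(attempts=3)
def flaky_answer(question: str) -> str:
    """Fails twice, then succeeds, to simulate an endpoint that is warming up."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("model endpoint not ready")
    return f"answer to {question}"


print(flaky_answer("hello"))
```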
Complete code
import os
import uuid
import sqlite3
import json
from datetime import datetime
from typing import List, Dict, Any, Union

from langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from langchain.tools import Tool

# Conditional import to avoid deprecation warnings
try:
    from langchain_openai import ChatOpenAI
except ImportError:
    from langchain_community.chat_models import ChatOpenAI

# Configure the LLM to connect to LM Studio
llm = ChatOpenAI(
    model="qwen/qwen3-1.7b",
    openai_api_key="lm-studio",
    openai_api_base="http://127.0.0.1:1234/v1",
    temperature=0.7
)
class Topic:
    def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):
        self.topic_id = topic_id
        self.topic = topic
        self.summary = summary
        self.message_ids = message_ids

class ChatMessage:
    def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):
        self.message_id = message_id
        self.role = role
        self.content = content
        self.timestamp = timestamp or datetime.now()
class DatabaseManager:
    def __init__(self, db_path: str = "chat_topics.db"):
        self.db_path = db_path
        self.init_db()

    def init_db(self):
        """Initialize the database tables."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Create the topics table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS topics (
                topic_id TEXT PRIMARY KEY,
                topic TEXT NOT NULL,
                summary TEXT,
                message_ids TEXT
            )
        """)
        # Create the chat_messages table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS chat_messages (
                message_id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
        conn.close()
    def save_topic(self, topic: Topic):
        """Save a topic."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(
            "INSERT OR REPLACE INTO topics (topic_id, topic, summary, message_ids) VALUES (?, ?, ?, ?)",
            (topic.topic_id, topic.topic, topic.summary, json.dumps(topic.message_ids))
        )
        conn.commit()
        conn.close()

    def get_all_topics(self) -> List[Topic]:
        """Get all topics."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT topic_id, topic, summary, message_ids FROM topics")
        rows = cursor.fetchall()
        conn.close()
        topics = []
        for row in rows:
            topic_id, topic_str, summary, message_ids_str = row
            # message_ids is stored as a JSON-encoded list
            message_ids = json.loads(message_ids_str) if message_ids_str else []
            topics.append(Topic(topic_id, topic_str, summary, message_ids))
        return topics

    def save_chat_message(self, session_id: str, message: ChatMessage):
        """Save a chat message."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # INSERT OR IGNORE avoids inserting the same message twice
        cursor.execute(
            "INSERT OR IGNORE INTO chat_messages (message_id, session_id, role, content) VALUES (?, ?, ?, ?)",
            (message.message_id, session_id, message.role, message.content)
        )
        conn.commit()
        conn.close()
    def get_session_messages(self, session_id: str) -> List[ChatMessage]:
        """Get the messages of a session."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(
            "SELECT message_id, role, content, timestamp FROM chat_messages WHERE session_id = ? ORDER BY timestamp",
            (session_id,)
        )
        rows = cursor.fetchall()
        conn.close()
        messages = []
        for row in rows:
            message_id, role, content, timestamp = row
            # SQLite returns the timestamp as a string; parse it back into a datetime
            if isinstance(timestamp, str):
                try:
                    timestamp = datetime.fromisoformat(timestamp)
                except ValueError:
                    timestamp = datetime.now()
            messages.append(ChatMessage(message_id, role, content, timestamp))
        return messages

# Create the database manager instance
db_manager = DatabaseManager()
def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:
    """Select topics by topic_ids; summary_only may be one bool or one flag per topic."""
    topics = db_manager.get_all_topics()
    # An empty topic_ids selects all topics; otherwise filter by the given IDs
    if not topic_ids:
        selected_topics = topics
    else:
        selected_topics = [topic for topic in topics if topic.topic_id in topic_ids]
    # A bool applies to every selected topic; a list is applied in order
    # and padded with False if it is too short
    if isinstance(summary_only, bool):
        summary_flags = [summary_only] * len(selected_topics)
    elif isinstance(summary_only, list):
        summary_flags = summary_only[:len(selected_topics)]
        summary_flags.extend([False] * (len(selected_topics) - len(summary_flags)))
    else:
        summary_flags = [False] * len(selected_topics)
    result = []
    for topic, show_summary_only in zip(selected_topics, summary_flags):
        item = {
            "topic_id": topic.topic_id,
            "topic": topic.topic,
            "summary": topic.summary
        }
        # Include the message IDs unless only the summary was requested
        if not show_summary_only:
            item["message_ids"] = topic.message_ids
        result.append(item)
    return result
def get_history(session_id: str) -> List[Dict[str, Any]]:
    """Get the chat history of the current session."""
    messages = db_manager.get_session_messages(session_id)
    return [{"role": msg.role, "content": msg.content, "message_id": msg.message_id} for msg in messages]

def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Compress new chat messages into a topic."""
    # An LLM should generate the topic summary here; a simple method is used for brevity
    if not new_messages:
        return []
    # Make sure every message has an ID, then save the messages to the database.
    # IDs are assigned before message_ids is built so both refer to the same values.
    for msg in new_messages:
        if 'message_id' not in msg:
            msg['message_id'] = str(uuid.uuid4())
        chat_msg = ChatMessage(msg['message_id'], msg['role'], msg['content'])
        db_manager.save_chat_message(session_id, chat_msg)
    message_ids = [msg['message_id'] for msg in new_messages]
    # Create the new topic; its title is the first message, truncated to 20 characters
    topic_id = str(uuid.uuid4())
    first_content = new_messages[0]['content']
    topic_content = (first_content[:20] + "...") if len(first_content) > 20 else first_content
    topic = Topic(
        topic_id=topic_id,
        topic=topic_content,
        summary=f"A topic with {len(new_messages)} messages",
        message_ids=message_ids
    )
    # Save the topic to the database
    db_manager.save_topic(topic)
    # Return the topic information
    return [{
        "topic_id": topic.topic_id,
        "topic": topic.topic,
        "summary": topic.summary,
        "message_ids": topic.message_ids
    }]
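# Define the tool list
# Note: Tool is a single-input tool, so an agent calls each function with one string.
# compress_history_to_topic needs two arguments here and would likely fail if an agent
# invoked it; the parallel version below wraps it in a one-argument lambda.
tools = [
    Tool.from_function(
        func=get_history,
        name="get_history",
        description="Get the chat history of the given session"
    ),
    Tool.from_function(
        func=select_topic_tool,
        name="select_topic_tool",
        description="Select topics by topic_ids; each topic can be returned as summary only"
    ),
    Tool.from_function(
        func=compress_history_to_topic,
        name="compress_history_to_topic",
        description="Compress the chat history into topics"
    )
]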
# Create the prompt template
template = """Answer the following questions as best you can. You have access to the following tools:
{tools}
Use the following format:
Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question
Begin!
Question: {input}
Thought:{agent_scratchpad}"""
prompt = PromptTemplate.from_template(template)
# Create the agents
history_retrieval_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
qa_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
topic_summary_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)

# Create the agent executors (each name is rebound to its executor)
history_retrieval_agent = AgentExecutor(
    agent=history_retrieval_agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True
)
qa_agent = AgentExecutor(
    agent=qa_agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True
)
topic_summary_agent = AgentExecutor(
    agent=topic_summary_agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True
)
def super_agent(user_input: str, session_id: str):
    """Main function of the super agent."""
    if not session_id:
        session_id = str(uuid.uuid4())
    # Save the user input
    user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)
    db_manager.save_chat_message(session_id, user_message)

    # 1. History retrieval agent
    print("=== History retrieval agent ===")
    # Retrieve related topics
    all_topics = db_manager.get_all_topics()
    topic_ids = [topic.topic_id for topic in all_topics]
    if topic_ids:
        # Give each topic_id its own summary_only value (alternating True and False)
        summary_flags = [True, False] * (len(topic_ids) // 2 + 1)
        summary_flags = summary_flags[:len(topic_ids)]
        related_topics = select_topic_tool(topic_ids, summary_flags)
        print(f"Related topics: {related_topics}")
    else:
        print("No related topics found")
        related_topics = []

    # 2. Q&A agent: answer the user's question using the retrieved information
    print("=== Q&A agent ===")
    history = get_history(session_id)
    print(f"Session history: {history}")
    # Build the Q&A context
    qa_context = f"User question: {user_input}\nRelated topics: {related_topics}\nSession history: {history}"
    try:
        # create_react_agent already fills {tools} and {tool_names} in the prompt,
        # so only "input" is passed here
        qa_response = qa_agent.invoke({
            "input": f"Answer the user's question based on the following information:\n{qa_context}\n\nUser question: {user_input}"
        })
        print(f"Q&A result: {qa_response}")
    except Exception as e:
        print(f"Q&A agent failed: {e}")
        qa_response = {"output": "Sorry, I could not generate an answer."}

    # 3. Topic summary agent: process the history
    print("=== Topic summary agent ===")
    # Compress the current session messages into a topic
    session_messages = get_history(session_id)
    if session_messages:
        new_topic = compress_history_to_topic(session_id, session_messages)
        print(f"New topic: {new_topic}")
    else:
        print("No new messages to compress into a topic")
        new_topic = []

    return {
        "session_id": session_id,
        "qa_response": qa_response.get("output", qa_response.get("result", "No answer")),
        "new_topics": new_topic
    }

# Example usage
if __name__ == "__main__":
    result = super_agent("Hello, can you tell me what the weather is like?", "session_1")
    print(f"Final result: {result}")
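One detail of the listing above is worth checking before reuse. save_chat_message never writes ChatMessage.timestamp, so rows get SQLite's CURRENT_TIMESTAMP, which is in UTC and has one-second resolution. Messages saved within the same second then have no guaranteed order under ORDER BY timestamp. A possible fix, sketched here with an in-memory database, is to store an explicit ISO 8601 timestamp and break ties by rowid. The `save` helper and the TEXT timestamp column are assumptions for this sketch.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE chat_messages (
        message_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        role TEXT NOT NULL,
        content TEXT NOT NULL,
        timestamp TEXT NOT NULL
    )
""")


def save(message_id, session_id, role, content, timestamp=None):
    """Store the message with an explicit timezone-aware ISO 8601 timestamp."""
    ts = (timestamp or datetime.now(timezone.utc)).isoformat()
    with conn:
        conn.execute(
            "INSERT OR IGNORE INTO chat_messages VALUES (?, ?, ?, ?, ?)",
            (message_id, session_id, role, content, ts),
        )


# Three messages with an identical timestamp, as can happen within one request
same_instant = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
for i, text in enumerate(["first", "second", "third"]):
    save(f"m{i}", "s1", "user", text, same_instant)

# rowid breaks ties between equal timestamps in insertion order
rows = conn.execute(
    "SELECT content, timestamp FROM chat_messages "
    "WHERE session_id = ? ORDER BY timestamp, rowid",
    ("s1",),
).fetchall()
ordered = [content for content, _ in rows]
parsed = datetime.fromisoformat(rows[0][1])
print(ordered)
```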
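Parallel version
This variant retrieves topics in parallel with ThreadPoolExecutor, wraps the tool functions in try/except, and merges a new topic into an existing one when their titles start with the same characters.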
import os
import uuid
import sqlite3
import json
from datetime import datetime
from typing import List, Dict, Any, Union
from concurrent.futures import ThreadPoolExecutor, as_completed

from langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from langchain.tools import Tool

# Conditional import to avoid deprecation warnings
try:
    from langchain_openai import ChatOpenAI
except ImportError:
    from langchain_community.chat_models import ChatOpenAI

# Configure the LLM to connect to LM Studio
llm = ChatOpenAI(
    model="qwen/qwen3-1.7b",
    openai_api_key="lm-studio",
    openai_api_base="http://127.0.0.1:1234/v1",
    temperature=0.7
)
class Topic:
    def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):
        self.topic_id = topic_id
        self.topic = topic
        self.summary = summary
        self.message_ids = message_ids

class ChatMessage:
    def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):
        self.message_id = message_id
        self.role = role
        self.content = content
        self.timestamp = timestamp or datetime.now()
class DatabaseManager:
    def __init__(self, db_path: str = "chat_topics.db"):
        self.db_path = db_path
        self.init_db()

    def init_db(self):
        """Initialize the database tables."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Create the topics table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS topics (
                topic_id TEXT PRIMARY KEY,
                topic TEXT NOT NULL,
                summary TEXT,
                message_ids TEXT
            )
        """)
        # Create the chat_messages table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS chat_messages (
                message_id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
        conn.close()
    def save_topic(self, topic: Topic):
        """Save a topic."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(
            "INSERT OR REPLACE INTO topics (topic_id, topic, summary, message_ids) VALUES (?, ?, ?, ?)",
            (topic.topic_id, topic.topic, topic.summary, json.dumps(topic.message_ids))
        )
        conn.commit()
        conn.close()

    def get_all_topics(self) -> List[Topic]:
        """Get all topics."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT topic_id, topic, summary, message_ids FROM topics")
        rows = cursor.fetchall()
        conn.close()
        topics = []
        for row in rows:
            topic_id, topic_str, summary, message_ids_str = row
            # message_ids is stored as a JSON-encoded list
            message_ids = json.loads(message_ids_str) if message_ids_str else []
            topics.append(Topic(topic_id, topic_str, summary, message_ids))
        return topics

    def save_chat_message(self, session_id: str, message: ChatMessage):
        """Save a chat message."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # INSERT OR IGNORE avoids inserting the same message twice
        cursor.execute(
            "INSERT OR IGNORE INTO chat_messages (message_id, session_id, role, content) VALUES (?, ?, ?, ?)",
            (message.message_id, session_id, message.role, message.content)
        )
        conn.commit()
        conn.close()
    def get_session_messages(self, session_id: str) -> List[ChatMessage]:
        """Get the messages of a session."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(
            "SELECT message_id, role, content, timestamp FROM chat_messages WHERE session_id = ? ORDER BY timestamp",
            (session_id,)
        )
        rows = cursor.fetchall()
        conn.close()
        messages = []
        for row in rows:
            message_id, role, content, timestamp = row
            # SQLite returns the timestamp as a string; parse it back into a datetime
            if isinstance(timestamp, str):
                try:
                    timestamp = datetime.fromisoformat(timestamp)
                except ValueError:
                    timestamp = datetime.now()
            messages.append(ChatMessage(message_id, role, content, timestamp))
        return messages

# Create the database manager instance
db_manager = DatabaseManager()
def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:
    """Select topics by topic_ids; summary_only may be one bool or one flag per topic."""
    try:
        topics = db_manager.get_all_topics()
        # An empty topic_ids selects all topics; otherwise filter by the given IDs
        if not topic_ids:
            selected_topics = topics
        else:
            selected_topics = [topic for topic in topics if topic.topic_id in topic_ids]
        # A bool applies to every selected topic; a list is applied in order
        # and padded with False if it is too short
        if isinstance(summary_only, bool):
            summary_flags = [summary_only] * len(selected_topics)
        elif isinstance(summary_only, list):
            summary_flags = summary_only[:len(selected_topics)]
            summary_flags.extend([False] * (len(selected_topics) - len(summary_flags)))
        else:
            summary_flags = [False] * len(selected_topics)
        result = []
        for topic, show_summary_only in zip(selected_topics, summary_flags):
            item = {
                "topic_id": topic.topic_id,
                "topic": topic.topic,
                "summary": topic.summary
            }
            # Include the message IDs unless only the summary was requested
            if not show_summary_only:
                item["message_ids"] = topic.message_ids
            result.append(item)
        return result
    except Exception as e:
        print(f"select_topic_tool failed: {e}")
        return []
def get_history(session_id: str) -> List[Dict[str, Any]]:
    """Get the chat history of the current session."""
    try:
        messages = db_manager.get_session_messages(session_id)
        return [{"role": msg.role, "content": msg.content, "message_id": msg.message_id} for msg in messages]
    except Exception as e:
        print(f"get_history failed: {e}")
        return []

def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
    """Compress new chat messages into a topic and merge it into a similar existing topic."""
    try:
        # If new_messages is not provided, load the session messages from the database
        if new_messages is None:
            new_messages = get_history(session_id)
        # An LLM should generate the topic summary here; a simple method is used for brevity
        if not new_messages:
            return []
        # Make sure every message has an ID, then save the messages to the database.
        # IDs are assigned before message_ids is built so both refer to the same values.
        for msg in new_messages:
            if 'message_id' not in msg:
                msg['message_id'] = str(uuid.uuid4())
            chat_msg = ChatMessage(msg['message_id'], msg['role'], msg['content'])
            db_manager.save_chat_message(session_id, chat_msg)
        message_ids = [msg['message_id'] for msg in new_messages]
        # Create the new topic; its title is the first message, truncated to 20 characters
        first_content = new_messages[0]['content']
        topic_content = (first_content[:20] + "...") if len(first_content) > 20 else first_content
        result_topic = Topic(
            topic_id=str(uuid.uuid4()),
            topic=topic_content,
            summary=f"A topic with {len(new_messages)} messages",
            message_ids=message_ids
        )
        # Check whether a similar topic already exists
        for existing_topic in db_manager.get_all_topics():
            # Simple similarity check: compare the first 10 characters of the titles
            if existing_topic.topic[:10] == topic_content[:10]:
                # Merge into the existing topic, skipping message IDs it already holds
                known_ids = set(existing_topic.message_ids)
                existing_topic.message_ids.extend(m for m in message_ids if m not in known_ids)
                existing_topic.summary = f"A topic with {len(existing_topic.message_ids)} messages"
                result_topic = existing_topic
                break
        # Save the merged topic, or the new topic if nothing matched
        db_manager.save_topic(result_topic)
        # Return the topic that was actually stored
        return [{
            "topic_id": result_topic.topic_id,
            "topic": result_topic.topic,
            "summary": result_topic.summary,
            "message_ids": result_topic.message_ids
        }]
    except Exception as e:
        print(f"compress_history_to_topic failed: {e}")
        return []
# Define the tool list
tools = [
    Tool.from_function(
        func=get_history,
        name="get_history",
        description="Get the chat history of the given session"
    ),
    Tool.from_function(
        func=select_topic_tool,
        name="select_topic_tool",
        description="Select topics by topic_ids; each topic can be returned as summary only"
    ),
    Tool.from_function(
        # The lambda exposes a single-argument interface to the agent
        func=lambda session_id: compress_history_to_topic(session_id),
        name="compress_history_to_topic",
        description="Compress the chat history into topics"
    )
]
# Create the prompt template
template = """Answer the following questions as best you can. You have access to the following tools:
{tools}
Use the following format:
Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question
Begin!
Question: {input}
Thought: {agent_scratchpad}"""
prompt = PromptTemplate.from_template(template)
# Create the agents
history_retrieval_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
qa_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
topic_summary_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)

# Create the agent executors (each name is rebound to its executor)
history_retrieval_agent = AgentExecutor(
    agent=history_retrieval_agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True
)
qa_agent = AgentExecutor(
    agent=qa_agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True
)
topic_summary_agent = AgentExecutor(
    agent=topic_summary_agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True
)
def super_agent(user_input: str, session_id: str):
    """Main function of the super agent."""
    if not session_id:
        session_id = str(uuid.uuid4())
    # Save the user input
    user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)
    db_manager.save_chat_message(session_id, user_message)

    # 1. History retrieval agent (parallel search)
    print("=== History retrieval agent (parallel search) ===")
    # Retrieve related topics
    all_topics = db_manager.get_all_topics()
    topic_ids = [topic.topic_id for topic in all_topics]
    related_topics = []
    if topic_ids:
        # Split the topic IDs into groups of 10
        topic_groups = [topic_ids[i:i+10] for i in range(0, len(topic_ids), 10)]
        # Run the searches in parallel, one task per group
        with ThreadPoolExecutor(max_workers=min(5, len(topic_groups))) as executor:
            future_to_topic = {}
            # Create one task per topic group
            for i, group in enumerate(topic_groups):
                # Alternate between summary mode and full mode;
                # groups with an even index use summary mode
                summary_mode = (i % 2 == 0)
                future = executor.submit(select_topic_tool, group, summary_mode)
                future_to_topic[future] = f"topic group {i} search ({'summary' if summary_mode else 'full'})"
            # Extra task: fetch the session history
            future3 = executor.submit(get_history, session_id)
            future_to_topic[future3] = "session history"
            # Collect the results as the tasks complete
            search_results = {}
            for future in as_completed(future_to_topic):
                task_name = future_to_topic[future]
                try:
                    result = future.result()
                    search_results[task_name] = result
                    print(f"{task_name} done, result count: {len(result) if isinstance(result, list) else 'N/A'}")
                except Exception as exc:
                    print(f"{task_name} raised an exception: {exc}")
                    search_results[task_name] = []
        # Merge the results of all topic searches
        for task_name, result in search_results.items():
            if "topic group" in task_name and isinstance(result, list):
                related_topics.extend(result)
        print(f"Related topics after merging: {len(related_topics)}")
    else:
        print("No related topics found")
        related_topics = []

    # 2. Q&A agent: answer the user's question using the retrieved information
    print("=== Q&A agent ===")
    history = get_history(session_id)
    print(f"Session history: {history}")
    # Build the Q&A context
    qa_context = f"User question: {user_input}\nRelated topics: {related_topics}\nSession history: {history}"
    try:
        qa_response = qa_agent.invoke({
            "input": f"Answer the user's question based on the following information:\n{qa_context}\n\nUser question: {user_input}"
        })
        print(f"Q&A result: {qa_response}")
    except Exception as e:
        print(f"Q&A agent failed: {e}")
        qa_response = {"output": "Sorry, I could not generate an answer."}

    # 3. Topic summary agent: process the history
    print("=== Topic summary agent ===")
    # Compress the current session messages into a topic
    new_topic = compress_history_to_topic(session_id)
    if new_topic:
        print(f"New topic: {new_topic}")
    else:
        print("No new messages to compress into a topic")

    # Make sure qa_response is not None and has a usable key
    if qa_response is None:
        qa_response = {"output": "Sorry, I could not generate an answer."}
    # Handle a response that lacks both the output and the result key
    if "output" in qa_response:
        qa_result = qa_response["output"]
    elif "result" in qa_response:
        qa_result = qa_response["result"]
    else:
        qa_result = "No answer"
    return {
        "session_id": session_id,
        "qa_response": qa_result,
        "new_topics": new_topic
    }

# Example usage
if __name__ == "__main__":
    result = super_agent("Hello, can you tell me what the weather is like?", "session_1")
    print(f"Final result: {result}")
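The listing collects results with as_completed, which yields futures in completion order, so the order of the merged topics can differ from run to run. If a stable order matters, executor.map returns results in input order. The chunk-and-fan-out pattern can be sketched on its own as follows; `chunked`, `parallel_lookup`, and `fake_lookup` are hypothetical names, and `fake_lookup` stands in for select_topic_tool.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def chunked(items: Sequence[str], size: int) -> List[List[str]]:
    """Split items into consecutive groups of at most `size` elements."""
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


def parallel_lookup(
    topic_ids: Sequence[str],
    lookup: Callable[[List[str]], List[dict]],
    chunk_size: int = 10,
    max_workers: int = 5,
) -> List[dict]:
    """Look up topics group by group in a thread pool, keeping input order."""
    groups = chunked(topic_ids, chunk_size)
    if not groups:
        return []  # ThreadPoolExecutor requires max_workers >= 1
    results: List[dict] = []
    with ThreadPoolExecutor(max_workers=min(max_workers, len(groups))) as executor:
        # map() yields results in the order of `groups`, not completion order
        for group_result in executor.map(lookup, groups):
            results.extend(group_result)
    return results


def fake_lookup(group: List[str]) -> List[dict]:
    """Stand-in for a database or tool call, used only for this demonstration."""
    return [{"topic_id": topic_id} for topic_id in group]


ids = [f"t{i}" for i in range(25)]
found = parallel_lookup(ids, fake_lookup)
print(len(found), found[0], found[-1])
```

Because each DatabaseManager method in the listing opens its own connection, the lookups can run in separate threads without sharing a sqlite3 connection.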