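In an age of information overload, managing and organizing conversation records is a real challenge. This article shows how to build an intelligent conversation system in Python that identifies conversation topics, generates summaries, and manages the dialogue history.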

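System overview

The core features of the system are:

  • Conversation record management: chat records are persisted in a SQLite database
  • Automatic topic identification: related messages are grouped into topics
  • Summary generation: each topic gets a short summary
  • Multi-agent collaboration: several AI agents split the work between them

The sections below walk through each component.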

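Environment setup and dependencies

First make sure a recent Python is installed (3.9 or later is a safe choice; current LangChain releases do not support 3.7), then install the required packages: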

pip install langchain langchain-community langchain-openai

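If your model is served locally through LM Studio's OpenAI-compatible server, you also need to point the client at the corresponding API endpoint.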
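
One way to keep the endpoint configurable is to read it from environment variables. The sketch below is only an illustration: the variable names (CHAT_LLM_MODEL and so on) are assumptions made up for this example, and the defaults are the values used in the full listing later in this article.

```python
import os


def load_llm_settings() -> dict:
    """Collect connection settings for an OpenAI-compatible local server.

    The environment variable names are hypothetical; the defaults mirror
    the LM Studio configuration used in the full listing.
    """
    return {
        "model": os.environ.get("CHAT_LLM_MODEL", "qwen/qwen3-1.7b"),
        "openai_api_base": os.environ.get("CHAT_LLM_BASE_URL", "http://127.0.0.1:1234/v1"),
        "openai_api_key": os.environ.get("CHAT_LLM_API_KEY", "lm-studio"),
        "temperature": float(os.environ.get("CHAT_LLM_TEMPERATURE", "0.7")),
    }
```

The resulting dictionary uses the same keyword names as the ChatOpenAI call in the full listing, so it could be passed along as ChatOpenAI(**load_llm_settings()).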

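Project structure

A sensible project structure is the basis of maintainable code. Ours looks like this:

chat_system/
├── chat_manager.py     # main program
├── requirements.txt    # dependency list
└── chat_topics.db      # SQLite database (created automatically)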

Core code walkthrough

1. Data model design

We start by defining two core data models: Topic and ChatMessage.

from datetime import datetime
from typing import List, Optional

class Topic:
    def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):
        self.topic_id = topic_id
        self.topic = topic
        self.summary = summary
        self.message_ids = message_ids  # IDs of the messages grouped under this topic

class ChatMessage:
    def __init__(self, message_id: str, role: str, content: str, timestamp: Optional[datetime] = None):
        self.message_id = message_id
        self.role = role
        self.content = content
        self.timestamp = timestamp or datetime.now()  # defaults to the creation time
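
As an alternative, the same shape can be expressed with a dataclass that also knows how to convert itself to and from a database row. This is a sketch, not the article's implementation: the TopicRecord name and the to_row/from_row helpers are hypothetical, and the JSON encoding of message_ids follows what the full listing does when saving a topic.

```python
import json
from dataclasses import dataclass, field
from typing import List


@dataclass
class TopicRecord:
    """Hypothetical dataclass counterpart of the Topic model."""
    topic_id: str
    topic: str
    summary: str
    message_ids: List[str] = field(default_factory=list)

    def to_row(self) -> tuple:
        # Column order of the topics table; the ID list is stored as JSON text
        return (self.topic_id, self.topic, self.summary, json.dumps(self.message_ids))

    @classmethod
    def from_row(cls, row: tuple) -> "TopicRecord":
        topic_id, topic, summary, message_ids = row
        # An empty or NULL column becomes an empty list
        return cls(topic_id, topic, summary, json.loads(message_ids) if message_ids else [])
```

A dataclass gives equality and a readable repr for free, which is convenient when checking that a saved topic comes back unchanged.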

2. Database management

Data is persisted in a SQLite database, and the DatabaseManager class wraps all database operations.

import sqlite3

class DatabaseManager:
    def __init__(self, db_path: str = "chat_topics.db"):
        self.db_path = db_path
        self.init_db()

    def init_db(self):
        """Create the tables if they do not exist yet."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Create the topics table (message_ids holds a JSON-encoded list)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS topics (
                topic_id TEXT PRIMARY KEY,
                topic TEXT NOT NULL,
                summary TEXT,
                message_ids TEXT
            )
        """)
        
        # Create the chat_messages table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS chat_messages (
                message_id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        
        conn.commit()
        conn.close()
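
To see how this schema behaves, the sketch below runs the same chat_messages table in an in-memory database. It is an illustration under stated assumptions: the roundtrip_demo helper and the sample rows are made up, and the extra rowid tiebreaker in ORDER BY is a suggestion, since CURRENT_TIMESTAMP only has one-second resolution and messages saved within the same second would otherwise have no defined order.

```python
import sqlite3
from contextlib import closing


def roundtrip_demo() -> list:
    """Insert a few rows into an in-memory chat_messages table and read them back."""
    with closing(sqlite3.connect(":memory:")) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS chat_messages (
                message_id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        rows = [
            ("m1", "s1", "user", "hello"),
            ("m2", "s1", "assistant", "hi"),
            ("m1", "s1", "user", "duplicate id"),  # ignored: m1 already exists
        ]
        conn.executemany(
            "INSERT OR IGNORE INTO chat_messages (message_id, session_id, role, content) "
            "VALUES (?, ?, ?, ?)",
            rows,
        )
        conn.commit()
        # rowid breaks ties between messages stored within the same second
        cursor = conn.execute(
            "SELECT message_id, role, content FROM chat_messages "
            "WHERE session_id = ? ORDER BY timestamp, rowid",
            ("s1",),
        )
        return cursor.fetchall()
```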

3. Core tool functions

The system provides three core tool functions, responsible for topic selection, history retrieval, and topic compression.

from typing import Any, Dict, List, Union

def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:
    """Select topics by topic_ids."""
    # implementation details...

def get_history(session_id: str) -> List[Dict[str, Any]]:
    """Return the chat history of the current session."""
    # implementation details...

def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Compress chat messages into a topic."""
    # implementation details...
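
The least obvious part of select_topic_tool is how summary_only is interpreted: it can be a single boolean or a list with one flag per topic. The helper below isolates that rule as a minimal sketch; the name normalize_summary_flags is hypothetical, and the behavior (truncate a long list, pad a short one with False) follows the full listing.

```python
from typing import List, Union


def normalize_summary_flags(summary_only: Union[List[bool], bool], count: int) -> List[bool]:
    """Expand summary_only into exactly `count` flags, one per selected topic."""
    if isinstance(summary_only, bool):
        # A single boolean applies to every topic
        return [summary_only] * count
    if isinstance(summary_only, list):
        # Truncate a list that is too long, pad a short one with False
        flags = [bool(flag) for flag in summary_only[:count]]
        return flags + [False] * (count - len(flags))
    # Any other type falls back to full output for every topic
    return [False] * count
```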

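4. Multi-agent system

The system uses a multi-agent architecture in which each agent handles a different task:

  • History retrieval agent: retrieves relevant past topics
  • QA agent: answers the user's question from the available context
  • Topic summary agent: compresses the conversation into structured topics

# Create the agent executor (the history_retrieval_agent passed in here is the
# agent returned by create_react_agent; see the full listing)
history_retrieval_agent = AgentExecutor(
    agent=history_retrieval_agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True
)

# The other agents are created the same way...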
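
One practical detail: a ReAct agent hands the text after "Action Input:" to the tool as a single string, and LangChain's Tool is a single-input tool, so functions that take several arguments need a small adapter. The sketch below shows one possible adapter; parse_action_input and its default_key parameter are hypothetical helpers, not part of LangChain or of the original code.

```python
import json
from typing import Any, Dict


def parse_action_input(raw: str, default_key: str) -> Dict[str, Any]:
    """Turn a ReAct "Action Input" string into keyword arguments.

    A JSON object is used as-is; any other value (a bare string, a JSON
    list, ...) is stored under default_key.
    """
    text = raw.strip()
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Not JSON: treat the text as one plain value, minus stray quotes
        return {default_key: text.strip("'\"")}
    if isinstance(parsed, dict):
        return parsed
    return {default_key: parsed}
```

With such a helper, a tool could be registered with a wrapper along the lines of lambda raw: select_topic_tool(**parse_action_input(raw, "topic_ids")), so the model can pass either a plain ID list or a JSON object with both parameters.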

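5. Main coordination function

The super_agent function is the system's main entry point and coordinates the agents that process the user input.

def super_agent(user_input: str, session_id: str):
    """Main function of the super agent."""
    if not session_id:
        session_id = str(uuid.uuid4())

    # Save the user input
    user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)
    db_manager.save_chat_message(session_id, user_message)

    # 1. History retrieval agent
    print("=== History retrieval agent ===")
    # ... retrieve related topics

    # 2. QA agent: answer the user's question using the retrieved information
    print("=== QA agent ===")
    # ... generate the answer (sets qa_response)

    # 3. Topic summary agent: process the history
    print("=== Topic summary agent ===")
    # ... compress the conversation into topics (sets new_topic)

    return {
        "session_id": session_id,
        "qa_response": qa_response,
        "new_topics": new_topic
    }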
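
The control flow of this function can be tried out without an LLM by passing the three stages in as plain callables. The sketch below is an assumption-laden illustration: run_pipeline and its retrieve/answer/summarize parameters are hypothetical stand-ins for the agents, and the fallback message mirrors the error handling in the full listing.

```python
import uuid
from typing import Any, Callable, Dict, List


def run_pipeline(
    user_input: str,
    session_id: str,
    retrieve: Callable[[str], List[Dict[str, Any]]],
    answer: Callable[[str, List[Dict[str, Any]]], str],
    summarize: Callable[[str], List[Dict[str, Any]]],
) -> Dict[str, Any]:
    """Run the three stages in order: retrieve, answer, summarize."""
    session_id = session_id or str(uuid.uuid4())
    related_topics = retrieve(session_id)
    try:
        reply = answer(user_input, related_topics)
    except Exception as exc:
        # A failing QA stage must not prevent the summary stage from running
        reply = f"Sorry, no answer could be generated ({exc})."
    new_topics = summarize(session_id)
    return {"session_id": session_id, "qa_response": reply, "new_topics": new_topics}
```

Injecting the stages this way keeps the coordinator testable: each stage can be replaced by a stub, and the returned keys are always defined even when the QA stage fails.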

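Features and advantages

1. Modular design

The system is highly modular: the data models, the database manager, the tool functions, and the agents each have a single responsibility, which makes the code easier to maintain and extend.

2. Topic identification

The system groups related messages into topics, which makes long conversation records easier to manage. In the simplified listing below, a topic's title is the first 20 characters of the first message and its summary is a message count; generating both with the LLM is left as a placeholder.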
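
The parallel listing further down decides whether two topics belong together by comparing the first ten characters of their titles. A slightly more tolerant check can be sketched with the standard library's difflib. This is an illustrative alternative, not what the article's code does; the titles_similar name and the 0.6 threshold are assumptions.

```python
from difflib import SequenceMatcher


def titles_similar(title_a: str, title_b: str, threshold: float = 0.6) -> bool:
    """Return True when two topic titles are textually close.

    SequenceMatcher.ratio() is 1.0 for identical strings and approaches
    0.0 for strings with nothing in common. The threshold is arbitrary.
    """
    ratio = SequenceMatcher(None, title_a.lower(), title_b.lower()).ratio()
    return ratio >= threshold
```

Character-level similarity is still a crude proxy for topic identity; an embedding-based or LLM-based comparison would be the natural next step.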

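3. Flexible data persistence

SQLite keeps the data persistent without the deployment cost of a full database server.

4. Multi-agent collaboration

Retrieval, question answering, and topic summarization each get a dedicated agent, so every task can be handled and tuned separately. In the listing below only the QA agent is actually invoked through the LLM; the retrieval and summarization stages call their tool functions directly.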

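Usage example

Here is a simple usage example:

if __name__ == "__main__":
    result = super_agent("Hello, can you tell me what the weather is like?", "session_1")
    print(f"Final result: {result}")

The system processes the user input, retrieves related past topics, generates an answer, and compresses the current conversation into a new topic.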

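Possible extensions

This basic system can be extended in many directions:

  1. User authentication: add login and permission management
  2. Topic visualization: show the relationships between topics in a graphical interface
  3. Better summarization: use more advanced NLP techniques to produce higher-quality summaries
  4. Multi-language support: extend the system to handle several languages
  5. Real-time collaboration: let multiple users work with the system at the same time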

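Summary

This article described the design and implementation of a Python-based intelligent conversation system. Built on the LangChain framework and a SQLite database, it covers conversation record management, topic identification, and summary generation, and uses a multi-agent architecture to answer user queries and manage the conversation history.

A system like this can be applied to customer service, personal knowledge management, team collaboration, and similar scenarios, helping users organize and make use of their conversations. I hope the article is a useful reference for building your own conversation system.

Tip: for a real deployment, add the error handling, logging, and performance monitoring that a production environment needs, so that the system stays stable and maintainable.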
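
As a small illustration of that tip, the sketch below wraps a database write with logging and explicit error handling. It is a minimal example under assumptions: the save_message_safely name and the "chat_system" logger name are made up, and the SQL statement is the one used by save_chat_message in the full listing.

```python
import logging
import sqlite3
from contextlib import closing

logger = logging.getLogger("chat_system")


def save_message_safely(db_path: str, message_id: str, session_id: str,
                        role: str, content: str) -> bool:
    """Insert one chat message; log and return False instead of raising."""
    try:
        # closing() releases the connection even when the insert fails
        with closing(sqlite3.connect(db_path)) as conn:
            with conn:  # commits on success, rolls back on an exception
                conn.execute(
                    "INSERT OR IGNORE INTO chat_messages "
                    "(message_id, session_id, role, content) VALUES (?, ?, ?, ?)",
                    (message_id, session_id, role, content),
                )
        return True
    except sqlite3.Error:
        logger.exception("failed to save message %s", message_id)
        return False
```

Returning a boolean keeps the caller in control: the coordinator can decide whether a failed write should abort the request or only be reported.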

Complete code

import os
import uuid
import sqlite3
import json
from datetime import datetime
from typing import List, Dict, Any, Union

from langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from langchain.tools import Tool

# Use a conditional import to avoid deprecation warnings
try:
    from langchain_openai import ChatOpenAI
except ImportError:
    from langchain_community.chat_models import ChatOpenAI


# Configure the LLM to connect to LM Studio
llm = ChatOpenAI(
    model="qwen/qwen3-1.7b",
    openai_api_key="lm-studio",
    openai_api_base="http://127.0.0.1:1234/v1",
    temperature=0.7
)


class Topic:
    def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):
        self.topic_id = topic_id
        self.topic = topic
        self.summary = summary
        self.message_ids = message_ids


class ChatMessage:
    def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):
        self.message_id = message_id
        self.role = role
        self.content = content
        self.timestamp = timestamp or datetime.now()


class DatabaseManager:
    def __init__(self, db_path: str = "chat_topics.db"):
        self.db_path = db_path
        self.init_db()

    def init_db(self):
        """Create the tables if they do not exist yet."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Create the topics table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS topics (
                topic_id TEXT PRIMARY KEY,
                topic TEXT NOT NULL,
                summary TEXT,
                message_ids TEXT
            )
        """)
        
        # Create the chat_messages table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS chat_messages (
                message_id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        
        conn.commit()
        conn.close()

    def save_topic(self, topic: Topic):
        """Save a topic (message_ids is stored as JSON text)."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(
            "INSERT OR REPLACE INTO topics (topic_id, topic, summary, message_ids) VALUES (?, ?, ?, ?)",
            (topic.topic_id, topic.topic, topic.summary, json.dumps(topic.message_ids))
        )
        conn.commit()
        conn.close()

    def get_all_topics(self) -> List[Topic]:
        """Return all topics."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT topic_id, topic, summary, message_ids FROM topics")
        rows = cursor.fetchall()
        conn.close()
        
        topics = []
        for row in rows:
            topic_id, topic_str, summary, message_ids_str = row
            message_ids = json.loads(message_ids_str) if message_ids_str else []
            topics.append(Topic(topic_id, topic_str, summary, message_ids))
        return topics

    def save_chat_message(self, session_id: str, message: ChatMessage):
        """Save a chat message."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # INSERT OR IGNORE skips rows whose message_id already exists
        cursor.execute(
            "INSERT OR IGNORE INTO chat_messages (message_id, session_id, role, content) VALUES (?, ?, ?, ?)",
            (message.message_id, session_id, message.role, message.content)
        )
        conn.commit()
        conn.close()

    def get_session_messages(self, session_id: str) -> List[ChatMessage]:
        """Return the messages of a session, ordered by timestamp."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(
            "SELECT message_id, role, content, timestamp FROM chat_messages WHERE session_id = ? ORDER BY timestamp",
            (session_id,)
        )
        rows = cursor.fetchall()
        conn.close()
        
        messages = []
        for row in rows:
            message_id, role, content, timestamp = row
            # SQLite returns the timestamp as text; parse it, falling back to now
            if isinstance(timestamp, str):
                try:
                    timestamp = datetime.fromisoformat(timestamp)
                except ValueError:
                    timestamp = datetime.now()
            messages.append(ChatMessage(message_id, role, content, timestamp))
        return messages


# Create the database manager instance
db_manager = DatabaseManager()


def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:
    """Select topics by topic_ids; summary_only is one bool or one flag per selected topic."""
    topics = db_manager.get_all_topics()

    # An empty topic_ids selects every topic; otherwise filter (database order is kept)
    if topic_ids:
        selected_topics = [topic for topic in topics if topic.topic_id in topic_ids]
    else:
        selected_topics = topics

    # Expand summary_only into one flag per selected topic
    if isinstance(summary_only, bool):
        summary_flags = [summary_only] * len(selected_topics)
    elif isinstance(summary_only, list):
        # Truncate a list that is too long and pad a short one with False
        summary_flags = summary_only[:len(selected_topics)]
        summary_flags += [False] * (len(selected_topics) - len(summary_flags))
    else:
        summary_flags = [False] * len(selected_topics)

    result = []
    for topic, show_summary_only in zip(selected_topics, summary_flags):
        entry = {
            "topic_id": topic.topic_id,
            "topic": topic.topic,
            "summary": topic.summary
        }
        if not show_summary_only:
            # Full mode also returns the IDs of the messages in the topic
            entry["message_ids"] = topic.message_ids
        result.append(entry)
    return result


def get_history(session_id: str) -> List[Dict[str, Any]]:
    """Return the chat history of the current session."""
    messages = db_manager.get_session_messages(session_id)
    return [{"role": msg.role, "content": msg.content, "message_id": msg.message_id} for msg in messages]


def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Compress new chat messages into a topic."""
    # An LLM should generate the topic and summary here; a simple placeholder is used instead
    if not new_messages:
        return []

    # Create the new topic: the title is the first message, cut to 20 characters
    topic_id = str(uuid.uuid4())
    first_content = new_messages[0]['content']
    topic_content = (first_content[:20] + "...") if len(first_content) > 20 else first_content

    # Give every message an ID first, so the stored rows and the topic's ID list agree
    for msg in new_messages:
        if 'message_id' not in msg:
            msg['message_id'] = str(uuid.uuid4())
    message_ids = [msg['message_id'] for msg in new_messages]

    # Save the messages to the database
    for msg in new_messages:
        chat_msg = ChatMessage(msg['message_id'], msg['role'], msg['content'])
        db_manager.save_chat_message(session_id, chat_msg)

    # Create the topic object
    topic = Topic(
        topic_id=topic_id,
        topic=topic_content,
        summary=f"Topic containing {len(new_messages)} messages",
        message_ids=message_ids
    )

    # Save the topic to the database
    db_manager.save_topic(topic)

    # Return the topic information
    return [{
        "topic_id": topic.topic_id,
        "topic": topic.topic,
        "summary": topic.summary,
        "message_ids": topic.message_ids
    }]


# Define the tool list
tools = [
    Tool.from_function(
        func=get_history,
        name="get_history",
        description="Get the chat history of the given session"
    ),
    Tool.from_function(
        func=select_topic_tool,
        name="select_topic_tool",
        description="Select topics by topic_ids; each topic can be returned as summary only or in full"
    ),
    Tool.from_function(
        func=compress_history_to_topic,
        name="compress_history_to_topic",
        description="Compress chat messages into a topic"
    )
]

# Create the prompt template
template = """Answer the following questions as best you can. You have access to the following tools:

{tools}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Begin!

Question: {input}
Thought:{agent_scratchpad}"""

prompt = PromptTemplate.from_template(template)

# Create the agents
history_retrieval_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
qa_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
topic_summary_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)

# Wrap each agent in an executor (each name is rebound to its AgentExecutor)
history_retrieval_agent = AgentExecutor(
    agent=history_retrieval_agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True
)

qa_agent = AgentExecutor(
    agent=qa_agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True
)

topic_summary_agent = AgentExecutor(
    agent=topic_summary_agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True
)


def super_agent(user_input: str, session_id: str):
    """Main function of the super agent."""
    if not session_id:
        session_id = str(uuid.uuid4())

    # Save the user input
    user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)
    db_manager.save_chat_message(session_id, user_message)

    # 1. History retrieval agent
    print("=== History retrieval agent ===")
    # Retrieve related topics
    all_topics = db_manager.get_all_topics()
    topic_ids = [topic.topic_id for topic in all_topics]

    if topic_ids:
        # Alternate summary_only per topic: True, False, True, ...
        summary_flags = [True, False] * (len(topic_ids) // 2 + 1)
        summary_flags = summary_flags[:len(topic_ids)]
        related_topics = select_topic_tool(topic_ids, summary_flags)
        print(f"Related topics: {related_topics}")
    else:
        print("No related topics found")
        related_topics = []

    # 2. QA agent: answer the user's question using the retrieved information
    print("=== QA agent ===")
    history = get_history(session_id)
    print(f"Session history: {history}")

    # Build the QA context
    qa_context = f"User question: {user_input}\nRelated topics: {related_topics}\nSession history: {history}"
    try:
        # Only "input" is passed: create_react_agent already fills {tools} and {tool_names}
        qa_response = qa_agent.invoke({
            "input": f"Answer the user's question based on the following information:\n{qa_context}\n\nUser question: {user_input}"
        })
        print(f"QA result: {qa_response}")
    except Exception as e:
        print(f"QA agent failed: {e}")
        qa_response = {"output": "Sorry, I could not generate an answer."}

    # 3. Topic summary agent: process the history
    print("=== Topic summary agent ===")
    # Compress the current session's messages into a topic
    session_messages = get_history(session_id)
    if session_messages:
        new_topic = compress_history_to_topic(session_id, session_messages)
        print(f"New topic: {new_topic}")
    else:
        print("No new messages to compress into a topic")
        new_topic = []

    return {
        "session_id": session_id,
        "qa_response": qa_response.get("output", qa_response.get("result", "No answer")),
        "new_topics": new_topic
    }


# Example usage
if __name__ == "__main__":
    result = super_agent("Hello, can you tell me what the weather is like?", "session_1")
    print(f"Final result: {result}")

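Parallel version

The listing below extends the program above. Topic retrieval is split into groups of 10 topic IDs that run on a ThreadPoolExecutor, the tool functions catch their own exceptions, and compress_history_to_topic merges a new topic into an existing one when the first 10 characters of their titles match.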

import os
import uuid
import sqlite3
import json
from datetime import datetime
from typing import List, Dict, Any, Union
from concurrent.futures import ThreadPoolExecutor, as_completed

from langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from langchain.tools import Tool

# Use a conditional import to avoid deprecation warnings
try:
    from langchain_openai import ChatOpenAI
except ImportError:
    from langchain_community.chat_models import ChatOpenAI


# Configure the LLM to connect to LM Studio
llm = ChatOpenAI(
    model="qwen/qwen3-1.7b",
    openai_api_key="lm-studio",
    openai_api_base="http://127.0.0.1:1234/v1",
    temperature=0.7
)


class Topic:
    def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):
        self.topic_id = topic_id
        self.topic = topic
        self.summary = summary
        self.message_ids = message_ids


class ChatMessage:
    def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):
        self.message_id = message_id
        self.role = role
        self.content = content
        self.timestamp = timestamp or datetime.now()


class DatabaseManager:
    def __init__(self, db_path: str = "chat_topics.db"):
        self.db_path = db_path
        self.init_db()

    def init_db(self):
        """Create the tables if they do not exist yet."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Create the topics table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS topics (
                topic_id TEXT PRIMARY KEY,
                topic TEXT NOT NULL,
                summary TEXT,
                message_ids TEXT
            )
        """)
        
        # Create the chat_messages table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS chat_messages (
                message_id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        
        conn.commit()
        conn.close()

    def save_topic(self, topic: Topic):
        """Save a topic (message_ids is stored as JSON text)."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(
            "INSERT OR REPLACE INTO topics (topic_id, topic, summary, message_ids) VALUES (?, ?, ?, ?)",
            (topic.topic_id, topic.topic, topic.summary, json.dumps(topic.message_ids))
        )
        conn.commit()
        conn.close()

    def get_all_topics(self) -> List[Topic]:
        """Return all topics."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT topic_id, topic, summary, message_ids FROM topics")
        rows = cursor.fetchall()
        conn.close()
        
        topics = []
        for row in rows:
            topic_id, topic_str, summary, message_ids_str = row
            message_ids = json.loads(message_ids_str) if message_ids_str else []
            topics.append(Topic(topic_id, topic_str, summary, message_ids))
        return topics

    def save_chat_message(self, session_id: str, message: ChatMessage):
        """Save a chat message."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # INSERT OR IGNORE skips rows whose message_id already exists
        cursor.execute(
            "INSERT OR IGNORE INTO chat_messages (message_id, session_id, role, content) VALUES (?, ?, ?, ?)",
            (message.message_id, session_id, message.role, message.content)
        )
        conn.commit()
        conn.close()

    def get_session_messages(self, session_id: str) -> List[ChatMessage]:
        """Return the messages of a session, ordered by timestamp."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(
            "SELECT message_id, role, content, timestamp FROM chat_messages WHERE session_id = ? ORDER BY timestamp",
            (session_id,)
        )
        rows = cursor.fetchall()
        conn.close()
        
        messages = []
        for row in rows:
            message_id, role, content, timestamp = row
            # SQLite returns the timestamp as text; parse it, falling back to now
            if isinstance(timestamp, str):
                try:
                    timestamp = datetime.fromisoformat(timestamp)
                except ValueError:
                    timestamp = datetime.now()
            messages.append(ChatMessage(message_id, role, content, timestamp))
        return messages


# Create the database manager instance
db_manager = DatabaseManager()


def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:
    """Select topics by topic_ids; summary_only is one bool or one flag per selected topic."""
    try:
        topics = db_manager.get_all_topics()

        # An empty topic_ids selects every topic; otherwise filter (database order is kept)
        if topic_ids:
            selected_topics = [topic for topic in topics if topic.topic_id in topic_ids]
        else:
            selected_topics = topics

        # Expand summary_only into one flag per selected topic
        if isinstance(summary_only, bool):
            summary_flags = [summary_only] * len(selected_topics)
        elif isinstance(summary_only, list):
            # Truncate a list that is too long and pad a short one with False
            summary_flags = summary_only[:len(selected_topics)]
            summary_flags += [False] * (len(selected_topics) - len(summary_flags))
        else:
            summary_flags = [False] * len(selected_topics)

        result = []
        for topic, show_summary_only in zip(selected_topics, summary_flags):
            entry = {
                "topic_id": topic.topic_id,
                "topic": topic.topic,
                "summary": topic.summary
            }
            if not show_summary_only:
                # Full mode also returns the IDs of the messages in the topic
                entry["message_ids"] = topic.message_ids
            result.append(entry)
        return result
    except Exception as e:
        print(f"select_topic_tool failed: {e}")
        return []


def get_history(session_id: str) -> List[Dict[str, Any]]:
    """Return the chat history of the current session."""
    try:
        messages = db_manager.get_session_messages(session_id)
        return [{"role": msg.role, "content": msg.content, "message_id": msg.message_id} for msg in messages]
    except Exception as e:
        print(f"get_history failed: {e}")
        return []


def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
    """Compress new chat messages into a topic and merge it into a similar existing topic."""
    try:
        # Without new_messages, load the session's messages from the database
        if new_messages is None:
            new_messages = get_history(session_id)

        # An LLM should generate the topic and summary here; a simple placeholder is used instead
        if not new_messages:
            return []

        # Create the new topic: the title is the first message, cut to 20 characters
        topic_id = str(uuid.uuid4())
        first_content = new_messages[0]['content']
        topic_content = (first_content[:20] + "...") if len(first_content) > 20 else first_content

        # Give every message an ID first, so the stored rows and the topic's ID list agree
        for msg in new_messages:
            if 'message_id' not in msg:
                msg['message_id'] = str(uuid.uuid4())
        message_ids = [msg['message_id'] for msg in new_messages]

        # Save the messages to the database
        for msg in new_messages:
            chat_msg = ChatMessage(msg['message_id'], msg['role'], msg['content'])
            db_manager.save_chat_message(session_id, chat_msg)

        # Create the topic object
        new_topic = Topic(
            topic_id=topic_id,
            topic=topic_content,
            summary=f"Topic containing {len(new_messages)} messages",
            message_ids=message_ids
        )

        # Check whether a similar topic already exists
        saved_topic = new_topic
        for existing_topic in db_manager.get_all_topics():
            # Simple similarity check: compare the first 10 characters of the titles
            if existing_topic.topic[:10] == topic_content[:10]:
                # Merge into the existing topic, skipping IDs it already contains
                known_ids = set(existing_topic.message_ids)
                existing_topic.message_ids.extend(mid for mid in message_ids if mid not in known_ids)
                existing_topic.summary = f"Topic containing {len(existing_topic.message_ids)} messages"
                saved_topic = existing_topic
                break

        # Save the merged topic, or the new one if nothing matched
        db_manager.save_topic(saved_topic)

        # Return the topic that was actually saved
        return [{
            "topic_id": saved_topic.topic_id,
            "topic": saved_topic.topic,
            "summary": saved_topic.summary,
            "message_ids": saved_topic.message_ids
        }]
    except Exception as e:
        print(f"compress_history_to_topic failed: {e}")
        return []


# Define the tool list
tools = [
    Tool.from_function(
        func=get_history,
        name="get_history",
        description="Get the chat history of the given session"
    ),
    Tool.from_function(
        func=select_topic_tool,
        name="select_topic_tool",
        description="Select topics by topic_ids; each topic can be returned as summary only or in full"
    ),
    Tool.from_function(
        # The agent passes a single string, so only session_id is forwarded
        func=lambda session_id: compress_history_to_topic(session_id),
        name="compress_history_to_topic",
        description="Compress chat messages into a topic"
    )
]

# Create the prompt template
template = """Answer the following questions as best you can. You have access to the following tools:

{tools}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Begin!

Question: {input}
Thought: {agent_scratchpad}"""

prompt = PromptTemplate.from_template(template)

# Create the agents
history_retrieval_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
qa_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
topic_summary_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)

# Wrap each agent in an executor (each name is rebound to its AgentExecutor)
history_retrieval_agent = AgentExecutor(
    agent=history_retrieval_agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True
)

qa_agent = AgentExecutor(
    agent=qa_agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True
)

topic_summary_agent = AgentExecutor(
    agent=topic_summary_agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True
)


def super_agent(user_input: str, session_id: str):
    """Main function of the super agent."""
    if not session_id:
        session_id = str(uuid.uuid4())

    # Save the user input
    user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)
    db_manager.save_chat_message(session_id, user_message)

    # 1. History retrieval agent (parallel search)
    print("=== History retrieval agent (parallel search) ===")
    # Retrieve related topics
    all_topics = db_manager.get_all_topics()
    topic_ids = [topic.topic_id for topic in all_topics]

    related_topics = []
    if topic_ids:
        # Split the topic IDs into groups of 10
        topic_groups = [topic_ids[i:i+10] for i in range(0, len(topic_ids), 10)]

        # Run the searches in parallel, one task per topic group
        with ThreadPoolExecutor(max_workers=min(5, len(topic_groups))) as executor:
            # Map each submitted future to a readable task name
            future_to_topic = {}

            # Create one task per topic group
            for i, group in enumerate(topic_groups):
                # Alternate between summary mode and full mode
                summary_mode = (i % 2 == 0)  # even-indexed groups use summary mode
                future = executor.submit(select_topic_tool, group, summary_mode)
                future_to_topic[future] = f"topic group {i} search ({'summary' if summary_mode else 'full'})"

            # Extra task: fetch the session history (only logged here; stage 2 reloads it)
            future3 = executor.submit(get_history, session_id)
            future_to_topic[future3] = "session history"

            # Collect the results as the tasks finish
            search_results = {}
            for future in as_completed(future_to_topic):
                task_name = future_to_topic[future]
                try:
                    result = future.result()
                    search_results[task_name] = result
                    print(f"{task_name} finished, result count: {len(result) if isinstance(result, list) else 'N/A'}")
                except Exception as exc:
                    print(f"{task_name} raised an exception: {exc}")
                    search_results[task_name] = []

            # Merge the results of all topic searches
            for task_name, result in search_results.items():
                if "topic group" in task_name and isinstance(result, list):
                    related_topics.extend(result)

            print(f"Related topics after merging: {len(related_topics)}")
    else:
        print("No related topics found")
        related_topics = []

    # 2. QA agent: answer the user's question using the retrieved information
    print("=== QA agent ===")
    history = get_history(session_id)
    print(f"Session history: {history}")

    # Build the QA context
    qa_context = f"User question: {user_input}\nRelated topics: {related_topics}\nSession history: {history}"
    try:
        qa_response = qa_agent.invoke({
            "input": f"Answer the user's question based on the following information:\n{qa_context}\n\nUser question: {user_input}"
        })
        print(f"QA result: {qa_response}")
    except Exception as e:
        print(f"QA agent failed: {e}")
        qa_response = {"output": "Sorry, I could not generate an answer."}

    # 3. Topic summary agent: process the history
    print("=== Topic summary agent ===")
    # Compress the current session's messages into a topic
    new_topic = compress_history_to_topic(session_id)
    if new_topic:
        print(f"New topic: {new_topic}")
    else:
        print("No new messages to compress into a topic")

    # Make sure qa_response is not None and has a usable key
    if qa_response is None:
        qa_response = {"output": "Sorry, I could not generate an answer."}

    # Handle a response that lacks both the output and the result key
    if "output" in qa_response:
        qa_result = qa_response["output"]
    elif "result" in qa_response:
        qa_result = qa_response["result"]
    else:
        qa_result = "No answer"

    return {
        "session_id": session_id,
        "qa_response": qa_result,
        "new_topics": new_topic
    }


# Example usage
if __name__ == "__main__":
    result = super_agent("Hello, can you tell me what the weather is like?", "session_1")
    print(f"Final result: {result}")
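
One property of the parallel listing is worth noting: as_completed yields futures in completion order, so the merged topic list can come back in a different order from run to run. If a stable order matters, executor.map is an option because it returns results in input order. The sketch below shows the grouping and fetching pattern in isolation; chunked and fetch_in_groups are hypothetical helper names, and fetch stands in for a function such as select_topic_tool.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Sequence


def chunked(items: Sequence[Any], size: int) -> List[List[Any]]:
    """Split items into consecutive groups of at most `size` elements."""
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


def fetch_in_groups(
    ids: Sequence[str],
    fetch: Callable[[List[str]], List[Any]],
    group_size: int = 10,
    max_workers: int = 5,
) -> List[Any]:
    """Call fetch once per group in parallel and concatenate the results."""
    groups = chunked(ids, group_size)
    if not groups:
        return []
    results: List[Any] = []
    with ThreadPoolExecutor(max_workers=min(max_workers, len(groups))) as pool:
        # map() yields results in the order of the input groups
        for part in pool.map(fetch, groups):
            results.extend(part)
    return results
```

Unlike the as_completed loop, map re-raises the first exception when its result is reached, so per-group error handling would have to live inside fetch, as the tool functions in the listing already do.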