希望智能体在开始“工作”之前进行多轮对话。有几种方法可以实现这一点。让我为您展示最佳模式:

方法 1:具有会话状态管理的单智能体
这是最简单、最优雅的方法——使用一个在内部管理会话状态的智能体:

from strands import Agent, tool
from strands.multiagent.a2a import A2AServer
from strands.session.s3_session_manager import S3SessionManager
import json
from typing import Dict, Any

class ConversationalWorkflowAgent:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        
        # 用于持久化会话记忆的会话管理器
        self.session_manager = S3SessionManager(
            session_id=f"conversational_{agent_id}",
            bucket="agent-conversation-memory", # 智能体会话记忆存储桶
            prefix=f"conversations/{agent_id}" # 前缀
        )
        
        # 会话状态
        self.conversation_state = {
            "phase": "gathering_info",  # 阶段: gathering_info -> ready_to_work -> working
            "collected_info": {}, # 收集的信息
            "conversation_count": 0, # 会话次数
            "requirements_met": False # 需求是否满足
        }
        
        # 创建具有会话管理工具的智能体
        self.agent = Agent(
            name=f"Conversational {agent_id}", # 智能体名称
            session_manager=self.session_manager,
            prompt=self.get_conversation_prompt(), # 获取提示词
            tools=[
                self.update_conversation_state, # 更新会话状态
                self.check_readiness_to_work, # 检查是否准备好工作
                self.start_working, # 开始工作
                *self.get_work_tools() # 获取工作工具
            ]
        )
    
    def get_conversation_prompt(self):
        return """
        您是一个对话式智能体,需要在开始工作前通过对话收集信息。
        
        会话阶段 (CONVERSATION PHASES):
        1. GATHERING_INFO (收集信息): 提问以了解用户需求 (最少 3 次交流)
        2. READY_TO_WORK (准备就绪): 确认您已拥有足够信息
        3. WORKING (工作中): 执行实际任务
        
        规则 (RULES):
        - 在回应前始终检查会话状态
        - 在提供开始工作之前至少收集 3 条信息
        - 在信息收集期间保持对话性和吸引力
        - 只有在用户明确确认准备好后才开始工作
        
        使用工具来管理会话状态和检查准备情况。
        """
    
    @tool
    def update_conversation_state(self, key: str, value: str, phase: str = None) -> str:
        """使用新信息更新会话状态"""
        self.conversation_state["collected_info"][key] = value # 存储信息
        self.conversation_state["conversation_count"] += 1 # 增加计数
        
        if phase:
            self.conversation_state["phase"] = phase # 更新阶段
            
        # 检查是否有足够的信息继续
        info_count = len(self.conversation_state["collected_info"]) # 信息数量
        min_exchanges = 3 # 最小交流次数
        
        if (info_count >= min_exchanges and 
            self.conversation_state["conversation_count"] >= min_exchanges and
            self.conversation_state["phase"] == "gathering_info"):
            self.conversation_state["phase"] = "ready_to_work" # 转换到准备就绪阶段
            return f"状态已更新。收集了 {info_count} 条信息。准备转换到工作阶段。"
        
        return f"状态已更新。收集了 {info_count} 条信息。还需要 {min_exchanges - info_count} 条。"
    
    @tool
    def check_readiness_to_work(self) -> str:
        """检查智能体是否准备好开始工作"""
        state = self.conversation_state
        
        info = {
            "phase": state["phase"], # 当前阶段
            "conversation_count": state["conversation_count"], # 会话次数
            "collected_info_count": len(state["collected_info"]), # 收集的信息数量
            "collected_info": state["collected_info"], # 收集的信息
            "ready_to_work": state["phase"] == "ready_to_work" # 是否准备好工作
        }
        
        return json.dumps(info, indent=2) # 返回 JSON 格式信息
    
    @tool
    def start_working(self, user_confirmation: str = "yes") -> str:
        """开始实际的工作阶段"""
        if user_confirmation.lower() in ["yes", "y", "start", "begin", "proceed"]: # 确认词
            self.conversation_state["phase"] = "working" # 进入工作阶段
            return "开始工作阶段。我现在将使用收集到的信息执行您的请求。"
        else:
            return "工作阶段未开始。请告诉我何时准备好继续。"
    
    def get_work_tools(self):
        """在子类中重写以提供特定的工作工具"""
        @tool
        def perform_analysis(task_description: str) -> str:
            """执行实际的工作任务"""
            if self.conversation_state["phase"] != "working": # 检查阶段
                return "尚无法执行分析。仍在对话阶段。"
            
            collected_info = self.conversation_state["collected_info"] # 获取收集的信息
            return f"基于收集的信息执行分析: {collected_info}" # 返回结果
        
        return [perform_analysis]

# 使用示例
class DataAnalysisAgent(ConversationalWorkflowAgent): # 数据分析智能体
    def get_work_tools(self):
        @tool
        def analyze_data(dataset_info: str) -> str:
            """根据会话需求分析数据"""
            if self.conversation_state["phase"] != "working":
                return "尚无法分析数据。请先完成对话。"
            
            requirements = self.conversation_state["collected_info"] # 需求信息
            return f"""
            数据分析结果:
            - 数据集: {requirements.get('dataset', '未指定')}
            - 分析类型: {requirements.get('analysis_type', '未指定')}
            - 重点领域: {requirements.get('focus_areas', '未指定')}
            - 时间线: {requirements.get('timeline', '未指定')}
            
            [此处将执行详细分析]
            """
        
        @tool
        def generate_report(report_format: str = "summary") -> str:
            """生成分析报告"""
            if self.conversation_state["phase"] != "working":
                return "尚无法生成报告。分析未开始。"
            
            return f"基于分析需求生成了 {report_format} 报告。"
        
        return [analyze_data, generate_report]

# 部署对话式智能体
if __name__ == "__main__":
    agent = DataAnalysisAgent("data_analyst") # 创建实例
    server = A2AServer(agent=agent.agent, port=9000) # 创建服务器
    server.serve() # 启动服务

方法 2:双智能体交接模式
使用一个对话智能体,其将任务交接给一个工作智能体:

from strands import Agent, tool
from strands.multiagent.handoffs import handoff_to_agent # 导入交接函数
import asyncio

class ConversationManager: # 会话管理器
    def __init__(self):
        # 用于对话的会话智能体
        self.conversation_agent = Agent(
            name="Conversation Manager", # 名称
            prompt="""
            您是一名会话管理器。您的工作是:
            1. 与用户进行对话以了解他们的需求
            2. 收集至少 3 条重要信息
            3. 准备就绪时,交接给工作智能体
            
            跟踪会话进度,只有在拥有足够信息时才进行交接。
            """,
            tools=[
                self.collect_information, # 收集信息
                self.check_conversation_progress, # 检查会话进度
                self.handoff_to_worker # 交接给工作器
            ]
        )
        
        # 用于实际任务的工作智能体
        self.work_agent = Agent(
            name="Work Executor", # 工作执行器
            prompt="""
            您是一名工作执行器。您接收来自会话管理器的详细需求
            并执行实际任务。您可以访问所有收集到的信息。
            """,
            tools=[
                self.execute_task, # 执行任务
                self.generate_deliverable # 生成交付物
            ]
        )
        
        # 智能体间的共享内存
        self.shared_context = {
            "conversation_history": [], # 会话历史
            "collected_requirements": {}, # 收集的需求
            "conversation_count": 0, # 会话次数
            "ready_for_handoff": False # 是否准备好交接
        }
    
    @tool
    def collect_information(self, key: str, value: str) -> str:
        """在会话期间收集信息"""
        self.shared_context["collected_requirements"][key] = value # 存储信息
        self.shared_context["conversation_count"] += 1 # 增加计数
        
        # 检查是否准备好交接
        if (len(self.shared_context["collected_requirements"]) >= 3 and 
            self.shared_context["conversation_count"] >= 3):
            self.shared_context["ready_for_handoff"] = True # 标记为准备好
            return f"已收集: {key} = {value}。准备交接给工作智能体。"
        
        return f"已收集: {key} = {value}。交接前需要更多信息。"
    
    @tool
    def check_conversation_progress(self) -> str:
        """检查当前会话进度"""
        context = self.shared_context
        return f"""
        会话进度:
        - 交流次数: {context['conversation_count']}
        - 已收集需求: {len(context['collected_requirements'])}
        - 准备好交接: {context['ready_for_handoff']}
        - 需求: {context['collected_requirements']}
        """
    
    @tool
    def handoff_to_worker(self, user_message: str) -> str:
        """将收集的上下文交接给工作智能体"""
        if not self.shared_context["ready_for_handoff"]: # 检查是否准备好
            return "尚未准备好交接。需要更多对话。"
        
        # 为工作智能体准备上下文
        context_message = f"""
        来自会话管理器的交接:
        用户请求: {user_message}
        收集的需求: {self.shared_context['collected_requirements']}
        会话历史: {self.shared_context['conversation_history']}
        
        请基于此信息执行工作。
        """
        
        # 交接给工作智能体
        return handoff_to_agent(self.work_agent, context_message) # 调用交接函数
    
    @tool
    def execute_task(self, task_details: str) -> str:
        """执行实际的工作任务"""
        requirements = self.shared_context["collected_requirements"] # 获取需求
        return f"""
        任务执行结果:
        基于需求: {requirements}
        
        [实际工作将在此处执行]
        任务成功完成。
        """
    
    @tool
    def generate_deliverable(self, format_type: str = "report") -> str:
        """生成最终交付物"""
        return f"基于收集的需求和执行的任务生成了 {format_type}。"

# 使用 A2A 部署
conversation_manager = ConversationManager()
server = A2AServer(agent=conversation_manager.conversation_agent, port=9000)
server.serve()














推荐
对于您的用例,我推荐方法 1(具有状态管理的单智能体),因为:

  • 简单性 (Simplicity): 一个智能体处理所有事情

  • 会话持久性 (Session Persistence): 跨会话的自动记忆

  • 清晰的状态跟踪 (Clear State Tracking): 明确的会话阶段

  • A2A 兼容 (A2A Compatible): 与 ClientFactory 完美配合

  • 可扩展 (Scalable): 易于扩展更复杂的工作流

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐