【CrewAI 源码剖析·第二篇】kickoff 执行链路全解析:从入口函数到 LLM 工具调用,每一行代码都追踪到底

作者:技术博主 | 更新时间:2026-05-11 | 阅读时长:约 24 分钟
系列:CrewAI 源码剖析(共 4 篇)
版本:CrewAI v1.14.x(2026-05 最新稳定版)
标签CrewAI 源码分析 kickoff AgentExecutor ReAct 工具调用 执行链路 Python


在这里插入图片描述

🔥 本篇目标crew.kickoff() 一行代码背后发生了什么?本篇从入口函数开始,追踪完整的调用栈:任务调度 → Agent 执行 → LLM 推理 → 工具调用 → 输出解析。理解这条链路,才能真正知道 CrewAI 的 Bug 出在哪、性能瓶颈在哪、以及如何做定制化扩展。


系列进度

篇次 主题 状态
第一篇 整体架构与四大原语源码解析 ✅ 已发布
第二篇(本篇) kickoff 执行链路:从入口到工具调用全链路
第三篇 记忆与知识系统:短时/长时/实体记忆 + RAG 即将发布
第四篇 Flow 事件驱动系统:装饰器状态机原理 即将发布

目录


一、执行链路总览:七层调用栈

先看整体,再逐层深入。crew.kickoff() 到最终输出,经历七个层次:

Layer 1  crew.kickoff(inputs)
              ↓ 变量插值、回调、规划
Layer 2  crew._run_process()
              ↓ 根据 Process.sequential / hierarchical 分发
Layer 3  Sequential: _execute_tasks() / Hierarchical: _execute_hierarchical()
              ↓ 遍历 tasks,逐个调度
Layer 4  agent.execute_task(task, context)
              ↓ 构建 Prompt,委托 Executor
Layer 5  CrewAgentExecutor.invoke()
              ↓ 进入 ReAct 循环 _invoke_loop()
Layer 6  get_llm_response(messages)
              ↓ 调用 LLM(LiteLLM 路由到具体模型)
              ↓ 解析响应 → AgentAction 或 AgentFinish
Layer 7  execute_tool_and_check_finality(action)
              ↓ 查找并执行对应工具
              ↓ 观察结果追加到 messages,回到 Layer 5
              ↓ 或返回最终答案 AgentFinish
              ↓
         TaskOutput(raw / json / pydantic)

关键文件路径(对照源码阅读时用):

src/crewai/
  crew.py                              # Layer 1-2
  crews/crew_output.py                 # CrewOutput 模型
  agents/crew_agent_executor.py        # Layer 5-6(核心 ReAct 循环)
  agents/agent_builder/               # Agent 构建器
  utilities/agent_utils.py            # Layer 6 LLM 调用工具函数
  utilities/tool_utils.py             # Layer 7 工具执行
  tasks/task_output.py                # 输出模型
  agents/parser.py                    # 输出解析器

二、第一层:Crew.kickoff() 入口函数

# src/crewai/crew.py(精简,聚焦关键逻辑)

def kickoff(
    self,
    inputs: Optional[Dict[str, Any]] = None,
) -> CrewOutput:
    """
    Crew 执行的主入口。
    调用前:Crew 已初始化(Agent、Task、记忆系统均已装配)
    调用后:返回 CrewOutput,包含最后一个 Task 的输出和所有 Task 的输出列表
    """
    # ── 步骤1:执行 before_kickoff 回调 ──────────────────────
    # @before_kickoff 装饰器注册的函数在这里被调用
    # 可以用来修改 inputs(比如补充默认值、格式化输入)
    for callback in self.before_kickoff_callbacks:
        inputs = callback(inputs) or inputs

    # ── 步骤2:变量插值 ──────────────────────────────────────
    # 把 inputs 里的值替换到 Agent/Task 的 {variable} 占位符
    self._interpolate_inputs(inputs or {})

    # ── 步骤3:规划模式(可选)───────────────────────────────
    # 如果 Crew(planning=True),先让一个 Planning Agent 分析任务
    # 生成每个 Task 的执行步骤,插入到 Task 的描述中
    if self.planning:
        self._handle_crew_planning()

    # ── 步骤4:遥测:记录本次执行的开始事件 ─────────────────
    self._execution_span = self._telemetry.crew_execution_span(self, inputs)

    # ── 步骤5:核心执行 ──────────────────────────────────────
    try:
        result = self._run_process()
    except Exception as e:
        # 捕获执行中的异常,发出错误事件,然后重新抛出
        self._telemetry.crew_execution_error(self._execution_span, e)
        raise

    # ── 步骤6:执行 after_kickoff 回调 ───────────────────────
    for callback in self.after_kickoff_callbacks:
        callback(result)

    # ── 步骤7:记录遥测结束事件 ──────────────────────────────
    self._telemetry.end_crew_execution_span(self._execution_span, result)

    return result

2.1 _interpolate_inputs():变量插值的实现

def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
    """
    把 inputs 字典的值替换到所有 Agent 和 Task 的文本字段中

    设计细节:
    - 用 Python 的 str.format_map() 而不是 format()
    - format_map() 遇到不存在的 key 不会报错(用 SafeDict 兜底)
    - 原始模板保存在 _original_description,支持重复插值(kickoff_for_each)
    """
    # 对每个 Agent 的三个文本字段做插值
    for agent in self.agents:
        agent.interpolate_inputs(inputs)

    # 对每个 Task 的 description 做插值
    for task in self.tasks:
        task.interpolate_inputs(inputs)

class SafeDict(dict):
    """
    缺失 key 时返回 '{key}' 原字符串,不抛出 KeyError
    这样 description 中存在但 inputs 里没提供的变量,保持原样
    """
    def __missing__(self, key):
        return "{" + key + "}"

2.2 _handle_crew_planning():规划模式内部

def _handle_crew_planning(self) -> None:
    """
    Planning 模式:在正式执行前,让一个 Planning Agent 生成执行计划
    计划会以文本形式插入到每个 Task 的 description 前面
    """
    from crewai.utilities.crew_planner import CrewPlanner

    # 构建规划 Prompt(把所有 Task 的描述和 Agent 的能力告诉规划者)
    planner = CrewPlanner(
        tasks=self.tasks,
        planning_agent_llm=self.planning_llm or self.agents[0].llm,
    )
    # 规划 Agent 输出一个 JSON,每个 Task 对应一个执行步骤列表
    plan = planner.create_plan()

    # 把计划注入到对应 Task 的 description 头部
    for task, task_plan in zip(self.tasks, plan.list_of_plans_per_task):
        task.description = (
            f"Planning Step:\n{task_plan}\n\n"
            f"Original Task Description:\n{task.description}"
        )

三、第二层:_run_process() → 策略分发

def _run_process(self) -> CrewOutput:
    """
    根据 self.process 选择执行策略
    这里是策略模式(Strategy Pattern)的标准实现
    """
    if self.process == Process.sequential:
        return self._execute_tasks(self.tasks)

    elif self.process == Process.hierarchical:
        return self._execute_hierarchical_process()

    else:
        raise NotImplementedError(
            f"Process type '{self.process}' is not implemented."
        )

3.1 Hierarchical 模式的 Manager Agent

def _execute_hierarchical_process(self) -> CrewOutput:
    """
    层级模式执行:由 Manager Agent 统一调度所有 Worker Agent
    Manager Agent 不直接执行任务,而是负责:
      1. 把大任务分解成子任务
      2. 把子任务委派给合适的 Worker
      3. 审核 Worker 的输出
      4. 决定是否需要重新执行
    """
    # 如果用户没有指定 manager_agent,自动创建一个
    if not self.manager_agent:
        self.manager_agent = Agent(
            role="Crew Manager",
            goal="Manage the crew to complete the task in the best possible way",
            backstory="You are a seasoned manager with a knack for getting the best "
                      "out of your team. You are also known for your ability to "
                      "delegate work to the right person.",
            llm=self.manager_llm,
            # Manager Agent 自动获得"委派工具"(把子任务分配给 Worker)
            allow_delegation=True,
            verbose=self.verbose,
        )
        # 把所有 Worker Agent 注入到 Manager 的委派工具里
        self.manager_agent.crew = self
        self.manager_agent.set_tools_handler()

    # Manager 执行的是一个"综合任务":协调完成所有子任务
    manager_task = Task(
        description=self._build_manager_task_description(),
        expected_output="A comprehensive final answer that completes the crew's tasks",
        agent=self.manager_agent,
    )

    # Manager 执行完成后,从其输出中提取最终结果
    result = self.manager_agent.execute_task(
        task=manager_task,
        context=self._get_context(manager_task, []),
    )
    return self._build_crew_output(result, [manager_task])

四、第三层:Sequential 顺序执行引擎

Sequential 模式是最常用的,也是最值得细看的。

def _execute_tasks(
    self,
    tasks: List[Task],
    start_index: int = 0,
    was_replayed: bool = False,
) -> CrewOutput:
    """
    顺序执行所有 Task 的核心循环

    参数:
        start_index:从第几个 Task 开始(用于 replay 功能)
        was_replayed:是否是从断点恢复
    """
    task_outputs: List[TaskOutput] = []

    # 从上次中断的位置开始(或从头开始)
    for task_index, task in enumerate(tasks[start_index:], start=start_index):

        # ── 处理异步 Task ─────────────────────────────────────
        if task.async_execution:
            # 异步 Task:提交到线程池,不等待结果,继续下一个 Task
            context = self._get_context(task, task_outputs)
            future = self._task_futures[task] = self.thread_pool.submit(
                self._execute_single_task,
                task=task,
                context=context,
            )
            task_outputs.append(None)  # 占位,后面会填充
            continue

        # ── 等待所有已提交的异步 Task 完成 ─────────────────────
        # 当遇到非异步 Task 时,需要先等前面的异步 Task 都完成
        # 因为非异步 Task 可能依赖前面异步 Task 的输出
        self._wait_async_tasks(task_outputs)

        # ── 执行同步 Task ─────────────────────────────────────
        # 上下文 = 所有已完成 Task 的输出(或由 task.context 指定的特定 Task)
        context = self._get_context(task, task_outputs)
        task_output = self._execute_single_task(task=task, context=context)
        task_outputs[task_index] = task_output    # 替换占位符

        # ── 触发 task_callback ────────────────────────────────
        if self.task_callback:
            self.task_callback(task_output)

    # 等待所有异步 Task 最终完成(如果最后几个是异步的)
    self._wait_async_tasks(task_outputs)

    # 构建最终输出(取最后一个 Task 的输出作为 Crew 的输出)
    return self._build_crew_output(
        final_output=task_outputs[-1],
        tasks_output=task_outputs,
    )

4.1 _execute_single_task():单个 Task 的完整执行

def _execute_single_task(
    self,
    task: Task,
    context: str,
) -> TaskOutput:
    """
    执行单个 Task 的完整流程,包括 Guardrail 重试
    """
    # ── 发出 Task 开始事件(遥测/日志系统监听)─────────────────
    self._logger.info(
        f"\n\n\033[1m\033[95m [{'Task' if not task.async_execution else 'Async Task'}]\033[00m"
        f"\033[1m\033[92m Starting Task: {task.description}\033[00m\n",
    )

    # ── 确定执行 Agent ────────────────────────────────────────
    # task.agent 在 sequential 模式下必须显式指定
    agent = task.agent
    if not agent:
        raise ValueError(f"Task '{task.description}' has no agent assigned.")

    # ── 如果需要 human_input,在执行前先让用户输入 ─────────────
    if task.human_input:
        human_feedback = self._get_human_input(task)
        context = f"{context}\n\nHuman Feedback: {human_feedback}" if context else human_feedback

    # ── 核心执行:委托给 Agent ──────────────────────────────────
    task_output = agent.execute_task(
        task=task,
        context=context,
        tools=task.tools or agent.tools,
    )

    # ── 处理输出到文件 ────────────────────────────────────────
    if task.output_file:
        self._write_output_file(task.output_file, task_output.raw)

    # ── Guardrail 质量检查 ────────────────────────────────────
    if task.guardrail:
        task_output = self._handle_guardrail(task, task_output, agent)

    # ── 保存输出到 task.output(供后续 Task 引用)──────────────
    task.output = task_output

    return task_output

4.2 _get_context():上下文聚合逻辑

这是 Task 间数据流转的核心:

def _get_context(
    self,
    task: Task,
    task_outputs: List[TaskOutput],
) -> str:
    """
    为当前 Task 构建上下文字符串
    上下文来源有两种:
      1. task.context 显式指定的依赖 Task 列表
      2. 默认:当前 Task 之前所有已完成 Task 的输出
    """
    if task.context:
        # 模式1:显式上下文依赖
        # task.context = [task_a, task_b]
        # 只使用指定 Task 的输出作为上下文
        context_parts = []
        for dep_task in task.context:
            if dep_task.output:
                context_parts.append(
                    f"Output from task '{dep_task.description[:50]}...':\n"
                    f"{dep_task.output.raw}"
                )
        return "\n\n".join(context_parts)
    else:
        # 模式2:隐式上下文(所有前序 Task 的输出)
        # 这是 sequential 模式的默认行为
        completed_outputs = [o for o in task_outputs if o is not None]
        if not completed_outputs:
            return ""
        context_parts = []
        for i, output in enumerate(completed_outputs):
            context_parts.append(
                f"Task {i+1} output:\n{output.raw}"
            )
        return "\n\n".join(context_parts)

五、第四层:Agent.execute_task() 任务执行入口

# src/crewai/agent.py

def execute_task(
    self,
    task: Any,
    context: Optional[str] = None,
    tools: Optional[List[Any]] = None,
) -> TaskOutput:
    """
    Agent 执行单个 Task 的入口方法
    职责:构建最终的输入 Prompt,然后委托给 AgentExecutor
    """
    # ── 构建任务 Prompt ───────────────────────────────────────
    # task.prompt() 返回的是任务描述 + 期望输出的格式化文本
    task_prompt = task.prompt()

    # ── 附加上下文(前序 Task 的输出)────────────────────────
    if context:
        task_prompt = self.i18n.slice("task_with_context").format(
            task=task_prompt,
            context=context,
        )
        # i18n 翻译后大概是:
        # "{task}\n\nContext from previous tasks:\n{context}"

    # ── 知识库注入(如果 Agent 或 Crew 有配置 knowledge)────────
    if self.knowledge:
        knowledge_context = self.knowledge.query(task_prompt)
        if knowledge_context:
            task_prompt = f"{task_prompt}\n\n# Relevant Knowledge:\n{knowledge_context}"

    # ── 工具列表(Task 级别的工具优先级高于 Agent 级别)─────────
    tools = tools or task.tools or self.tools

    # ── 确定最终使用的工具列表(包含委派工具)───────────────────
    parsed_tools = self._parse_tools(tools)

    # ── 更新 AgentExecutor 的工具配置 ────────────────────────
    self.agent_executor.tools = parsed_tools

    # ── 发出 Agent 执行开始事件 ──────────────────────────────
    self._emit_agent_action_event(task, tools)

    # ── 委托给 AgentExecutor 执行(进入 ReAct 循环)──────────
    output = self.agent_executor.invoke(
        {
            "input": task_prompt,
            "tool_names": self._tools_names(parsed_tools),
            "tools": self._tools_description(parsed_tools),
            # 是否需要人工输入(由 task.human_input 控制)
            "ask_for_human_input": task.human_input,
        }
    )

    # ── 解析并构建 TaskOutput ─────────────────────────────────
    return self._build_task_output(output, task)

def _build_task_output(self, raw_output: str, task: Task) -> TaskOutput:
    """
    根据 task 的输出配置,解析 raw_output 为目标格式
    """
    from crewai.tasks.task_output import TaskOutput

    # 尝试解析为 JSON(如果 task.output_json 或 task.output_pydantic 配置了)
    pydantic_output = None
    json_output = None

    if task.output_pydantic or task.output_json:
        model_class = task.output_pydantic or task.output_json
        try:
            # 从 raw 文本中提取 JSON 部分
            json_str = self._extract_json(raw_output)
            parsed = json.loads(json_str)
            if task.output_pydantic:
                pydantic_output = model_class(**parsed)
            else:
                json_output = parsed
        except (json.JSONDecodeError, ValueError) as e:
            self._logger.warning(f"无法解析为 {model_class.__name__}{e}")

    return TaskOutput(
        description=task.description,
        expected_output=task.expected_output,
        raw=raw_output,
        pydantic=pydantic_output,
        json_dict=json_output,
        agent=self.role,
    )

六、第五层:CrewAgentExecutor._invoke_loop() ReAct 循环

这是整个框架最核心的代码,实现了 Agent 的推理-行动-观察循环。

# src/crewai/agents/crew_agent_executor.py

class CrewAgentExecutor(CrewAgentExecutorMixin):
    """
    Agent 的执行引擎,实现 ReAct(Reasoning + Acting)循环
    """

    def invoke(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """
        同步执行入口
        """
        # ── 格式化初始 Prompt ─────────────────────────────────
        # 判断是否有 system prompt(role + goal + backstory 组合)
        if "system" in inputs:
            # 分离 system prompt 和 user prompt
            system_prompt = self._format_prompt(inputs["system"])
            user_prompt = self._format_prompt(inputs["input"])
            self._append_message("system", system_prompt)
            self._append_message("user", user_prompt)
        else:
            prompt = self._format_prompt(inputs["input"])
            self._append_message("user", prompt)

        # ── 记录日志(verbose 模式)──────────────────────────────
        self._show_start_logs()

        # ── 读取 human_input 标志 ─────────────────────────────
        ask_for_human_input = inputs.get("ask_for_human_input", False)

        # ── 进入 ReAct 主循环 ─────────────────────────────────
        formatted_answer = self._invoke_loop()

        # ── 如果需要 human_input,获取用户反馈后再做一次推理 ─────
        if ask_for_human_input:
            formatted_answer = self._handle_human_feedback(formatted_answer)

        # ── 创建记忆(短时/长时/实体)────────────────────────────
        self._create_short_term_memory(formatted_answer)
        self._create_long_term_memory(formatted_answer)
        self._create_external_memory(formatted_answer)

        return {"output": formatted_answer.output}

    def _invoke_loop(self) -> AgentFinish:
        """
        核心 ReAct 循环:
        不断调用 LLM → 解析输出 → 执行工具 → 把观察结果加回 messages
        直到 LLM 输出最终答案(AgentFinish)或达到最大迭代次数
        """
        iterations = 0

        while iterations < self.max_iter:
            iterations += 1

            # ── 调用 LLM ─────────────────────────────────────
            formatted_answer = self.get_llm_response(
                self.messages,
                self.tools,
            )

            # ── 判断输出类型 ──────────────────────────────────
            if isinstance(formatted_answer, AgentFinish):
                # LLM 给出了最终答案,退出循环
                self._show_logs("agent_final_answer", formatted_answer.output)
                return formatted_answer

            # 到这里说明 LLM 输出了工具调用(AgentAction)
            # formatted_answer 是 AgentAction 实例
            action: AgentAction = formatted_answer

            # ── 记录工具调用日志 ──────────────────────────────
            self._show_logs("tool_use", action)

            # ── 执行工具并获取观察结果 ────────────────────────
            observation = execute_tool_and_check_finality(
                action=action,
                tools=self.tools,
                agent=self.agent,
            )

            # ── 把 (行动 + 观察) 附加到 messages ─────────────
            # 格式:
            # assistant: "Thought: ...\nAction: tool_name\nAction Input: ..."
            # user(tool_result): "Observation: ..."
            self._append_message(
                "assistant",
                f"{action.log}\nObservation: {observation}",
            )

            # ── 触发 step_callback(每步回调)────────────────
            if self.step_callback:
                self.step_callback(action, observation)

        # ── 超过最大迭代次数 ──────────────────────────────────
        return self._handle_max_iterations()

6.1 _handle_max_iterations():超出迭代限制的处理

def _handle_max_iterations(self) -> AgentFinish:
    """
    当 Agent 达到最大迭代次数时的处理:
    强制注入"请给出最终答案"的提示,让 LLM 收尾
    """
    self._logger.warning(
        f"Maximum iterations ({self.max_iter}) reached. "
        "Forcing final answer..."
    )

    # 注入强制收尾的 Prompt
    force_prompt = self.i18n.slice("force_final_answer_no_tools")
    # 大概是:"I now must return my best answer based on what I know."

    self._append_message("user", force_prompt)

    # 再调用一次 LLM,这次不提供工具(强制输出文本答案)
    return self.get_llm_response(self.messages, tools=[])

七、第六层:get_llm_response() LLM 推理

这一层负责把 messages 发给 LLM,并把响应解析为 AgentActionAgentFinish

# src/crewai/utilities/agent_utils.py

def get_llm_response(
    messages: List[LLMMessage],
    tools: List[Any],
    llm: Any,
    parser: "CrewAgentParser",
) -> Union[AgentAction, AgentFinish]:
    """
    调用 LLM 并解析响应
    
    两种 LLM 响应格式:
    1. 原生工具调用(tool_calls):现代 LLM 的结构化工具调用
    2. ReAct 文本格式:旧式文本解析("Action: xxx\nAction Input: xxx")
    """
    # ── 实际调用 LLM ──────────────────────────────────────────
    # CrewAI 使用 LiteLLM 作为统一 LLM 接口
    # LiteLLM 会根据模型名路由到对应的 API(OpenAI/Anthropic/Gemini/...)
    response = llm.call(
        messages=messages,
        tools=_format_tools_for_llm(tools),  # 转换为 OpenAI 工具格式
    )

    # ── 检查是否有原生工具调用 ────────────────────────────────
    if hasattr(response, "tool_calls") and response.tool_calls:
        # 现代 LLM 的结构化工具调用(OpenAI function calling 格式)
        tool_call = response.tool_calls[0]
        return AgentAction(
            tool=tool_call.function.name,
            tool_input=json.loads(tool_call.function.arguments),
            log=response.content or "",
            tool_call_id=tool_call.id,
        )

    # ── 没有原生工具调用,解析文本格式 ───────────────────────
    text_response = response.content if hasattr(response, "content") else str(response)

    # 交给解析器处理(ReAct 文本格式解析)
    return parser.parse(text_response)

7.1 LiteLLM 的作用

# src/crewai/llm/llm.py(简化)

class LLM:
    """
    CrewAI 的 LLM 适配层,内部用 LiteLLM 统一不同 LLM 提供商的接口
    
    支持的模型(通过 model 名称路由):
      openai/gpt-4o         → OpenAI API
      anthropic/claude-3-5  → Anthropic API
      gemini/gemini-pro     → Google AI
      azure/gpt-4           → Azure OpenAI
      bedrock/claude        → AWS Bedrock
      ollama/llama3         → 本地 Ollama
      ...(100+ 模型提供商)
    """

    def call(self, messages: List[dict], tools: List[dict] = None) -> Any:
        """
        统一调用接口,内部委托给 litellm.completion()
        """
        import litellm

        kwargs = {
            "model": self.model,
            "messages": messages,
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
        }

        if tools:
            kwargs["tools"] = tools
            kwargs["tool_choice"] = "auto"

        # 速率限制(如果配置了 max_rpm)
        if self._rpm_controller:
            self._rpm_controller.check_or_wait()

        try:
            response = litellm.completion(**kwargs)
            return response.choices[0].message
        except litellm.RateLimitError:
            # 速率超限时等待后重试
            time.sleep(60)
            return self.call(messages, tools)
        except litellm.ContextWindowExceededError:
            # 上下文窗口超限时的处理
            if self.respect_context_window:
                messages = self._trim_messages(messages)
                return self.call(messages, tools)
            raise

八、第七层:工具调用链路

# src/crewai/utilities/tool_utils.py

def execute_tool_and_check_finality(
    action: AgentAction,
    tools: List[BaseTool],
    agent: "Agent",
) -> str:
    """
    执行 AgentAction 中指定的工具,返回观察结果(字符串)
    
    完整职责:
    1. 从工具列表中找到对应的工具
    2. 调用工具(带超时和错误处理)
    3. 检查是否是"委派"类型的工具调用
    4. 返回工具结果字符串
    """
    tool_name = action.tool
    tool_input = action.tool_input

    # ── 查找工具 ──────────────────────────────────────────────
    tool = _find_tool(tool_name, tools)

    if tool is None:
        # 工具不存在:返回错误提示(不是异常,让 Agent 自行处理)
        return agent.i18n.errors("tool_usage").format(tool=tool_name)

    # ── 检查工具是否被缓存 ────────────────────────────────────
    if tool.cache_function and tool.cache_function(tool_input):
        cached_result = _get_from_cache(tool_name, tool_input)
        if cached_result is not None:
            return cached_result

    # ── 执行工具 ──────────────────────────────────────────────
    try:
        # BaseTool.run() 是工具执行的统一入口
        # 内部会调用 _run() 方法(用户实现的具体逻辑)
        result = tool.run(tool_input)

        # ── 缓存结果 ──────────────────────────────────────────
        if tool.cache_function and tool.cache_function(tool_input):
            _save_to_cache(tool_name, tool_input, result)

        return str(result)

    except ToolException as e:
        # 工具执行出错:返回错误信息给 Agent,让 Agent 决定如何处理
        if tool.handle_tool_error:
            return str(e)
        else:
            # 如果工具不处理错误,重新抛出(Agent 上层会处理)
            raise

def _find_tool(name: str, tools: List[BaseTool]) -> Optional[BaseTool]:
    """
    查找工具:支持精确匹配和模糊匹配
    LLM 有时会把工具名大小写搞错,用模糊匹配兜底
    """
    # 精确匹配
    for tool in tools:
        if tool.name == name:
            return tool

    # 不区分大小写的匹配
    name_lower = name.lower().replace(" ", "_").replace("-", "_")
    for tool in tools:
        tool_name_lower = tool.name.lower().replace(" ", "_").replace("-", "_")
        if tool_name_lower == name_lower:
            return tool

    return None

8.1 BaseTool.run():工具执行的统一入口

# src/crewai/tools/base_tool.py

class BaseTool(BaseModel, ABC):
    """
    所有 CrewAI 工具的基类
    """
    name: str
    description: str
    args_schema: Optional[Type[BaseModel]] = None   # 输入参数的 Pydantic schema
    cache_function: Optional[Callable] = None        # 是否缓存(函数返回 True 则缓存)
    result_as_answer: bool = False                   # 结果是否直接作为 Agent 的最终答案

    def run(self, tool_input: Union[str, Dict]) -> Any:
        """
        工具执行的统一入口(模板方法模式)
        子类只需要实现 _run() 方法
        """
        try:
            # ── 解析输入 ──────────────────────────────────────
            # tool_input 可能是 JSON 字符串或 dict
            if isinstance(tool_input, str):
                try:
                    parsed_input = json.loads(tool_input)
                except json.JSONDecodeError:
                    # 不是 JSON,作为单个字符串参数
                    parsed_input = tool_input
            else:
                parsed_input = tool_input

            # ── 如果有 args_schema,用 Pydantic 验证参数 ────────
            if self.args_schema:
                if isinstance(parsed_input, dict):
                    validated = self.args_schema(**parsed_input)
                    parsed_input = validated.model_dump()

            # ── 调用实际工具实现 ──────────────────────────────
            result = self._run(parsed_input)

            # ── 发出工具执行事件 ──────────────────────────────
            crewai_event_bus.emit(
                self,
                event=ToolUsageFinishedEvent(
                    tool_name=self.name,
                    tool_input=tool_input,
                    tool_output=str(result),
                ),
            )

            return result

        except ValidationError as e:
            # Pydantic 验证失败:返回友好的错误提示
            return f"Tool input validation failed: {e}"

        except Exception as e:
            if self.handle_tool_error:
                return f"Tool execution error: {str(e)}"
            raise ToolException(str(e)) from e

    @abstractmethod
    def _run(self, *args, **kwargs) -> Any:
        """子类实现具体的工具逻辑"""
        pass

8.2 @tool 装饰器:快速创建工具

# src/crewai/tools/tool_decorator.py

def tool(name_or_func=None, *args, **kwargs):
    """
    把普通函数包装成 CrewAI 工具的装饰器
    
    用法:
    @tool("搜索工具")
    def web_search(query: str) -> str:
        '''在网络上搜索给定的查询词,返回前5条结果'''
        return serper_api.search(query)
    """
    def decorator(func):
        # 从函数的 docstring 提取工具 description
        tool_description = inspect.cleandoc(func.__doc__ or "")

        # 从函数签名自动生成 args_schema(Pydantic 模型)
        sig = inspect.signature(func)
        fields = {}
        for param_name, param in sig.parameters.items():
            annotation = param.annotation
            if annotation == inspect.Parameter.empty:
                annotation = Any
            default = param.default if param.default != inspect.Parameter.empty else ...
            fields[param_name] = (annotation, Field(default=default))

        # 动态创建 Pydantic 模型作为 args_schema
        DynamicSchema = create_model(f"{func.__name__}_schema", **fields)

        # 创建工具类实例
        class WrappedTool(BaseTool):
            name: str = tool_name
            description: str = tool_description
            args_schema: Type[BaseModel] = DynamicSchema

            def _run(self, **kwargs) -> Any:
                return func(**kwargs)

        return WrappedTool()

    tool_name = name_or_func if isinstance(name_or_func, str) else func.__name__
    if callable(name_or_func):
        return decorator(name_or_func)
    return decorator

九、输出解析:CrewAgentParser 如何解析非结构化文本

当 LLM 不支持原生工具调用(或模型输出格式不规范)时,CrewAI 需要解析 ReAct 格式的文本:

# ReAct 格式的 LLM 输出示例:
Thought: I need to search for the latest AI news.
Action: web_search
Action Input: {"query": "latest AI news 2026"}

# 或者最终答案:
Thought: I now have enough information to answer.
Final Answer: Based on my research, the latest trends in AI are...
# src/crewai/agents/parser.py

class CrewAgentParser:
    """
    解析 LLM 的文本输出为 AgentAction 或 AgentFinish
    """
    # ReAct 格式的正则表达式
    ACTION_PATTERN = r"Action\s*:\s*(.+?)(?:\n|$)"
    ACTION_INPUT_PATTERN = r"Action\s*Input\s*:\s*(.+?)(?:\nObservation|\Z)"
    FINAL_ANSWER_PATTERN = r"Final\s*Answer\s*:\s*(.+)"

    def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
        """
        解析 LLM 输出
        
        解析优先级:
        1. 如果包含 "Final Answer":返回 AgentFinish
        2. 如果包含 "Action" + "Action Input":返回 AgentAction
        3. 其他情况:也返回 AgentFinish(最后一次推理)
        """
        # ── 检查是否有最终答案 ────────────────────────────────
        final_match = re.search(self.FINAL_ANSWER_PATTERN, text, re.DOTALL | re.IGNORECASE)
        if final_match:
            return AgentFinish(
                output=final_match.group(1).strip(),
                log=text,
            )

        # ── 检查是否有工具调用 ────────────────────────────────
        action_match = re.search(self.ACTION_PATTERN, text, re.IGNORECASE)
        action_input_match = re.search(self.ACTION_INPUT_PATTERN, text, re.DOTALL | re.IGNORECASE)

        if action_match:
            tool_name = action_match.group(1).strip()
            tool_input_str = action_input_match.group(1).strip() if action_input_match else "{}"

            # 尝试解析 tool_input 为 dict
            try:
                tool_input = json.loads(tool_input_str)
            except json.JSONDecodeError:
                # JSON 解析失败,作为字符串处理
                tool_input = tool_input_str

            return AgentAction(
                tool=tool_name,
                tool_input=tool_input,
                log=text,
            )

        # ── 无法识别格式:强制作为最终答案 ─────────────────────
        # 这种情况出现在:LLM 直接给出答案,没有遵循 ReAct 格式
        return AgentFinish(
            output=text.strip(),
            log=text,
        )

    def _fix_common_mistakes(self, text: str) -> str:
        """
        修复 LLM 输出中的常见格式错误
        """
        # 修复:Action : tool_name(冒号前有空格)
        text = re.sub(r"Action\s+:", "Action:", text)

        # 修复:Action Input 写成了 Input
        text = re.sub(r"(?<!\w)Input\s*:", "Action Input:", text)

        # 修复:Final answer(小写 a)
        text = re.sub(r"Final\s+answer\s*:", "Final Answer:", text, flags=re.IGNORECASE)

        return text

十、上下文传递机制:Task 之间如何共享信息

理解上下文传递对于正确使用 CrewAI 非常重要。

10.1 隐式上下文(Sequential 默认模式)

# 每个 Task 自动获得前序所有 Task 的输出
# 构建的上下文字符串大概是:

"""
Task 1 output:
(这里是第一个 Task 的完整输出文本)

Task 2 output:
(这里是第二个 Task 的完整输出文本)
"""

# 这个上下文被拼接到当前 Task 的 Prompt 后面
# 可能导致的问题:
# - 随着 Task 增多,上下文越来越长(Token 消耗急剧增加)
# - 解决:设置 task.context = [specific_task](显式依赖)

10.2 显式上下文(推荐生产用)

# 显式指定依赖关系:analysis_task 只接收 research_task 的输出
# 而不是所有前序 Task 的输出

research_task = Task(
    description="研究 AI 市场趋势",
    expected_output="详细的市场分析报告",
    agent=researcher,
)

analysis_task = Task(
    description="基于研究结果,分析投资机会",
    expected_output="投资建议列表",
    agent=analyst,
    context=[research_task],  # ← 显式指定只依赖 research_task
)

writing_task = Task(
    description="将分析结果写成报告",
    expected_output="完整的投资报告",
    agent=writer,
    context=[research_task, analysis_task],  # ← 同时依赖两个 Task
)

10.3 上下文的 Token 预算管理

# Agent 的 respect_context_window=True 时(默认开启)
# CrewAI 会在 context 过长时自动截断

def _trim_context_to_fit(
    self,
    context: str,
    task_prompt: str,
    max_tokens: int,
) -> str:
    """
    当 context + task_prompt 超过 context window 时,截断 context
    
    截断策略:
    1. 保留最后 N 个 Task 的输出(最新的最重要)
    2. 对每个输出做摘要(通过一次额外的 LLM 调用)
    """
    # 估算当前 token 数(粗略估算:1 token ≈ 4 个字符)
    current_tokens = (len(context) + len(task_prompt)) // 4

    if current_tokens <= max_tokens * 0.8:  # 留 20% buffer
        return context

    # 超限:对 context 做摘要
    summary_prompt = f"请简洁地总结以下内容,保留关键信息:\n{context}"
    summary = self.llm.call(
        [{"role": "user", "content": summary_prompt}]
    )
    return summary.content

十一、完整调用栈追踪 + 关键断点位置

11.1 完整调用栈(可跟踪调试的版本)

crew.kickoff(inputs={"topic": "AI 监管"})
│
├── crew._interpolate_inputs(inputs)          # 变量插值
├── crew._handle_crew_planning()              # [可选] 规划模式
│
└── crew._run_process()
    │
    └── crew._execute_tasks(tasks)            # Sequential 模式
        │
        └── for task in tasks:
            │
            ├── crew._get_context(task, outputs)   # 聚合上下文
            │
            └── crew._execute_single_task(task, context)
                │
                ├── agent.execute_task(task, context, tools)
                │   │
                │   ├── task.prompt()              # 构建任务 Prompt
                │   ├── agent.knowledge.query()    # [可选] 知识库检索
                │   │
                │   └── agent_executor.invoke(inputs)
                │       │
                │       ├── _append_message("system", system_prompt)
                │       ├── _append_message("user", task_prompt)
                │       │
                │       └── _invoke_loop()         # ← ReAct 核心循环
                │           │
                │           ├── [iteration 1]
                │           │   ├── get_llm_response(messages, tools)
                │           │   │   ├── llm.call(messages)        # LiteLLM → API
                │           │   │   └── parser.parse(response)    # AgentAction
                │           │   │
                │           │   ├── execute_tool_and_check_finality(action)
                │           │   │   ├── _find_tool(action.tool)
                │           │   │   └── tool.run(action.tool_input)
                │           │   │       └── tool._run(**kwargs)    # 实际执行
                │           │   │
                │           │   └── _append_message("assistant", action+observation)
                │           │
                │           ├── [iteration 2] ...
                │           │
                │           └── [final iteration]
                │               ├── get_llm_response(messages, tools)
                │               └── parser.parse(response) → AgentFinish ✓
                │
                └── _handle_guardrail(task, output)   # [可选] 质量检查

11.2 关键调试断点(按问题类型)

# 问题1:Agent 没有按预期调用工具
# 断点位置:agent_utils.py → get_llm_response()
# 查看:messages 的内容(系统 Prompt 是否正确描述了工具)
# 查看:LLM 的原始响应(是否输出了 Action 字段)

# 问题2:工具调用失败
# 断点位置:tool_utils.py → execute_tool_and_check_finality()
# 查看:action.tool 是否与注册的工具名完全匹配
# 查看:action.tool_input 的 JSON 格式是否正确

# 问题3:Agent 陷入死循环
# 断点位置:crew_agent_executor.py → _invoke_loop() 的 while 条件
# 查看:iterations 是否在递增
# 查看:max_iter 配置值(默认 20,可能太高)

# 问题4:上下文过长导致报错
# 断点位置:crew.py → _get_context()
# 查看:返回的 context 字符串长度
# 解决:显式设置 task.context = [specific_tasks]

# 问题5:输出格式不符合预期
# 断点位置:agent.py → _build_task_output()
# 查看:raw_output 的原始内容
# 查看:_extract_json() 是否正确提取了 JSON

# 实用调试命令:
crew = Crew(agents=[...], tasks=[...], verbose=True)  # 开启详细日志
# verbose=True 会打印每次 LLM 调用的输入输出

11.3 性能热点分析

通过分析调用栈,可以看出性能瓶颈在哪里:

#1 LLM API 调用(最慢,占总时间 80%+)
  位置:llm.call() → litellm.completion()
  延迟:500ms - 10s(取决于模型和响应长度)
  优化:
    - 使用更快的模型(Haiku vs Opus)
    - 减少 ReAct 迭代次数(更好的工具描述)
    - 缓存工具结果(tool.cache_function)

#2 工具执行(次慢,占 10-15%)
  位置:tool._run()
  延迟:取决于外部 API(网络搜索 100ms-5s)
  优化:
    - 工具结果缓存
    - 并行执行多个工具调用(目前 CrewAI 串行)

#3 上下文构建(轻微,<5%)
  位置:_get_context() → 字符串拼接
  延迟:<10ms
  优化:显式 context 依赖减少不必要的文本传递

#4 Pydantic 验证(可忽略,<1%)
  位置:工具参数验证
  延迟:<1ms

本篇总结

这篇把 crew.kickoff() 背后的完整链路追踪了一遍:

Layer 1  kickoff():变量插值、规划、回调
Layer 2  _run_process():策略模式分发
Layer 3  _execute_tasks():顺序遍历 + 异步支持 + 上下文传递
Layer 4  execute_task():Prompt 构建 + 知识库注入
Layer 5  _invoke_loop():ReAct 核心循环(迭代直到 AgentFinish)
Layer 6  get_llm_response():LiteLLM → LLM API → 解析响应
Layer 7  execute_tool_and_check_finality():工具查找 + 执行 + 错误处理

三个关键设计:
  ① 工具找不到不抛异常,返回错误字符串让 Agent 自行决策
  ② max_iter 超限后强制注入 "give final answer" Prompt
  ③ 上下文默认全量传递,生产环境建议显式指定 task.context

第三篇预告

《CrewAI 源码剖析·第三篇:记忆与知识系统——短时/长时/实体记忆怎么存读,RAG Knowledge 如何接入》

将要深入的代码:

  • ShortTermMemory:RAG-Mem0 实现,对话内的临时记忆
  • LongTermMemory:SQLite + 嵌入向量,跨 kickoff 的持久记忆
  • EntityMemory:对话中提到的实体(人物/组织/地点)自动提取
  • UserMemory:用户画像记忆
  • Knowledge:文件/URL 接入 + 向量检索 + 查询重写

💬 你在追踪 Agent 执行时最头疼的 Bug 是什么类型的? 欢迎评论区分享!

🙏 如果这篇帮到你,一键三连(点赞👍 + 收藏⭐ + 关注)!第三篇记忆系统即将发布!


参考资料

  1. CrewAI 源码 v1.14.x:https://github.com/crewAIInc/crewAI
  2. CrewAI Agent Executor 深度分析(DeepWiki):https://deepwiki.com/lymanzhang/crewAI/5.1-cli-commands-reference
  3. CrewAI Process Types 源码分析(DeepWiki):https://deepwiki.com/crewAIInc/crewAI/2.4-process-types
  4. LiteLLM 文档:https://docs.litellm.ai

本文基于 CrewAI v1.14.x 源码分析,代码已略作简化以突出核心逻辑。最后更新:2026-05-11

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐