【CrewAI 源码剖析·第二篇】kickoff 执行链路全解析:从入口函数到 LLM 工具调用,每一行代码都追踪到底
本文深入解析了CrewAI框架中crew.kickoff()方法的完整执行链路,通过七层调用栈详细剖析了从入口函数到LLM工具调用的全过程。文章首先展示了整体执行流程的分层结构,包括变量插值、任务调度、Agent执行、LLM推理等关键环节。随后重点分析了入口函数kickoff()的实现细节,包括回调处理、变量插值机制和规划模式的工作原理。其中特别介绍了SafeDict类如何实现安全的变量替换,以及
【CrewAI 源码剖析·第二篇】kickoff 执行链路全解析:从入口函数到 LLM 工具调用,每一行代码都追踪到底
作者:技术博主 | 更新时间:2026-05-11 | 阅读时长:约 24 分钟
系列:CrewAI 源码剖析(共 4 篇)
版本:CrewAI v1.14.x(2026-05 最新稳定版)
标签:CrewAI源码分析kickoffAgentExecutorReAct工具调用执行链路Python

🔥 本篇目标:
crew.kickoff()一行代码背后发生了什么?本篇从入口函数开始,追踪完整的调用栈:任务调度 → Agent 执行 → LLM 推理 → 工具调用 → 输出解析。理解这条链路,才能真正知道 CrewAI 的 Bug 出在哪、性能瓶颈在哪、以及如何做定制化扩展。
系列进度
| 篇次 | 主题 | 状态 |
|---|---|---|
| 第一篇 | 整体架构与四大原语源码解析 | ✅ 已发布 |
| 第二篇(本篇) | kickoff 执行链路:从入口到工具调用全链路 | — |
| 第三篇 | 记忆与知识系统:短时/长时/实体记忆 + RAG | 即将发布 |
| 第四篇 | Flow 事件驱动系统:装饰器状态机原理 | 即将发布 |
目录
- 一、执行链路总览:七层调用栈
- 二、第一层:
Crew.kickoff()入口函数 - 三、第二层:
_run_process()→ 策略分发 - 四、第三层:Sequential 顺序执行引擎
- 五、第四层:
Agent.execute_task()任务执行入口 - 六、第五层:
CrewAgentExecutor._invoke_loop()ReAct 循环 - 七、第六层:
get_llm_response()LLM 推理 - 八、第七层:工具调用链路
execute_tool_and_check_finality() - 九、输出解析:
CrewAgentParser如何解析非结构化文本 - 十、上下文传递机制:Task 之间如何共享信息
- 十一、完整调用栈追踪 + 关键断点位置
一、执行链路总览:七层调用栈
先看整体,再逐层深入。crew.kickoff() 到最终输出,经历七个层次:
Layer 1 crew.kickoff(inputs)
↓ 变量插值、回调、规划
Layer 2 crew._run_process()
↓ 根据 Process.sequential / hierarchical 分发
Layer 3 Sequential: _execute_tasks() / Hierarchical: _execute_hierarchical()
↓ 遍历 tasks,逐个调度
Layer 4 agent.execute_task(task, context)
↓ 构建 Prompt,委托 Executor
Layer 5 CrewAgentExecutor.invoke()
↓ 进入 ReAct 循环 _invoke_loop()
Layer 6 get_llm_response(messages)
↓ 调用 LLM(LiteLLM 路由到具体模型)
↓ 解析响应 → AgentAction 或 AgentFinish
Layer 7 execute_tool_and_check_finality(action)
↓ 查找并执行对应工具
↓ 观察结果追加到 messages,回到 Layer 5
↓ 或返回最终答案 AgentFinish
↓
TaskOutput(raw / json / pydantic)
关键文件路径(对照源码阅读时用):
src/crewai/
crew.py # Layer 1-2
crews/crew_output.py # CrewOutput 模型
agents/crew_agent_executor.py # Layer 5-6(核心 ReAct 循环)
agents/agent_builder/ # Agent 构建器
utilities/agent_utils.py # Layer 6 LLM 调用工具函数
utilities/tool_utils.py # Layer 7 工具执行
tasks/task_output.py # 输出模型
agents/parser.py # 输出解析器
二、第一层:Crew.kickoff() 入口函数
# src/crewai/crew.py(精简,聚焦关键逻辑)
def kickoff(
self,
inputs: Optional[Dict[str, Any]] = None,
) -> CrewOutput:
"""
Crew 执行的主入口。
调用前:Crew 已初始化(Agent、Task、记忆系统均已装配)
调用后:返回 CrewOutput,包含最后一个 Task 的输出和所有 Task 的输出列表
"""
# ── 步骤1:执行 before_kickoff 回调 ──────────────────────
# @before_kickoff 装饰器注册的函数在这里被调用
# 可以用来修改 inputs(比如补充默认值、格式化输入)
for callback in self.before_kickoff_callbacks:
inputs = callback(inputs) or inputs
# ── 步骤2:变量插值 ──────────────────────────────────────
# 把 inputs 里的值替换到 Agent/Task 的 {variable} 占位符
self._interpolate_inputs(inputs or {})
# ── 步骤3:规划模式(可选)───────────────────────────────
# 如果 Crew(planning=True),先让一个 Planning Agent 分析任务
# 生成每个 Task 的执行步骤,插入到 Task 的描述中
if self.planning:
self._handle_crew_planning()
# ── 步骤4:遥测:记录本次执行的开始事件 ─────────────────
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
# ── 步骤5:核心执行 ──────────────────────────────────────
try:
result = self._run_process()
except Exception as e:
# 捕获执行中的异常,发出错误事件,然后重新抛出
self._telemetry.crew_execution_error(self._execution_span, e)
raise
# ── 步骤6:执行 after_kickoff 回调 ───────────────────────
for callback in self.after_kickoff_callbacks:
callback(result)
# ── 步骤7:记录遥测结束事件 ──────────────────────────────
self._telemetry.end_crew_execution_span(self._execution_span, result)
return result
2.1 _interpolate_inputs():变量插值的实现
def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
"""
把 inputs 字典的值替换到所有 Agent 和 Task 的文本字段中
设计细节:
- 用 Python 的 str.format_map() 而不是 format()
- format_map() 遇到不存在的 key 不会报错(用 SafeDict 兜底)
- 原始模板保存在 _original_description,支持重复插值(kickoff_for_each)
"""
# 对每个 Agent 的三个文本字段做插值
for agent in self.agents:
agent.interpolate_inputs(inputs)
# 对每个 Task 的 description 做插值
for task in self.tasks:
task.interpolate_inputs(inputs)
class SafeDict(dict):
"""
缺失 key 时返回 '{key}' 原字符串,不抛出 KeyError
这样 description 中存在但 inputs 里没提供的变量,保持原样
"""
def __missing__(self, key):
return "{" + key + "}"
2.2 _handle_crew_planning():规划模式内部
def _handle_crew_planning(self) -> None:
"""
Planning 模式:在正式执行前,让一个 Planning Agent 生成执行计划
计划会以文本形式插入到每个 Task 的 description 前面
"""
from crewai.utilities.crew_planner import CrewPlanner
# 构建规划 Prompt(把所有 Task 的描述和 Agent 的能力告诉规划者)
planner = CrewPlanner(
tasks=self.tasks,
planning_agent_llm=self.planning_llm or self.agents[0].llm,
)
# 规划 Agent 输出一个 JSON,每个 Task 对应一个执行步骤列表
plan = planner.create_plan()
# 把计划注入到对应 Task 的 description 头部
for task, task_plan in zip(self.tasks, plan.list_of_plans_per_task):
task.description = (
f"Planning Step:\n{task_plan}\n\n"
f"Original Task Description:\n{task.description}"
)
三、第二层:_run_process() → 策略分发
def _run_process(self) -> CrewOutput:
"""
根据 self.process 选择执行策略
这里是策略模式(Strategy Pattern)的标准实现
"""
if self.process == Process.sequential:
return self._execute_tasks(self.tasks)
elif self.process == Process.hierarchical:
return self._execute_hierarchical_process()
else:
raise NotImplementedError(
f"Process type '{self.process}' is not implemented."
)
3.1 Hierarchical 模式的 Manager Agent
def _execute_hierarchical_process(self) -> CrewOutput:
"""
层级模式执行:由 Manager Agent 统一调度所有 Worker Agent
Manager Agent 不直接执行任务,而是负责:
1. 把大任务分解成子任务
2. 把子任务委派给合适的 Worker
3. 审核 Worker 的输出
4. 决定是否需要重新执行
"""
# 如果用户没有指定 manager_agent,自动创建一个
if not self.manager_agent:
self.manager_agent = Agent(
role="Crew Manager",
goal="Manage the crew to complete the task in the best possible way",
backstory="You are a seasoned manager with a knack for getting the best "
"out of your team. You are also known for your ability to "
"delegate work to the right person.",
llm=self.manager_llm,
# Manager Agent 自动获得"委派工具"(把子任务分配给 Worker)
allow_delegation=True,
verbose=self.verbose,
)
# 把所有 Worker Agent 注入到 Manager 的委派工具里
self.manager_agent.crew = self
self.manager_agent.set_tools_handler()
# Manager 执行的是一个"综合任务":协调完成所有子任务
manager_task = Task(
description=self._build_manager_task_description(),
expected_output="A comprehensive final answer that completes the crew's tasks",
agent=self.manager_agent,
)
# Manager 执行完成后,从其输出中提取最终结果
result = self.manager_agent.execute_task(
task=manager_task,
context=self._get_context(manager_task, []),
)
return self._build_crew_output(result, [manager_task])
四、第三层:Sequential 顺序执行引擎
Sequential 模式是最常用的,也是最值得细看的。
def _execute_tasks(
self,
tasks: List[Task],
start_index: int = 0,
was_replayed: bool = False,
) -> CrewOutput:
"""
顺序执行所有 Task 的核心循环
参数:
start_index:从第几个 Task 开始(用于 replay 功能)
was_replayed:是否是从断点恢复
"""
task_outputs: List[TaskOutput] = []
# 从上次中断的位置开始(或从头开始)
for task_index, task in enumerate(tasks[start_index:], start=start_index):
# ── 处理异步 Task ─────────────────────────────────────
if task.async_execution:
# 异步 Task:提交到线程池,不等待结果,继续下一个 Task
context = self._get_context(task, task_outputs)
future = self._task_futures[task] = self.thread_pool.submit(
self._execute_single_task,
task=task,
context=context,
)
task_outputs.append(None) # 占位,后面会填充
continue
# ── 等待所有已提交的异步 Task 完成 ─────────────────────
# 当遇到非异步 Task 时,需要先等前面的异步 Task 都完成
# 因为非异步 Task 可能依赖前面异步 Task 的输出
self._wait_async_tasks(task_outputs)
# ── 执行同步 Task ─────────────────────────────────────
# 上下文 = 所有已完成 Task 的输出(或由 task.context 指定的特定 Task)
context = self._get_context(task, task_outputs)
task_output = self._execute_single_task(task=task, context=context)
task_outputs[task_index] = task_output # 替换占位符
# ── 触发 task_callback ────────────────────────────────
if self.task_callback:
self.task_callback(task_output)
# 等待所有异步 Task 最终完成(如果最后几个是异步的)
self._wait_async_tasks(task_outputs)
# 构建最终输出(取最后一个 Task 的输出作为 Crew 的输出)
return self._build_crew_output(
final_output=task_outputs[-1],
tasks_output=task_outputs,
)
4.1 _execute_single_task():单个 Task 的完整执行
def _execute_single_task(
self,
task: Task,
context: str,
) -> TaskOutput:
"""
执行单个 Task 的完整流程,包括 Guardrail 重试
"""
# ── 发出 Task 开始事件(遥测/日志系统监听)─────────────────
self._logger.info(
f"\n\n\033[1m\033[95m [{'Task' if not task.async_execution else 'Async Task'}]\033[00m"
f"\033[1m\033[92m Starting Task: {task.description}\033[00m\n",
)
# ── 确定执行 Agent ────────────────────────────────────────
# task.agent 在 sequential 模式下必须显式指定
agent = task.agent
if not agent:
raise ValueError(f"Task '{task.description}' has no agent assigned.")
# ── 如果需要 human_input,在执行前先让用户输入 ─────────────
if task.human_input:
human_feedback = self._get_human_input(task)
context = f"{context}\n\nHuman Feedback: {human_feedback}" if context else human_feedback
# ── 核心执行:委托给 Agent ──────────────────────────────────
task_output = agent.execute_task(
task=task,
context=context,
tools=task.tools or agent.tools,
)
# ── 处理输出到文件 ────────────────────────────────────────
if task.output_file:
self._write_output_file(task.output_file, task_output.raw)
# ── Guardrail 质量检查 ────────────────────────────────────
if task.guardrail:
task_output = self._handle_guardrail(task, task_output, agent)
# ── 保存输出到 task.output(供后续 Task 引用)──────────────
task.output = task_output
return task_output
4.2 _get_context():上下文聚合逻辑
这是 Task 间数据流转的核心:
def _get_context(
self,
task: Task,
task_outputs: List[TaskOutput],
) -> str:
"""
为当前 Task 构建上下文字符串
上下文来源有两种:
1. task.context 显式指定的依赖 Task 列表
2. 默认:当前 Task 之前所有已完成 Task 的输出
"""
if task.context:
# 模式1:显式上下文依赖
# task.context = [task_a, task_b]
# 只使用指定 Task 的输出作为上下文
context_parts = []
for dep_task in task.context:
if dep_task.output:
context_parts.append(
f"Output from task '{dep_task.description[:50]}...':\n"
f"{dep_task.output.raw}"
)
return "\n\n".join(context_parts)
else:
# 模式2:隐式上下文(所有前序 Task 的输出)
# 这是 sequential 模式的默认行为
completed_outputs = [o for o in task_outputs if o is not None]
if not completed_outputs:
return ""
context_parts = []
for i, output in enumerate(completed_outputs):
context_parts.append(
f"Task {i+1} output:\n{output.raw}"
)
return "\n\n".join(context_parts)
五、第四层:Agent.execute_task() 任务执行入口
# src/crewai/agent.py
def execute_task(
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[Any]] = None,
) -> TaskOutput:
"""
Agent 执行单个 Task 的入口方法
职责:构建最终的输入 Prompt,然后委托给 AgentExecutor
"""
# ── 构建任务 Prompt ───────────────────────────────────────
# task.prompt() 返回的是任务描述 + 期望输出的格式化文本
task_prompt = task.prompt()
# ── 附加上下文(前序 Task 的输出)────────────────────────
if context:
task_prompt = self.i18n.slice("task_with_context").format(
task=task_prompt,
context=context,
)
# i18n 翻译后大概是:
# "{task}\n\nContext from previous tasks:\n{context}"
# ── 知识库注入(如果 Agent 或 Crew 有配置 knowledge)────────
if self.knowledge:
knowledge_context = self.knowledge.query(task_prompt)
if knowledge_context:
task_prompt = f"{task_prompt}\n\n# Relevant Knowledge:\n{knowledge_context}"
# ── 工具列表(Task 级别的工具优先级高于 Agent 级别)─────────
tools = tools or task.tools or self.tools
# ── 确定最终使用的工具列表(包含委派工具)───────────────────
parsed_tools = self._parse_tools(tools)
# ── 更新 AgentExecutor 的工具配置 ────────────────────────
self.agent_executor.tools = parsed_tools
# ── 发出 Agent 执行开始事件 ──────────────────────────────
self._emit_agent_action_event(task, tools)
# ── 委托给 AgentExecutor 执行(进入 ReAct 循环)──────────
output = self.agent_executor.invoke(
{
"input": task_prompt,
"tool_names": self._tools_names(parsed_tools),
"tools": self._tools_description(parsed_tools),
# 是否需要人工输入(由 task.human_input 控制)
"ask_for_human_input": task.human_input,
}
)
# ── 解析并构建 TaskOutput ─────────────────────────────────
return self._build_task_output(output, task)
def _build_task_output(self, raw_output: str, task: Task) -> TaskOutput:
"""
根据 task 的输出配置,解析 raw_output 为目标格式
"""
from crewai.tasks.task_output import TaskOutput
# 尝试解析为 JSON(如果 task.output_json 或 task.output_pydantic 配置了)
pydantic_output = None
json_output = None
if task.output_pydantic or task.output_json:
model_class = task.output_pydantic or task.output_json
try:
# 从 raw 文本中提取 JSON 部分
json_str = self._extract_json(raw_output)
parsed = json.loads(json_str)
if task.output_pydantic:
pydantic_output = model_class(**parsed)
else:
json_output = parsed
except (json.JSONDecodeError, ValueError) as e:
self._logger.warning(f"无法解析为 {model_class.__name__}:{e}")
return TaskOutput(
description=task.description,
expected_output=task.expected_output,
raw=raw_output,
pydantic=pydantic_output,
json_dict=json_output,
agent=self.role,
)
六、第五层:CrewAgentExecutor._invoke_loop() ReAct 循环
这是整个框架最核心的代码,实现了 Agent 的推理-行动-观察循环。
# src/crewai/agents/crew_agent_executor.py
class CrewAgentExecutor(CrewAgentExecutorMixin):
"""
Agent 的执行引擎,实现 ReAct(Reasoning + Acting)循环
"""
def invoke(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""
同步执行入口
"""
# ── 格式化初始 Prompt ─────────────────────────────────
# 判断是否有 system prompt(role + goal + backstory 组合)
if "system" in inputs:
# 分离 system prompt 和 user prompt
system_prompt = self._format_prompt(inputs["system"])
user_prompt = self._format_prompt(inputs["input"])
self._append_message("system", system_prompt)
self._append_message("user", user_prompt)
else:
prompt = self._format_prompt(inputs["input"])
self._append_message("user", prompt)
# ── 记录日志(verbose 模式)──────────────────────────────
self._show_start_logs()
# ── 读取 human_input 标志 ─────────────────────────────
ask_for_human_input = inputs.get("ask_for_human_input", False)
# ── 进入 ReAct 主循环 ─────────────────────────────────
formatted_answer = self._invoke_loop()
# ── 如果需要 human_input,获取用户反馈后再做一次推理 ─────
if ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
# ── 创建记忆(短时/长时/实体)────────────────────────────
self._create_short_term_memory(formatted_answer)
self._create_long_term_memory(formatted_answer)
self._create_external_memory(formatted_answer)
return {"output": formatted_answer.output}
def _invoke_loop(self) -> AgentFinish:
"""
核心 ReAct 循环:
不断调用 LLM → 解析输出 → 执行工具 → 把观察结果加回 messages
直到 LLM 输出最终答案(AgentFinish)或达到最大迭代次数
"""
iterations = 0
while iterations < self.max_iter:
iterations += 1
# ── 调用 LLM ─────────────────────────────────────
formatted_answer = self.get_llm_response(
self.messages,
self.tools,
)
# ── 判断输出类型 ──────────────────────────────────
if isinstance(formatted_answer, AgentFinish):
# LLM 给出了最终答案,退出循环
self._show_logs("agent_final_answer", formatted_answer.output)
return formatted_answer
# 到这里说明 LLM 输出了工具调用(AgentAction)
# formatted_answer 是 AgentAction 实例
action: AgentAction = formatted_answer
# ── 记录工具调用日志 ──────────────────────────────
self._show_logs("tool_use", action)
# ── 执行工具并获取观察结果 ────────────────────────
observation = execute_tool_and_check_finality(
action=action,
tools=self.tools,
agent=self.agent,
)
# ── 把 (行动 + 观察) 附加到 messages ─────────────
# 格式:
# assistant: "Thought: ...\nAction: tool_name\nAction Input: ..."
# user(tool_result): "Observation: ..."
self._append_message(
"assistant",
f"{action.log}\nObservation: {observation}",
)
# ── 触发 step_callback(每步回调)────────────────
if self.step_callback:
self.step_callback(action, observation)
# ── 超过最大迭代次数 ──────────────────────────────────
return self._handle_max_iterations()
6.1 _handle_max_iterations():超出迭代限制的处理
def _handle_max_iterations(self) -> AgentFinish:
"""
当 Agent 达到最大迭代次数时的处理:
强制注入"请给出最终答案"的提示,让 LLM 收尾
"""
self._logger.warning(
f"Maximum iterations ({self.max_iter}) reached. "
"Forcing final answer..."
)
# 注入强制收尾的 Prompt
force_prompt = self.i18n.slice("force_final_answer_no_tools")
# 大概是:"I now must return my best answer based on what I know."
self._append_message("user", force_prompt)
# 再调用一次 LLM,这次不提供工具(强制输出文本答案)
return self.get_llm_response(self.messages, tools=[])
七、第六层:get_llm_response() LLM 推理
这一层负责把 messages 发给 LLM,并把响应解析为 AgentAction 或 AgentFinish。
# src/crewai/utilities/agent_utils.py
def get_llm_response(
messages: List[LLMMessage],
tools: List[Any],
llm: Any,
parser: "CrewAgentParser",
) -> Union[AgentAction, AgentFinish]:
"""
调用 LLM 并解析响应
两种 LLM 响应格式:
1. 原生工具调用(tool_calls):现代 LLM 的结构化工具调用
2. ReAct 文本格式:旧式文本解析("Action: xxx\nAction Input: xxx")
"""
# ── 实际调用 LLM ──────────────────────────────────────────
# CrewAI 使用 LiteLLM 作为统一 LLM 接口
# LiteLLM 会根据模型名路由到对应的 API(OpenAI/Anthropic/Gemini/...)
response = llm.call(
messages=messages,
tools=_format_tools_for_llm(tools), # 转换为 OpenAI 工具格式
)
# ── 检查是否有原生工具调用 ────────────────────────────────
if hasattr(response, "tool_calls") and response.tool_calls:
# 现代 LLM 的结构化工具调用(OpenAI function calling 格式)
tool_call = response.tool_calls[0]
return AgentAction(
tool=tool_call.function.name,
tool_input=json.loads(tool_call.function.arguments),
log=response.content or "",
tool_call_id=tool_call.id,
)
# ── 没有原生工具调用,解析文本格式 ───────────────────────
text_response = response.content if hasattr(response, "content") else str(response)
# 交给解析器处理(ReAct 文本格式解析)
return parser.parse(text_response)
7.1 LiteLLM 的作用
# src/crewai/llm/llm.py(简化)
class LLM:
"""
CrewAI 的 LLM 适配层,内部用 LiteLLM 统一不同 LLM 提供商的接口
支持的模型(通过 model 名称路由):
openai/gpt-4o → OpenAI API
anthropic/claude-3-5 → Anthropic API
gemini/gemini-pro → Google AI
azure/gpt-4 → Azure OpenAI
bedrock/claude → AWS Bedrock
ollama/llama3 → 本地 Ollama
...(100+ 模型提供商)
"""
def call(self, messages: List[dict], tools: List[dict] = None) -> Any:
"""
统一调用接口,内部委托给 litellm.completion()
"""
import litellm
kwargs = {
"model": self.model,
"messages": messages,
"temperature": self.temperature,
"max_tokens": self.max_tokens,
}
if tools:
kwargs["tools"] = tools
kwargs["tool_choice"] = "auto"
# 速率限制(如果配置了 max_rpm)
if self._rpm_controller:
self._rpm_controller.check_or_wait()
try:
response = litellm.completion(**kwargs)
return response.choices[0].message
except litellm.RateLimitError:
# 速率超限时等待后重试
time.sleep(60)
return self.call(messages, tools)
except litellm.ContextWindowExceededError:
# 上下文窗口超限时的处理
if self.respect_context_window:
messages = self._trim_messages(messages)
return self.call(messages, tools)
raise
八、第七层:工具调用链路
# src/crewai/utilities/tool_utils.py
def execute_tool_and_check_finality(
action: AgentAction,
tools: List[BaseTool],
agent: "Agent",
) -> str:
"""
执行 AgentAction 中指定的工具,返回观察结果(字符串)
完整职责:
1. 从工具列表中找到对应的工具
2. 调用工具(带超时和错误处理)
3. 检查是否是"委派"类型的工具调用
4. 返回工具结果字符串
"""
tool_name = action.tool
tool_input = action.tool_input
# ── 查找工具 ──────────────────────────────────────────────
tool = _find_tool(tool_name, tools)
if tool is None:
# 工具不存在:返回错误提示(不是异常,让 Agent 自行处理)
return agent.i18n.errors("tool_usage").format(tool=tool_name)
# ── 检查工具是否被缓存 ────────────────────────────────────
if tool.cache_function and tool.cache_function(tool_input):
cached_result = _get_from_cache(tool_name, tool_input)
if cached_result is not None:
return cached_result
# ── 执行工具 ──────────────────────────────────────────────
try:
# BaseTool.run() 是工具执行的统一入口
# 内部会调用 _run() 方法(用户实现的具体逻辑)
result = tool.run(tool_input)
# ── 缓存结果 ──────────────────────────────────────────
if tool.cache_function and tool.cache_function(tool_input):
_save_to_cache(tool_name, tool_input, result)
return str(result)
except ToolException as e:
# 工具执行出错:返回错误信息给 Agent,让 Agent 决定如何处理
if tool.handle_tool_error:
return str(e)
else:
# 如果工具不处理错误,重新抛出(Agent 上层会处理)
raise
def _find_tool(name: str, tools: List[BaseTool]) -> Optional[BaseTool]:
"""
查找工具:支持精确匹配和模糊匹配
LLM 有时会把工具名大小写搞错,用模糊匹配兜底
"""
# 精确匹配
for tool in tools:
if tool.name == name:
return tool
# 不区分大小写的匹配
name_lower = name.lower().replace(" ", "_").replace("-", "_")
for tool in tools:
tool_name_lower = tool.name.lower().replace(" ", "_").replace("-", "_")
if tool_name_lower == name_lower:
return tool
return None
8.1 BaseTool.run():工具执行的统一入口
# src/crewai/tools/base_tool.py
class BaseTool(BaseModel, ABC):
"""
所有 CrewAI 工具的基类
"""
name: str
description: str
args_schema: Optional[Type[BaseModel]] = None # 输入参数的 Pydantic schema
cache_function: Optional[Callable] = None # 是否缓存(函数返回 True 则缓存)
result_as_answer: bool = False # 结果是否直接作为 Agent 的最终答案
def run(self, tool_input: Union[str, Dict]) -> Any:
"""
工具执行的统一入口(模板方法模式)
子类只需要实现 _run() 方法
"""
try:
# ── 解析输入 ──────────────────────────────────────
# tool_input 可能是 JSON 字符串或 dict
if isinstance(tool_input, str):
try:
parsed_input = json.loads(tool_input)
except json.JSONDecodeError:
# 不是 JSON,作为单个字符串参数
parsed_input = tool_input
else:
parsed_input = tool_input
# ── 如果有 args_schema,用 Pydantic 验证参数 ────────
if self.args_schema:
if isinstance(parsed_input, dict):
validated = self.args_schema(**parsed_input)
parsed_input = validated.model_dump()
# ── 调用实际工具实现 ──────────────────────────────
result = self._run(parsed_input)
# ── 发出工具执行事件 ──────────────────────────────
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
tool_name=self.name,
tool_input=tool_input,
tool_output=str(result),
),
)
return result
except ValidationError as e:
# Pydantic 验证失败:返回友好的错误提示
return f"Tool input validation failed: {e}"
except Exception as e:
if self.handle_tool_error:
return f"Tool execution error: {str(e)}"
raise ToolException(str(e)) from e
@abstractmethod
def _run(self, *args, **kwargs) -> Any:
"""子类实现具体的工具逻辑"""
pass
8.2 @tool 装饰器:快速创建工具
# src/crewai/tools/tool_decorator.py
def tool(name_or_func=None, *args, **kwargs):
"""
把普通函数包装成 CrewAI 工具的装饰器
用法:
@tool("搜索工具")
def web_search(query: str) -> str:
'''在网络上搜索给定的查询词,返回前5条结果'''
return serper_api.search(query)
"""
def decorator(func):
# 从函数的 docstring 提取工具 description
tool_description = inspect.cleandoc(func.__doc__ or "")
# 从函数签名自动生成 args_schema(Pydantic 模型)
sig = inspect.signature(func)
fields = {}
for param_name, param in sig.parameters.items():
annotation = param.annotation
if annotation == inspect.Parameter.empty:
annotation = Any
default = param.default if param.default != inspect.Parameter.empty else ...
fields[param_name] = (annotation, Field(default=default))
# 动态创建 Pydantic 模型作为 args_schema
DynamicSchema = create_model(f"{func.__name__}_schema", **fields)
# 创建工具类实例
class WrappedTool(BaseTool):
name: str = tool_name
description: str = tool_description
args_schema: Type[BaseModel] = DynamicSchema
def _run(self, **kwargs) -> Any:
return func(**kwargs)
return WrappedTool()
tool_name = name_or_func if isinstance(name_or_func, str) else func.__name__
if callable(name_or_func):
return decorator(name_or_func)
return decorator
九、输出解析:CrewAgentParser 如何解析非结构化文本
当 LLM 不支持原生工具调用(或模型输出格式不规范)时,CrewAI 需要解析 ReAct 格式的文本:
# ReAct 格式的 LLM 输出示例:
Thought: I need to search for the latest AI news.
Action: web_search
Action Input: {"query": "latest AI news 2026"}
# 或者最终答案:
Thought: I now have enough information to answer.
Final Answer: Based on my research, the latest trends in AI are...
# src/crewai/agents/parser.py
class CrewAgentParser:
"""
解析 LLM 的文本输出为 AgentAction 或 AgentFinish
"""
# ReAct 格式的正则表达式
ACTION_PATTERN = r"Action\s*:\s*(.+?)(?:\n|$)"
ACTION_INPUT_PATTERN = r"Action\s*Input\s*:\s*(.+?)(?:\nObservation|\Z)"
FINAL_ANSWER_PATTERN = r"Final\s*Answer\s*:\s*(.+)"
def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
"""
解析 LLM 输出
解析优先级:
1. 如果包含 "Final Answer":返回 AgentFinish
2. 如果包含 "Action" + "Action Input":返回 AgentAction
3. 其他情况:也返回 AgentFinish(最后一次推理)
"""
# ── 检查是否有最终答案 ────────────────────────────────
final_match = re.search(self.FINAL_ANSWER_PATTERN, text, re.DOTALL | re.IGNORECASE)
if final_match:
return AgentFinish(
output=final_match.group(1).strip(),
log=text,
)
# ── 检查是否有工具调用 ────────────────────────────────
action_match = re.search(self.ACTION_PATTERN, text, re.IGNORECASE)
action_input_match = re.search(self.ACTION_INPUT_PATTERN, text, re.DOTALL | re.IGNORECASE)
if action_match:
tool_name = action_match.group(1).strip()
tool_input_str = action_input_match.group(1).strip() if action_input_match else "{}"
# 尝试解析 tool_input 为 dict
try:
tool_input = json.loads(tool_input_str)
except json.JSONDecodeError:
# JSON 解析失败,作为字符串处理
tool_input = tool_input_str
return AgentAction(
tool=tool_name,
tool_input=tool_input,
log=text,
)
# ── 无法识别格式:强制作为最终答案 ─────────────────────
# 这种情况出现在:LLM 直接给出答案,没有遵循 ReAct 格式
return AgentFinish(
output=text.strip(),
log=text,
)
def _fix_common_mistakes(self, text: str) -> str:
"""
修复 LLM 输出中的常见格式错误
"""
# 修复:Action : tool_name(冒号前有空格)
text = re.sub(r"Action\s+:", "Action:", text)
# 修复:Action Input 写成了 Input
text = re.sub(r"(?<!\w)Input\s*:", "Action Input:", text)
# 修复:Final answer(小写 a)
text = re.sub(r"Final\s+answer\s*:", "Final Answer:", text, flags=re.IGNORECASE)
return text
十、上下文传递机制:Task 之间如何共享信息
理解上下文传递对于正确使用 CrewAI 非常重要。
10.1 隐式上下文(Sequential 默认模式)
# 每个 Task 自动获得前序所有 Task 的输出
# 构建的上下文字符串大概是:
"""
Task 1 output:
(这里是第一个 Task 的完整输出文本)
Task 2 output:
(这里是第二个 Task 的完整输出文本)
"""
# 这个上下文被拼接到当前 Task 的 Prompt 后面
# 可能导致的问题:
# - 随着 Task 增多,上下文越来越长(Token 消耗急剧增加)
# - 解决:设置 task.context = [specific_task](显式依赖)
10.2 显式上下文(推荐生产用)
# 显式指定依赖关系:analysis_task 只接收 research_task 的输出
# 而不是所有前序 Task 的输出
research_task = Task(
description="研究 AI 市场趋势",
expected_output="详细的市场分析报告",
agent=researcher,
)
analysis_task = Task(
description="基于研究结果,分析投资机会",
expected_output="投资建议列表",
agent=analyst,
context=[research_task], # ← 显式指定只依赖 research_task
)
writing_task = Task(
description="将分析结果写成报告",
expected_output="完整的投资报告",
agent=writer,
context=[research_task, analysis_task], # ← 同时依赖两个 Task
)
10.3 上下文的 Token 预算管理
# Agent 的 respect_context_window=True 时(默认开启)
# CrewAI 会在 context 过长时自动截断
def _trim_context_to_fit(
self,
context: str,
task_prompt: str,
max_tokens: int,
) -> str:
"""
当 context + task_prompt 超过 context window 时,截断 context
截断策略:
1. 保留最后 N 个 Task 的输出(最新的最重要)
2. 对每个输出做摘要(通过一次额外的 LLM 调用)
"""
# 估算当前 token 数(粗略估算:1 token ≈ 4 个字符)
current_tokens = (len(context) + len(task_prompt)) // 4
if current_tokens <= max_tokens * 0.8: # 留 20% buffer
return context
# 超限:对 context 做摘要
summary_prompt = f"请简洁地总结以下内容,保留关键信息:\n{context}"
summary = self.llm.call(
[{"role": "user", "content": summary_prompt}]
)
return summary.content
十一、完整调用栈追踪 + 关键断点位置
11.1 完整调用栈(可跟踪调试的版本)
crew.kickoff(inputs={"topic": "AI 监管"})
│
├── crew._interpolate_inputs(inputs) # 变量插值
├── crew._handle_crew_planning() # [可选] 规划模式
│
└── crew._run_process()
│
└── crew._execute_tasks(tasks) # Sequential 模式
│
└── for task in tasks:
│
├── crew._get_context(task, outputs) # 聚合上下文
│
└── crew._execute_single_task(task, context)
│
├── agent.execute_task(task, context, tools)
│ │
│ ├── task.prompt() # 构建任务 Prompt
│ ├── agent.knowledge.query() # [可选] 知识库检索
│ │
│ └── agent_executor.invoke(inputs)
│ │
│ ├── _append_message("system", system_prompt)
│ ├── _append_message("user", task_prompt)
│ │
│ └── _invoke_loop() # ← ReAct 核心循环
│ │
│ ├── [iteration 1]
│ │ ├── get_llm_response(messages, tools)
│ │ │ ├── llm.call(messages) # LiteLLM → API
│ │ │ └── parser.parse(response) # AgentAction
│ │ │
│ │ ├── execute_tool_and_check_finality(action)
│ │ │ ├── _find_tool(action.tool)
│ │ │ └── tool.run(action.tool_input)
│ │ │ └── tool._run(**kwargs) # 实际执行
│ │ │
│ │ └── _append_message("assistant", action+observation)
│ │
│ ├── [iteration 2] ...
│ │
│ └── [final iteration]
│ ├── get_llm_response(messages, tools)
│ └── parser.parse(response) → AgentFinish ✓
│
└── _handle_guardrail(task, output) # [可选] 质量检查
11.2 关键调试断点(按问题类型)
# 问题1:Agent 没有按预期调用工具
# 断点位置:agent_utils.py → get_llm_response()
# 查看:messages 的内容(系统 Prompt 是否正确描述了工具)
# 查看:LLM 的原始响应(是否输出了 Action 字段)
# 问题2:工具调用失败
# 断点位置:tool_utils.py → execute_tool_and_check_finality()
# 查看:action.tool 是否与注册的工具名完全匹配
# 查看:action.tool_input 的 JSON 格式是否正确
# 问题3:Agent 陷入死循环
# 断点位置:crew_agent_executor.py → _invoke_loop() 的 while 条件
# 查看:iterations 是否在递增
# 查看:max_iter 配置值(默认 20,可能太高)
# 问题4:上下文过长导致报错
# 断点位置:crew.py → _get_context()
# 查看:返回的 context 字符串长度
# 解决:显式设置 task.context = [specific_tasks]
# 问题5:输出格式不符合预期
# 断点位置:agent.py → _build_task_output()
# 查看:raw_output 的原始内容
# 查看:_extract_json() 是否正确提取了 JSON
# 实用调试命令:
crew = Crew(agents=[...], tasks=[...], verbose=True) # 开启详细日志
# verbose=True 会打印每次 LLM 调用的输入输出
11.3 性能热点分析
通过分析调用栈,可以看出性能瓶颈在哪里:
#1 LLM API 调用(最慢,占总时间 80%+)
位置:llm.call() → litellm.completion()
延迟:500ms - 10s(取决于模型和响应长度)
优化:
- 使用更快的模型(Haiku vs Opus)
- 减少 ReAct 迭代次数(更好的工具描述)
- 缓存工具结果(tool.cache_function)
#2 工具执行(次慢,占 10-15%)
位置:tool._run()
延迟:取决于外部 API(网络搜索 100ms-5s)
优化:
- 工具结果缓存
- 并行执行多个工具调用(目前 CrewAI 串行)
#3 上下文构建(轻微,<5%)
位置:_get_context() → 字符串拼接
延迟:<10ms
优化:显式 context 依赖减少不必要的文本传递
#4 Pydantic 验证(可忽略,<1%)
位置:工具参数验证
延迟:<1ms
本篇总结
这篇把 crew.kickoff() 背后的完整链路追踪了一遍:
Layer 1 kickoff():变量插值、规划、回调
Layer 2 _run_process():策略模式分发
Layer 3 _execute_tasks():顺序遍历 + 异步支持 + 上下文传递
Layer 4 execute_task():Prompt 构建 + 知识库注入
Layer 5 _invoke_loop():ReAct 核心循环(迭代直到 AgentFinish)
Layer 6 get_llm_response():LiteLLM → LLM API → 解析响应
Layer 7 execute_tool_and_check_finality():工具查找 + 执行 + 错误处理
三个关键设计:
① 工具找不到不抛异常,返回错误字符串让 Agent 自行决策
② max_iter 超限后强制注入 "give final answer" Prompt
③ 上下文默认全量传递,生产环境建议显式指定 task.context
第三篇预告
《CrewAI 源码剖析·第三篇:记忆与知识系统——短时/长时/实体记忆怎么存读,RAG Knowledge 如何接入》
将要深入的代码:
ShortTermMemory:RAG-Mem0 实现,对话内的临时记忆LongTermMemory:SQLite + 嵌入向量,跨 kickoff 的持久记忆EntityMemory:对话中提到的实体(人物/组织/地点)自动提取UserMemory:用户画像记忆Knowledge:文件/URL 接入 + 向量检索 + 查询重写
💬 你在追踪 Agent 执行时最头疼的 Bug 是什么类型的? 欢迎评论区分享!
🙏 如果这篇帮到你,一键三连(点赞👍 + 收藏⭐ + 关注)!第三篇记忆系统即将发布!
参考资料
- CrewAI 源码 v1.14.x:https://github.com/crewAIInc/crewAI
- CrewAI Agent Executor 深度分析(DeepWiki):https://deepwiki.com/lymanzhang/crewAI/5.1-cli-commands-reference
- CrewAI Process Types 源码分析(DeepWiki):https://deepwiki.com/crewAIInc/crewAI/2.4-process-types
- LiteLLM 文档:https://docs.litellm.ai
本文基于 CrewAI v1.14.x 源码分析,代码已略作简化以突出核心逻辑。最后更新:2026-05-11
更多推荐



所有评论(0)