【langchain】Core source walkthrough: create_agent (Part 1)
This article covers the core functionality and parameters of the `create_agent` function. The function builds an agent graph that calls tools in a loop until a stopping condition is met. Its main parameters are the language model (`model`), the tool list (`tools`), and the system prompt (`system_prompt`); `model` is required, the rest are optional. The function returns a compiled `StateGraph` object that can be used for chat interactions. The workflow is: the agent node calls the language model; if the response contains tool calls, the tools node executes them and the results are fed back to the model, looping until the response contains no more tool calls, at which point the full message list is returned.
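Before reading the source, the loop described above can be sketched in a few lines of plain Python. The model and tool below are hypothetical stand-ins, not the actual LangChain classes; the point is only the stopping condition:

```python
# Minimal sketch of the agent loop create_agent compiles.
# `fake_model` and `check_weather` are hypothetical stand-ins.
from dataclasses import dataclass, field

@dataclass
class AIMessage:
    content: str
    tool_calls: list = field(default_factory=list)

def fake_model(messages: list) -> AIMessage:
    # First turn: request a tool call; once a tool result is present, finish.
    if not any(m.startswith("tool:") for m in messages):
        return AIMessage("", tool_calls=[{"name": "check_weather", "args": {"location": "sf"}}])
    return AIMessage("It's sunny in sf")

def check_weather(location: str) -> str:
    return f"It's always sunny in {location}"

tools = {"check_weather": check_weather}

def run_agent(user_input: str) -> list:
    messages = [user_input]
    while True:
        ai = fake_model(messages)
        if not ai.tool_calls:          # stopping condition: no more tool calls
            messages.append(ai.content)
            return messages
        for tc in ai.tool_calls:       # execute tools, append ToolMessage-like results
            messages.append("tool:" + tools[tc["name"]](**tc["args"]))

history = run_agent("what is the weather in sf")
# history[-1] holds the final answer after one tool round-trip
```

The real graph adds middleware hooks, structured output, and persistence around this skeleton, but the termination logic is the same.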
def create_agent(
model: str | BaseChatModel,
tools: Sequence[BaseTool | Callable[..., Any] | dict[str, Any]] | None = None,
*,
system_prompt: str | SystemMessage | None = None,
middleware: Sequence[AgentMiddleware[StateT_co, ContextT]] = (),
response_format: ResponseFormat[ResponseT] | type[ResponseT] | dict[str, Any] | None = None,
state_schema: type[AgentState[ResponseT]] | None = None,
context_schema: type[ContextT] | None = None,
checkpointer: Checkpointer | None = None,
store: BaseStore | None = None,
interrupt_before: list[str] | None = None,
interrupt_after: list[str] | None = None,
debug: bool = False,
name: str | None = None,
cache: BaseCache[Any] | None = None,
) -> CompiledStateGraph[
AgentState[ResponseT], ContextT, _InputAgentState, _OutputAgentState[ResponseT]
]:
"""Creates an agent graph that calls tools in a loop until a stopping condition is met.
For more details on using `create_agent`,
visit the [Agents](https://docs.langchain.com/oss/python/langchain/agents) docs.
Args:
model: The language model for the agent.
Can be a string identifier (e.g., `"openai:gpt-4"`) or a direct chat model
instance (e.g., [`ChatOpenAI`][langchain_openai.ChatOpenAI] or another
[LangChain chat model](https://docs.langchain.com/oss/python/integrations/chat)).
For a full list of supported model strings, see
[`init_chat_model`][langchain.chat_models.init_chat_model(model_provider)].
!!! tip ""
See the [Models](https://docs.langchain.com/oss/python/langchain/models)
docs for more information.
tools: A list of tools, `dict`, or `Callable`.
If `None` or an empty list, the agent will consist of a model node without a
tool calling loop.
!!! tip ""
See the [Tools](https://docs.langchain.com/oss/python/langchain/tools)
docs for more information.
system_prompt: An optional system prompt for the LLM.
Can be a `str` (which will be converted to a `SystemMessage`) or a
`SystemMessage` instance directly. The system message is added to the
beginning of the message list when calling the model.
middleware: A sequence of middleware instances to apply to the agent.
Middleware can intercept and modify agent behavior at various stages.
!!! tip ""
See the [Middleware](https://docs.langchain.com/oss/python/langchain/middleware)
docs for more information.
response_format: An optional configuration for structured responses.
Can be a `ToolStrategy`, `ProviderStrategy`, or a Pydantic model class.
If provided, the agent will handle structured output during the
conversation flow.
Raw schemas will be wrapped in an appropriate strategy based on model
capabilities.
!!! tip ""
See the [Structured output](https://docs.langchain.com/oss/python/langchain/structured-output)
docs for more information.
state_schema: An optional `TypedDict` schema that extends `AgentState`.
When provided, this schema is used instead of `AgentState` as the base
schema for merging with middleware state schemas. This allows users to
add custom state fields without needing to create custom middleware.
Generally, it's recommended to use `state_schema` extensions via middleware
to keep relevant extensions scoped to corresponding hooks / tools.
context_schema: An optional schema for runtime context.
checkpointer: An optional checkpoint saver object.
Used for persisting the state of the graph (e.g., as chat memory) for a
single thread (e.g., a single conversation).
store: An optional store object.
Used for persisting data across multiple threads (e.g., multiple
conversations / users).
interrupt_before: An optional list of node names to interrupt before.
Useful if you want to add a user confirmation or other interrupt
before taking an action.
interrupt_after: An optional list of node names to interrupt after.
Useful if you want to return directly or run additional processing
on an output.
debug: Whether to enable verbose logging for graph execution.
When enabled, prints detailed information about each node execution, state
updates, and transitions during agent runtime. Useful for debugging
middleware behavior and understanding agent execution flow.
name: An optional name for the `CompiledStateGraph`.
This name will be automatically used when adding the agent graph to
another graph as a subgraph node - particularly useful for building
multi-agent systems.
cache: An optional `BaseCache` instance to enable caching of graph execution.
Returns:
A compiled `StateGraph` that can be used for chat interactions.
Raises:
AssertionError: If duplicate middleware instances are provided.
The agent node calls the language model with the messages list (after applying
the system prompt). If the resulting [`AIMessage`][langchain.messages.AIMessage]
contains `tool_calls`, the graph will then call the tools. The tools node executes
the tools and adds the responses to the messages list as
[`ToolMessage`][langchain.messages.ToolMessage] objects. The agent node then calls
the language model again. The process repeats until no more `tool_calls` are present
in the response. The agent then returns the full list of messages.
Example:
```python
from langchain.agents import create_agent
def check_weather(location: str) -> str:
'''Return the weather forecast for the specified location.'''
return f"It's always sunny in {location}"
graph = create_agent(
model="anthropic:claude-sonnet-4-5-20250929",
tools=[check_weather],
system_prompt="You are a helpful assistant",
)
inputs = {"messages": [{"role": "user", "content": "what is the weather in sf"}]}
for chunk in graph.stream(inputs, stream_mode="updates"):
print(chunk)
```
"""
# init chat model
if isinstance(model, str):
model = init_chat_model(model)
# Convert system_prompt to SystemMessage if needed
system_message: SystemMessage | None = None
if system_prompt is not None:
if isinstance(system_prompt, SystemMessage):
system_message = system_prompt
else:
system_message = SystemMessage(content=system_prompt)
# Handle tools being None or empty
if tools is None:
tools = []
# Convert response format and setup structured output tools
# Raw schemas are wrapped in AutoStrategy to preserve auto-detection intent.
# AutoStrategy is converted to ToolStrategy upfront to calculate tools during agent creation,
# but may be replaced with ProviderStrategy later based on model capabilities.
initial_response_format: ToolStrategy[Any] | ProviderStrategy[Any] | AutoStrategy[Any] | None
if response_format is None:
initial_response_format = None
elif isinstance(response_format, (ToolStrategy, ProviderStrategy)):
# Preserve explicitly requested strategies
initial_response_format = response_format
elif isinstance(response_format, AutoStrategy):
# AutoStrategy provided - preserve it for later auto-detection
initial_response_format = response_format
else:
# Raw schema - wrap in AutoStrategy to enable auto-detection
initial_response_format = AutoStrategy(schema=response_format)
# For AutoStrategy, convert to ToolStrategy to setup tools upfront
# (may be replaced with ProviderStrategy later based on model)
tool_strategy_for_setup: ToolStrategy[Any] | None = None
if isinstance(initial_response_format, AutoStrategy):
tool_strategy_for_setup = ToolStrategy(schema=initial_response_format.schema)
elif isinstance(initial_response_format, ToolStrategy):
tool_strategy_for_setup = initial_response_format
structured_output_tools: dict[str, OutputToolBinding[Any]] = {}
if tool_strategy_for_setup:
for response_schema in tool_strategy_for_setup.schema_specs:
structured_tool_info = OutputToolBinding.from_schema_spec(response_schema)
structured_output_tools[structured_tool_info.tool.name] = structured_tool_info
middleware_tools = [t for m in middleware for t in getattr(m, "tools", [])]
# Collect middleware with wrap_tool_call or awrap_tool_call hooks
# Include middleware with either implementation to ensure NotImplementedError is raised
# when middleware doesn't support the execution path
middleware_w_wrap_tool_call = [
m
for m in middleware
if m.__class__.wrap_tool_call is not AgentMiddleware.wrap_tool_call
or m.__class__.awrap_tool_call is not AgentMiddleware.awrap_tool_call
]
# Chain all wrap_tool_call handlers into a single composed handler
wrap_tool_call_wrapper = None
if middleware_w_wrap_tool_call:
wrappers = [
traceable(name=f"{m.name}.wrap_tool_call", process_inputs=_scrub_inputs)(
m.wrap_tool_call
)
for m in middleware_w_wrap_tool_call
]
wrap_tool_call_wrapper = _chain_tool_call_wrappers(wrappers)
# Collect middleware with awrap_tool_call or wrap_tool_call hooks
# Include middleware with either implementation to ensure NotImplementedError is raised
# when middleware doesn't support the execution path
middleware_w_awrap_tool_call = [
m
for m in middleware
if m.__class__.awrap_tool_call is not AgentMiddleware.awrap_tool_call
or m.__class__.wrap_tool_call is not AgentMiddleware.wrap_tool_call
]
# Chain all awrap_tool_call handlers into a single composed async handler
awrap_tool_call_wrapper = None
if middleware_w_awrap_tool_call:
async_wrappers = [
traceable(name=f"{m.name}.awrap_tool_call", process_inputs=_scrub_inputs)(
m.awrap_tool_call
)
for m in middleware_w_awrap_tool_call
]
awrap_tool_call_wrapper = _chain_async_tool_call_wrappers(async_wrappers)
# Setup tools
tool_node: ToolNode | None = None
# Extract built-in provider tools (dict format) and regular tools (BaseTool/callables)
built_in_tools = [t for t in tools if isinstance(t, dict)]
regular_tools = [t for t in tools if not isinstance(t, dict)]
# Tools that require client-side execution (must be in ToolNode)
available_tools = middleware_tools + regular_tools
# Create ToolNode if we have client-side tools OR if middleware defines wrap_tool_call
# (which may handle dynamically registered tools)
tool_node = (
ToolNode(
tools=available_tools,
wrap_tool_call=wrap_tool_call_wrapper,
awrap_tool_call=awrap_tool_call_wrapper,
)
if available_tools or wrap_tool_call_wrapper or awrap_tool_call_wrapper
else None
)
# Default tools for ModelRequest initialization
# Use converted BaseTool instances from ToolNode (not raw callables)
# Include built-ins and converted tools (can be changed dynamically by middleware)
# Structured tools are NOT included - they're added dynamically based on response_format
if tool_node:
default_tools = list(tool_node.tools_by_name.values()) + built_in_tools
else:
default_tools = list(built_in_tools)
# validate middleware
if len({m.name for m in middleware}) != len(middleware):
msg = "Please remove duplicate middleware instances."
raise AssertionError(msg)
middleware_w_before_agent = [
m
for m in middleware
if m.__class__.before_agent is not AgentMiddleware.before_agent
or m.__class__.abefore_agent is not AgentMiddleware.abefore_agent
]
middleware_w_before_model = [
m
for m in middleware
if m.__class__.before_model is not AgentMiddleware.before_model
or m.__class__.abefore_model is not AgentMiddleware.abefore_model
]
middleware_w_after_model = [
m
for m in middleware
if m.__class__.after_model is not AgentMiddleware.after_model
or m.__class__.aafter_model is not AgentMiddleware.aafter_model
]
middleware_w_after_agent = [
m
for m in middleware
if m.__class__.after_agent is not AgentMiddleware.after_agent
or m.__class__.aafter_agent is not AgentMiddleware.aafter_agent
]
# Collect middleware with wrap_model_call or awrap_model_call hooks
# Include middleware with either implementation to ensure NotImplementedError is raised
# when middleware doesn't support the execution path
middleware_w_wrap_model_call = [
m
for m in middleware
if m.__class__.wrap_model_call is not AgentMiddleware.wrap_model_call
or m.__class__.awrap_model_call is not AgentMiddleware.awrap_model_call
]
# Collect middleware with awrap_model_call or wrap_model_call hooks
# Include middleware with either implementation to ensure NotImplementedError is raised
# when middleware doesn't support the execution path
middleware_w_awrap_model_call = [
m
for m in middleware
if m.__class__.awrap_model_call is not AgentMiddleware.awrap_model_call
or m.__class__.wrap_model_call is not AgentMiddleware.wrap_model_call
]
# Compose wrap_model_call handlers into a single middleware stack (sync)
wrap_model_call_handler = None
if middleware_w_wrap_model_call:
sync_handlers = [
traceable(name=f"{m.name}.wrap_model_call", process_inputs=_scrub_inputs)(
m.wrap_model_call
)
for m in middleware_w_wrap_model_call
]
wrap_model_call_handler = _chain_model_call_handlers(sync_handlers)
# Compose awrap_model_call handlers into a single middleware stack (async)
awrap_model_call_handler = None
if middleware_w_awrap_model_call:
async_handlers = [
traceable(name=f"{m.name}.awrap_model_call", process_inputs=_scrub_inputs)(
m.awrap_model_call
)
for m in middleware_w_awrap_model_call
]
awrap_model_call_handler = _chain_async_model_call_handlers(async_handlers)
state_schemas: set[type] = {m.state_schema for m in middleware}
# Use provided state_schema if available, otherwise use base AgentState
base_state = state_schema if state_schema is not None else AgentState
state_schemas.add(base_state)
resolved_state_schema, input_schema, output_schema = _resolve_schemas(state_schemas)
# create graph, add nodes
graph: StateGraph[
AgentState[ResponseT], ContextT, _InputAgentState, _OutputAgentState[ResponseT]
] = StateGraph(
state_schema=resolved_state_schema,
input_schema=input_schema,
output_schema=output_schema,
context_schema=context_schema,
)
def _handle_model_output(
output: AIMessage, effective_response_format: ResponseFormat[Any] | None
) -> dict[str, Any]:
"""Handle model output including structured responses.
Args:
output: The AI message output from the model.
effective_response_format: The actual strategy used (may differ from initial
if auto-detected).
"""
# Handle structured output with provider strategy
if isinstance(effective_response_format, ProviderStrategy):
if not output.tool_calls:
provider_strategy_binding = ProviderStrategyBinding.from_schema_spec(
effective_response_format.schema_spec
)
try:
structured_response = provider_strategy_binding.parse(output)
except Exception as exc:
schema_name = getattr(
effective_response_format.schema_spec.schema, "__name__", "response_format"
)
validation_error = StructuredOutputValidationError(schema_name, exc, output)
raise validation_error from exc
else:
return {"messages": [output], "structured_response": structured_response}
return {"messages": [output]}
# Handle structured output with tool strategy
if (
isinstance(effective_response_format, ToolStrategy)
and isinstance(output, AIMessage)
and output.tool_calls
):
structured_tool_calls = [
tc for tc in output.tool_calls if tc["name"] in structured_output_tools
]
if structured_tool_calls:
exception: StructuredOutputError | None = None
if len(structured_tool_calls) > 1:
# Handle multiple structured outputs error
tool_names = [tc["name"] for tc in structured_tool_calls]
exception = MultipleStructuredOutputsError(tool_names, output)
should_retry, error_message = _handle_structured_output_error(
exception, effective_response_format
)
if not should_retry:
raise exception
# Add error messages and retry
tool_messages = [
ToolMessage(
content=error_message,
tool_call_id=tc["id"],
name=tc["name"],
)
for tc in structured_tool_calls
]
return {"messages": [output, *tool_messages]}
# Handle single structured output
tool_call = structured_tool_calls[0]
try:
structured_tool_binding = structured_output_tools[tool_call["name"]]
structured_response = structured_tool_binding.parse(tool_call["args"])
tool_message_content = (
effective_response_format.tool_message_content
or f"Returning structured response: {structured_response}"
)
return {
"messages": [
output,
ToolMessage(
content=tool_message_content,
tool_call_id=tool_call["id"],
name=tool_call["name"],
),
],
"structured_response": structured_response,
}
except Exception as exc:
exception = StructuredOutputValidationError(tool_call["name"], exc, output)
should_retry, error_message = _handle_structured_output_error(
exception, effective_response_format
)
if not should_retry:
raise exception from exc
return {
"messages": [
output,
ToolMessage(
content=error_message,
tool_call_id=tool_call["id"],
name=tool_call["name"],
),
],
}
return {"messages": [output]}
def _get_bound_model(
request: ModelRequest[ContextT],
) -> tuple[Runnable[Any, Any], ResponseFormat[Any] | None]:
"""Get the model with appropriate tool bindings.
Performs auto-detection of strategy if needed based on model capabilities.
Args:
request: The model request containing model, tools, and response format.
Returns:
Tuple of `(bound_model, effective_response_format)` where
`effective_response_format` is the actual strategy used (may differ from
initial if auto-detected).
Raises:
ValueError: If middleware returned unknown client-side tool names.
ValueError: If `ToolStrategy` specifies tools not declared upfront.
"""
# Validate ONLY client-side tools that need to exist in tool_node
# Skip validation when wrap_tool_call is defined, as middleware may handle
# dynamic tools that are added at runtime via wrap_model_call
has_wrap_tool_call = wrap_tool_call_wrapper or awrap_tool_call_wrapper
# Build map of available client-side tools from the ToolNode
# (which has already converted callables)
available_tools_by_name = {}
if tool_node:
available_tools_by_name = tool_node.tools_by_name.copy()
# Check if any requested tools are unknown CLIENT-SIDE tools
# Only validate if wrap_tool_call is NOT defined (no dynamic tool handling)
if not has_wrap_tool_call:
unknown_tool_names = []
for t in request.tools:
# Only validate BaseTool instances (skip built-in dict tools)
if isinstance(t, dict):
continue
if isinstance(t, BaseTool) and t.name not in available_tools_by_name:
unknown_tool_names.append(t.name)
if unknown_tool_names:
available_tool_names = sorted(available_tools_by_name.keys())
msg = DYNAMIC_TOOL_ERROR_TEMPLATE.format(
unknown_tool_names=unknown_tool_names,
available_tool_names=available_tool_names,
)
raise ValueError(msg)
# Normalize raw schemas to AutoStrategy
# (handles middleware override with raw Pydantic classes)
response_format: ResponseFormat[Any] | Any | None = request.response_format
if response_format is not None and not isinstance(
response_format, (AutoStrategy, ToolStrategy, ProviderStrategy)
):
response_format = AutoStrategy(schema=response_format)
# Determine effective response format (auto-detect if needed)
effective_response_format: ResponseFormat[Any] | None
if isinstance(response_format, AutoStrategy):
# User provided raw schema via AutoStrategy - auto-detect best strategy based on model
if _supports_provider_strategy(request.model, tools=request.tools):
# Model supports provider strategy - use it
effective_response_format = ProviderStrategy(schema=response_format.schema)
elif response_format is initial_response_format and tool_strategy_for_setup is not None:
# Model doesn't support provider strategy - use ToolStrategy
# Reuse the strategy from setup if possible to preserve tool names
effective_response_format = tool_strategy_for_setup
else:
effective_response_format = ToolStrategy(schema=response_format.schema)
else:
# User explicitly specified a strategy - preserve it
effective_response_format = response_format
# Build final tools list including structured output tools
# request.tools now only contains BaseTool instances (converted from callables)
# and dicts (built-ins)
final_tools = list(request.tools)
if isinstance(effective_response_format, ToolStrategy):
# Add structured output tools to final tools list
structured_tools = [info.tool for info in structured_output_tools.values()]
final_tools.extend(structured_tools)
# Bind model based on effective response format
if isinstance(effective_response_format, ProviderStrategy):
# (Backward compatibility) Use OpenAI format structured output
kwargs = effective_response_format.to_model_kwargs()
return (
request.model.bind_tools(
final_tools, strict=True, **kwargs, **request.model_settings
),
effective_response_format,
)
if isinstance(effective_response_format, ToolStrategy):
# Current implementation requires that tools used for structured output
# have to be declared upfront when creating the agent as part of the
# response format. Middleware is allowed to change the response format
# to a subset of the original structured tools when using ToolStrategy,
# but not to add new structured tools that weren't declared upfront.
# Compute output binding
for tc in effective_response_format.schema_specs:
if tc.name not in structured_output_tools:
msg = (
f"ToolStrategy specifies tool '{tc.name}' "
"which wasn't declared in the original "
"response format when creating the agent."
)
raise ValueError(msg)
# Force tool use if we have structured output tools
tool_choice = "any" if structured_output_tools else request.tool_choice
return (
request.model.bind_tools(
final_tools, tool_choice=tool_choice, **request.model_settings
),
effective_response_format,
)
# No structured output - standard model binding
if final_tools:
return (
request.model.bind_tools(
final_tools, tool_choice=request.tool_choice, **request.model_settings
),
None,
)
return request.model.bind(**request.model_settings), None
def _execute_model_sync(request: ModelRequest[ContextT]) -> ModelResponse:
"""Execute model and return response.
This is the core model execution logic wrapped by `wrap_model_call` handlers.
Raises any exceptions that occur during model invocation.
"""
# Get the bound model (with auto-detection if needed)
model_, effective_response_format = _get_bound_model(request)
messages = request.messages
if request.system_message:
messages = [request.system_message, *messages]
output = model_.invoke(messages)
if name:
output.name = name
# Handle model output to get messages and structured_response
handled_output = _handle_model_output(output, effective_response_format)
messages_list = handled_output["messages"]
structured_response = handled_output.get("structured_response")
return ModelResponse(
result=messages_list,
structured_response=structured_response,
)
def model_node(state: AgentState[Any], runtime: Runtime[ContextT]) -> list[Command[Any]]:
"""Sync model request handler with sequential middleware processing."""
request = ModelRequest(
model=model,
tools=default_tools,
system_message=system_message,
response_format=initial_response_format,
messages=state["messages"],
tool_choice=None,
state=state,
runtime=runtime,
)
if wrap_model_call_handler is None:
model_response = _execute_model_sync(request)
return _build_commands(model_response)
result = wrap_model_call_handler(request, _execute_model_sync)
return _build_commands(result.model_response, result.commands)
async def _execute_model_async(request: ModelRequest[ContextT]) -> ModelResponse:
"""Execute model asynchronously and return response.
This is the core async model execution logic wrapped by `wrap_model_call`
handlers.
Raises any exceptions that occur during model invocation.
"""
# Get the bound model (with auto-detection if needed)
model_, effective_response_format = _get_bound_model(request)
messages = request.messages
if request.system_message:
messages = [request.system_message, *messages]
output = await model_.ainvoke(messages)
if name:
output.name = name
# Handle model output to get messages and structured_response
handled_output = _handle_model_output(output, effective_response_format)
messages_list = handled_output["messages"]
structured_response = handled_output.get("structured_response")
return ModelResponse(
result=messages_list,
structured_response=structured_response,
)
async def amodel_node(state: AgentState[Any], runtime: Runtime[ContextT]) -> list[Command[Any]]:
"""Async model request handler with sequential middleware processing."""
request = ModelRequest(
model=model,
tools=default_tools,
system_message=system_message,
response_format=initial_response_format,
messages=state["messages"],
tool_choice=None,
state=state,
runtime=runtime,
)
if awrap_model_call_handler is None:
model_response = await _execute_model_async(request)
return _build_commands(model_response)
result = await awrap_model_call_handler(request, _execute_model_async)
return _build_commands(result.model_response, result.commands)
# Use sync or async based on model capabilities
graph.add_node("model", RunnableCallable(model_node, amodel_node, trace=False))
# Only add tools node if we have tools
if tool_node is not None:
graph.add_node("tools", tool_node)
# Add middleware nodes
for m in middleware:
if (
m.__class__.before_agent is not AgentMiddleware.before_agent
or m.__class__.abefore_agent is not AgentMiddleware.abefore_agent
):
# Use RunnableCallable to support both sync and async
# Pass None for sync if not overridden to avoid signature conflicts
sync_before_agent = (
m.before_agent
if m.__class__.before_agent is not AgentMiddleware.before_agent
else None
)
async_before_agent = (
m.abefore_agent
if m.__class__.abefore_agent is not AgentMiddleware.abefore_agent
else None
)
before_agent_node = RunnableCallable(sync_before_agent, async_before_agent, trace=False)
graph.add_node(
f"{m.name}.before_agent", before_agent_node, input_schema=resolved_state_schema
)
if (
m.__class__.before_model is not AgentMiddleware.before_model
or m.__class__.abefore_model is not AgentMiddleware.abefore_model
):
# Use RunnableCallable to support both sync and async
# Pass None for sync if not overridden to avoid signature conflicts
sync_before = (
m.before_model
if m.__class__.before_model is not AgentMiddleware.before_model
else None
)
async_before = (
m.abefore_model
if m.__class__.abefore_model is not AgentMiddleware.abefore_model
else None
)
before_node = RunnableCallable(sync_before, async_before, trace=False)
graph.add_node(
f"{m.name}.before_model", before_node, input_schema=resolved_state_schema
)
if (
m.__class__.after_model is not AgentMiddleware.after_model
or m.__class__.aafter_model is not AgentMiddleware.aafter_model
):
# Use RunnableCallable to support both sync and async
# Pass None for sync if not overridden to avoid signature conflicts
sync_after = (
m.after_model
if m.__class__.after_model is not AgentMiddleware.after_model
else None
)
async_after = (
m.aafter_model
if m.__class__.aafter_model is not AgentMiddleware.aafter_model
else None
)
after_node = RunnableCallable(sync_after, async_after, trace=False)
graph.add_node(f"{m.name}.after_model", after_node, input_schema=resolved_state_schema)
if (
m.__class__.after_agent is not AgentMiddleware.after_agent
or m.__class__.aafter_agent is not AgentMiddleware.aafter_agent
):
# Use RunnableCallable to support both sync and async
# Pass None for sync if not overridden to avoid signature conflicts
sync_after_agent = (
m.after_agent
if m.__class__.after_agent is not AgentMiddleware.after_agent
else None
)
async_after_agent = (
m.aafter_agent
if m.__class__.aafter_agent is not AgentMiddleware.aafter_agent
else None
)
after_agent_node = RunnableCallable(sync_after_agent, async_after_agent, trace=False)
graph.add_node(
f"{m.name}.after_agent", after_agent_node, input_schema=resolved_state_schema
)
# Determine the entry node (runs once at start): before_agent -> before_model -> model
if middleware_w_before_agent:
entry_node = f"{middleware_w_before_agent[0].name}.before_agent"
elif middleware_w_before_model:
entry_node = f"{middleware_w_before_model[0].name}.before_model"
else:
entry_node = "model"
# Determine the loop entry node (beginning of agent loop, excludes before_agent)
# This is where tools will loop back to for the next iteration
if middleware_w_before_model:
loop_entry_node = f"{middleware_w_before_model[0].name}.before_model"
else:
loop_entry_node = "model"
# Determine the loop exit node (end of each iteration, can run multiple times)
# This is after_model or model, but NOT after_agent
if middleware_w_after_model:
loop_exit_node = f"{middleware_w_after_model[0].name}.after_model"
else:
loop_exit_node = "model"
# Determine the exit node (runs once at end): after_agent or END
if middleware_w_after_agent:
exit_node = f"{middleware_w_after_agent[-1].name}.after_agent"
else:
exit_node = END
graph.add_edge(START, entry_node)
# add conditional edges only if tools exist
if tool_node is not None:
# Only include exit_node in destinations if any tool has return_direct=True
# or if there are structured output tools
tools_to_model_destinations = [loop_entry_node]
if (
any(tool.return_direct for tool in tool_node.tools_by_name.values())
or structured_output_tools
):
tools_to_model_destinations.append(exit_node)
graph.add_conditional_edges(
"tools",
RunnableCallable(
_make_tools_to_model_edge(
tool_node=tool_node,
model_destination=loop_entry_node,
structured_output_tools=structured_output_tools,
end_destination=exit_node,
),
trace=False,
),
tools_to_model_destinations,
)
# base destinations are tools and exit_node
# we add the loop_entry node to edge destinations if:
# - there is an after model hook(s) -- allows jump_to to model
# potentially artificially injected tool messages, ex HITL
# - there is a response format -- to allow for jumping to model to handle
# regenerating structured output tool calls
model_to_tools_destinations = ["tools", exit_node]
if response_format or loop_exit_node != "model":
model_to_tools_destinations.append(loop_entry_node)
graph.add_conditional_edges(
loop_exit_node,
RunnableCallable(
_make_model_to_tools_edge(
model_destination=loop_entry_node,
structured_output_tools=structured_output_tools,
end_destination=exit_node,
),
trace=False,
),
model_to_tools_destinations,
)
elif len(structured_output_tools) > 0:
graph.add_conditional_edges(
loop_exit_node,
RunnableCallable(
_make_model_to_model_edge(
model_destination=loop_entry_node,
end_destination=exit_node,
),
trace=False,
),
[loop_entry_node, exit_node],
)
elif loop_exit_node == "model":
# If no tools and no after_model, go directly to exit_node
graph.add_edge(loop_exit_node, exit_node)
    # No tools but we have after_model - connect after_model to exit_node
    else:
        _add_middleware_edge(
            graph,
            name=f"{middleware_w_after_model[0].name}.after_model",
            default_destination=exit_node,
            model_destination=loop_entry_node,
            end_destination=exit_node,
            can_jump_to=_get_can_jump_to(middleware_w_after_model[0], "after_model"),
        )

    # Add before_agent middleware edges
    if middleware_w_before_agent:
        for m1, m2 in itertools.pairwise(middleware_w_before_agent):
            _add_middleware_edge(
                graph,
                name=f"{m1.name}.before_agent",
                default_destination=f"{m2.name}.before_agent",
                model_destination=loop_entry_node,
                end_destination=exit_node,
                can_jump_to=_get_can_jump_to(m1, "before_agent"),
            )
        # Connect last before_agent to loop_entry_node (before_model or model)
        _add_middleware_edge(
            graph,
            name=f"{middleware_w_before_agent[-1].name}.before_agent",
            default_destination=loop_entry_node,
            model_destination=loop_entry_node,
            end_destination=exit_node,
            can_jump_to=_get_can_jump_to(middleware_w_before_agent[-1], "before_agent"),
        )

    # Add before_model middleware edges
    if middleware_w_before_model:
        for m1, m2 in itertools.pairwise(middleware_w_before_model):
            _add_middleware_edge(
                graph,
                name=f"{m1.name}.before_model",
                default_destination=f"{m2.name}.before_model",
                model_destination=loop_entry_node,
                end_destination=exit_node,
                can_jump_to=_get_can_jump_to(m1, "before_model"),
            )
        # Go directly to model after the last before_model
        _add_middleware_edge(
            graph,
            name=f"{middleware_w_before_model[-1].name}.before_model",
            default_destination="model",
            model_destination=loop_entry_node,
            end_destination=exit_node,
            can_jump_to=_get_can_jump_to(middleware_w_before_model[-1], "before_model"),
        )

    # Add after_model middleware edges
    if middleware_w_after_model:
        graph.add_edge("model", f"{middleware_w_after_model[-1].name}.after_model")
        for idx in range(len(middleware_w_after_model) - 1, 0, -1):
            m1 = middleware_w_after_model[idx]
            m2 = middleware_w_after_model[idx - 1]
            _add_middleware_edge(
                graph,
                name=f"{m1.name}.after_model",
                default_destination=f"{m2.name}.after_model",
                model_destination=loop_entry_node,
                end_destination=exit_node,
                can_jump_to=_get_can_jump_to(m1, "after_model"),
            )
        # Note: Connection from after_model to after_agent/END is handled above
        # in the conditional edges section

    # Add after_agent middleware edges
    if middleware_w_after_agent:
        # Chain after_agent middleware (runs once at the very end, before END)
        for idx in range(len(middleware_w_after_agent) - 1, 0, -1):
            m1 = middleware_w_after_agent[idx]
            m2 = middleware_w_after_agent[idx - 1]
            _add_middleware_edge(
                graph,
                name=f"{m1.name}.after_agent",
                default_destination=f"{m2.name}.after_agent",
                model_destination=loop_entry_node,
                end_destination=exit_node,
                can_jump_to=_get_can_jump_to(m1, "after_agent"),
            )
        # Connect the last after_agent to END
        _add_middleware_edge(
            graph,
            name=f"{middleware_w_after_agent[0].name}.after_agent",
            default_destination=END,
            model_destination=loop_entry_node,
            end_destination=exit_node,
            can_jump_to=_get_can_jump_to(middleware_w_after_agent[0], "after_agent"),
        )

    # Set recursion limit to 9_999
    # https://github.com/langchain-ai/langgraph/issues/7313
    config: RunnableConfig = {"recursion_limit": 9_999}
    config["metadata"] = {"ls_integration": "langchain_create_agent"}
    if name:
        config["metadata"]["lc_agent_name"] = name

    return graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        debug=debug,
        name=name,
        cache=cache,
    ).with_config(config)
The following parameter notes for `create_agent` are translated directly from the docstring in the source code.
The core idea:
Create an agent graph that calls tools in a loop until a stopping condition is met.
For more detail, see https://docs.langchain.com/oss/python/langchain/agents
create_agent parameter reference

| Parameter | Meaning | Required | Type | Docstring | Details |
|---|---|---|---|---|---|
| model | language model | required | `str \| BaseChatModel` | The language model for the agent. | Can be a string identifier (e.g. `"openai:gpt-4"`) or a chat model instance (e.g. `ChatOpenAI`). For the full list of supported model strings, see `init_chat_model`. |
| tools | tool list | optional | `Sequence[BaseTool \| Callable[..., Any] \| dict[str, Any]] \| None = None` | A list of tools, `dict`, or `Callable`. | If `None` or an empty list, the agent consists of a single model node with no tool-calling loop. See the Tools docs for more information. |
| system_prompt | system prompt | optional | `str \| SystemMessage \| None = None` | An optional system prompt for the LLM. | Can be a `str` (converted to a `SystemMessage`) or a `SystemMessage` instance. The system message is prepended to the message list when calling the model. |
| middleware | middleware | optional | `Sequence[AgentMiddleware[StateT_co, ContextT]] = ()` | A sequence of middleware instances to apply to the agent. | Middleware can intercept and modify the agent's behavior at each stage of execution. |
| response_format | structured output | optional | `ResponseFormat[ResponseT] \| type[ResponseT] \| dict[str, Any] \| None = None` | An optional configuration for structured responses. | Can be a `ToolStrategy`, `ProviderStrategy`, or a Pydantic model class. |
| state_schema | state schema | optional | `type[AgentState[ResponseT]] \| None = None` | An optional `TypedDict` schema that extends `AgentState`. | If provided, it replaces `AgentState` as the base schema merged with the middleware state schemas, so users can add custom state fields without writing custom middleware. |
| context_schema | runtime context schema | optional | `type[ContextT] \| None = None` | An optional schema for runtime context. | Optional schema describing the runtime context. |
| checkpointer | checkpoint saver | optional | `Checkpointer \| None = None` | An optional checkpoint saver object. | Persists the graph's state within a single thread (e.g. one conversation), for example as chat memory. In plain terms: it automatically saves and restores context within a conversation. |
| store | store object | optional | `BaseStore \| None = None` | An optional store object. | Persists data across multiple threads (e.g. multiple conversations or users). |
| interrupt_before | nodes to interrupt before | optional | `list[str] \| None = None` | An optional list of node names to interrupt before. | Useful for adding user confirmation or another interrupt before an action is executed. |
| interrupt_after | nodes to interrupt after | optional | `list[str] \| None = None` | An optional list of node names to interrupt after. | Useful for returning directly or running additional processing on the output. |
| debug | verbose logging | optional | `bool = False` | Whether to enable verbose logging for graph execution. | When enabled, prints detailed information about each node execution, state updates, and transitions at agent runtime. Useful for debugging middleware behavior and understanding the execution flow. |
| name | graph name | optional | `str \| None = None` | An optional name for the `CompiledStateGraph`. | Used automatically when the agent graph is added as a subgraph node to another graph, which is especially useful for building multi-agent systems. |
| cache | cache | optional | `BaseCache[Any] \| None = None` | An optional `BaseCache` instance to enable caching of graph execution. | Optional `BaseCache` instance enabling caching of graph execution. |
Those are the concrete parameters of `create_agent`, but don't forget the bare `*` in the signature. The `*` means:

- Parameters before `*` may be passed positionally or by keyword.
- Parameters after `*` are keyword-only: they must be passed as `name=value` and cannot be passed positionally.

For example:

# positional parameters can be passed in order
agent = create_agent(
    "gpt-4",                          # positional: model
    [tool1, tool2],                   # positional: tools
    system_prompt="You are helpful",  # keyword: system_prompt
    state_schema=MyState,             # keyword: state_schema
    checkpointer=my_checkpointer,     # keyword: checkpointer
)
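The keyword-only behavior can be checked with a minimal stand-in function (`make_agent` is a made-up name, not part of LangChain) that uses the same `model, tools, *, system_prompt` layout:

```python
# Minimal sketch: a bare `*` in the signature makes every parameter
# after it keyword-only, just like create_agent's signature.
def make_agent(model, tools=None, *, system_prompt=None):
    return {"model": model, "tools": tools or [], "system_prompt": system_prompt}

# model and tools positionally, system_prompt by keyword: OK.
agent = make_agent("gpt-4", ["search"], system_prompt="You are helpful")
assert agent["system_prompt"] == "You are helpful"

# Passing system_prompt positionally raises TypeError.
try:
    make_agent("gpt-4", ["search"], "You are helpful")
except TypeError as e:
    print("keyword-only:", e)
```

Forcing the long tail of configuration options to be keyword-only keeps call sites readable and lets the library reorder those options without breaking callers.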
-
Return value and exceptions of `create_agent`:

| | Type | Docstring | Details |
|---|---|---|---|
| Returns | `CompiledStateGraph[AgentState[ResponseT], ContextT, _InputAgentState, _OutputAgentState[ResponseT]]` | A compiled `StateGraph` that can be used for chat interactions. | A compiled `StateGraph` ready for chat interactions. |
| Raises | `AssertionError` | If duplicate middleware instances are provided. | Raised when the same middleware instance appears more than once in `middleware`. |
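The duplicate-middleware check can be pictured with a small sketch (this is a hypothetical illustration of the promised `AssertionError`, not the actual LangChain implementation, which deduplicates by middleware name):

```python
# Hypothetical duplicate check mirroring the docstring's contract:
# "AssertionError: If duplicate middleware instances are provided."
def check_no_duplicate_middleware(middleware):
    names = [m.__class__.__name__ for m in middleware]
    assert len(names) == len(set(names)), f"Duplicate middleware: {names}"

class LoggingMiddleware:
    pass

check_no_duplicate_middleware([LoggingMiddleware()])  # fine
try:
    check_no_duplicate_middleware([LoggingMiddleware(), LoggingMiddleware()])
except AssertionError as e:
    print(e)
```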
The agent's workflow:

- The agent node calls the language model with the message list (after applying the system prompt).
- If the resulting `AIMessage` contains `tool_calls`, the graph then calls those tools.
- The tools node executes the tools and appends their results to the message list as `ToolMessage` objects.
- The agent node then calls the language model again.
- This repeats until the model's response no longer contains any `tool_calls`.
- At that point, the agent returns the full message list as the final result.

In other words, the agent loops through "think → call tools → read results → think again" until the model decides no further tool calls are needed, then returns the entire conversation. That is exactly the first sentence of the source docstring:
Creates an agent graph that calls tools in a loop until a stopping condition is met.
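The loop can be sketched in plain Python with a stubbed model, with no LangChain dependency (the stub, the `add` tool, and the dict message shape are all made up for illustration):

```python
# Stub "model": asks for a tool call on the first turn, answers after
# it sees a tool result in the message list.
def fake_model(messages):
    if not any(m["role"] == "tool" for m in messages):
        return {"role": "ai", "content": "",
                "tool_calls": [{"name": "add", "args": {"a": 2, "b": 3}}]}
    return {"role": "ai", "content": "2 + 3 = 5", "tool_calls": []}

TOOLS = {"add": lambda a, b: a + b}

def run_agent(user_input, system_prompt="You are helpful"):
    messages = [{"role": "system", "content": system_prompt},
                {"role": "user", "content": user_input}]
    while True:
        ai = fake_model(messages)        # agent node: call the model
        messages.append(ai)
        if not ai["tool_calls"]:         # stopping condition: no tool_calls
            return messages
        for call in ai["tool_calls"]:    # tools node: execute each tool
            result = TOOLS[call["name"]](**call["args"])
            messages.append({"role": "tool", "content": str(result)})

final = run_agent("What is 2 + 3?")
print(final[-1]["content"])  # → 2 + 3 = 5
```

The real graph adds middleware hooks, structured output, and checkpointing around this skeleton, but the stopping condition is the same: return once the model replies without `tool_calls`.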
A flow chart to help visualize this:

start
  │
  ▼
agent node: LLM + message list
  │
  ▼
any tool_calls?
  │
  ├── yes ──▶ tools node: execute tools, produce ToolMessage
  │               │
  │               ▼
  │          append to the message list
  │               │
  │               └──▶ back to the agent node (loop)
  │
  └── no ──▶ return the full message list (done)