Claude Agent SDK:编程控制你的AI助手

核心观点:如果Claude Code是交互式的AI编程助手,那么Agent SDK就是可编程的AI系统。通过Agent SDK,你可以用代码定义Agent的行为、权限、工具集,将Claude集成到你的自动化系统中。这意味着从"我在CLI中与Claude交互"升级到"Claude作为系统的一部分自动执行任务"。Agent SDK是构建AI驱动的自动化系统的基础。

关键词:Agent SDK、编程接口、系统集成、自动化、Python/TypeScript、工具定义、权限管理、会话管理


导读

你将学到:

  • 什么是Agent SDK以及为什么它很重要
  • Claude Code CLI vs Agent SDK的区别
  • Python SDK和TypeScript SDK的对比
  • 基础用法:初始化Agent、执行任务、管理响应
  • 工具定义和权限管理
  • Subagent的编程定义
  • MCP集成
  • 会话恢复和状态持久化
  • 完整的实战案例:自动化代码修复工具
  • 错误处理和日志记录
  • 性能优化和成本管理

适合人群:需要构建AI驱动自动化系统的工程师、DevOps工程师、SRE

阅读时间:35分钟 | 难度:高级 | 实用度:5/5

前置知识

  • 已阅读本系列前12篇文章
  • 熟悉Python或TypeScript
  • 了解REST API和JSON
  • 有过系统集成的经验

问题场景

你的公司想要构建一个自动化代码修复系统

需求:
1. 监控代码库的错误
2. 自动分析错误原因
3. 生成修复方案
4. 创建Pull Request
5. 全程自动化,无人工干预

问题:
- Claude Code CLI是交互式的,不适合自动化
- 需要一个可编程的API来集成Claude
- 需要定义清晰的权限和工具访问
- 需要管理Agent的生命周期和状态

为什么这很重要?

自动化系统有效性 = AI能力 × 系统集成度 × 可靠性

只有Claude Code(手动交互):
= 0.9 × 0.3 × 0.6 = 16.2%
(需要人工触发,不可靠)

有Agent SDK的系统:
= 0.95 × 0.95 × 0.95 = 85.7%
(完全自动化,可靠度高)

效率提升:5倍

核心概念

Claude Code CLI vs Agent SDK

Agent SDK
系统集成

程序代码

定义Agent

执行任务

返回结果

继续处理

Claude Code CLI
交互式开发

用户输入

Claude处理

显示结果

方面 Claude Code CLI Agent SDK
触发方式 人工在终端中执行 编程方式调用
交互模式 交互式对话 单向任务执行
工具访问 用户在运行时配置 代码中定义
错误处理 需要人工判断 程序化处理
集成能力 低(需要shell脚本) 高(原生编程)
适用场景 开发、探索 生产、自动化
学习曲线 平缓(直观) 中等(需要编码)
可扩展性

Agent SDK的架构

MCP and Tools Layer

工具执行

外部系统集成

Claude API Layer

模型推理

Token管理

Agent SDK Layer

定义系统提示/配置工具/创建Subagent

消息处理/工具调用/结果管理

Agent初始化和管理

任务执行引擎

你的应用程序
Python/TypeScript代码

SDK

API

Tools


Python SDK vs TypeScript SDK

安装

# Python SDK
pip install anthropic

# TypeScript SDK
npm install @anthropic-ai/sdk

基础初始化对比

Python版本

from anthropic import Anthropic

client = Anthropic(
    api_key="your-api-key-here"
)

    # 创建Agent
    agent = client.agents.create(
        model="claude-opus-5-0-20260101", # 2026年最新旗舰
        name="code-reviewer",
        memory_enabled=True, # 启用长期记忆
        instructions="You are a senior code reviewer..."
    )

TypeScript版本

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

// 创建Agent
const agent = await client.agents.create({
  model: "claude-opus-4-5-20251101",
  name: "code-reviewer",
  instructions: "You are a senior code reviewer...",
});

功能对比

特性 Python TypeScript
文档 完整 完整
社区支持 强(数据科学社区) 强(Web社区)
异步支持 中等 完美
类型安全 类型提示 完整类型
生态系统 数据分析工具丰富 Web框架多
学习曲线 平缓 中等

推荐

  • 数据分析、ML系统 → Python
  • Web应用、实时系统 → TypeScript
  • 通用自动化系统 → Python(脚本友好)

Agent SDK基础用法

1. 创建和配置Agent

from anthropic import Anthropic

class CodeReviewAgent:
    """代码审查Agent"""

    def __init__(self):
        self.client = Anthropic(
            api_key="your-api-key"
        )

    def create_agent(self):
        """创建代码审查Agent"""

        agent = self.client.agents.create(
            model="claude-opus-4-5-20251101",
            name="code-review-specialist",
            instructions="""
你是一位资深代码审查专家,有15年的大型系统代码审查经验。

你的职责:
1. 评估代码质量和可维护性
2. 识别潜在的bug和边界情况
3. 提出设计改进建议
4. 确保遵守编码标准

你的检查清单:
- 变量命名是否清晰
- 函数复杂度是否过高
- 是否有重复代码
- 错误处理是否完整
- 是否遵循SOLID原则

你必须提供结构化的反馈,包括:
1. 总体评分(满分10分)
2. 主要问题(≤5个)
3. 改进建议(≤10个)
""",
            tools=[
                {
                    "name": "analyze_code_quality",
                    "description": "分析代码质量指标",
                    "input_schema": {
                        "type": "object",
                        "properties": {
                            "code": {
                                "type": "string",
                                "description": "要分析的代码"
                            }
                        },
                        "required": ["code"]
                    }
                }
            ]
        )

        return agent

    def execute_review(self, agent_id, code):
        """执行代码审查"""

        # 创建线程(会话)
        thread = self.client.agents.threads.create()

        # 发送消息
        message = self.client.agents.threads.messages.create(
            thread_id=thread.id,
            role="user",
            content=f"请审查这个代码:\n\n{code}"
        )

        # 执行Agent
        run = self.client.agents.threads.runs.create(
            thread_id=thread.id,
            agent_id=agent_id
        )

        # 等待完成
        while run.status == "in_progress":
            run = self.client.agents.threads.runs.retrieve(
                thread_id=thread.id,
                run_id=run.id
            )
            time.sleep(0.5)

        # 获取响应
        messages = self.client.agents.threads.messages.list(
            thread_id=thread.id
        )

        return messages.data[-1].content[0].text

2. 工具定义和调用

class ToolExecutor:
    """工具执行器"""

    def __init__(self, client):
        self.client = client

    def execute_tool(self, tool_name, tool_input):
        """执行工具"""

        if tool_name == "analyze_code_quality":
            return self._analyze_code_quality(tool_input)
        elif tool_name == "check_security":
            return self._check_security(tool_input)
        else:
            return {"error": f"Unknown tool: {tool_name}"}

    def _analyze_code_quality(self, tool_input):
        """分析代码质量"""

        code = tool_input.get("code", "")

        # 实施实际的分析逻辑
        # 这里应该调用真实的代码质量检查工具
        metrics = {
            "cyclomatic_complexity": 5,
            "lines_of_code": 45,
            "comment_ratio": 0.15,
            "has_tests": True
        }

        return metrics

    def _check_security(self, tool_input):
        """检查安全性"""

        code = tool_input.get("code", "")

        # 进行安全检查
        issues = []

        if "eval(" in code:
            issues.append("使用了eval(),存在安全风险")
        if "pickle.loads" in code:
            issues.append("使用了pickle.loads(),存在反序列化风险")

        return {"security_issues": issues}

3. 完整的任务执行流程

class AutomatedWorkflow:
    """自动化工作流"""

    def __init__(self):
        self.client = Anthropic()
        self.tool_executor = ToolExecutor(self.client)

    async def run_workflow(self, agent_id, task):
        """运行完整的工作流"""

        # 步骤1:创建线程
        thread = self.client.agents.threads.create()

        # 步骤2:发送初始消息
        self.client.agents.threads.messages.create(
            thread_id=thread.id,
            role="user",
            content=task
        )

        # 步骤3:执行Agent
        run = self.client.agents.threads.runs.create(
            thread_id=thread.id,
            agent_id=agent_id
        )

        # 步骤4:处理工具调用
        while run.status != "completed":
            if run.status == "requires_action":
                # Agent需要调用工具
                tool_calls = run.required_action.submit_tool_results

                # 执行每个工具调用
                tool_results = []
                for tool_call in tool_calls:
                    result = self.tool_executor.execute_tool(
                        tool_call.function.name,
                        tool_call.function.arguments
                    )

                    tool_results.append({
                        "tool_use_id": tool_call.id,
                        "content": result
                    })

                # 提交工具结果
                run = self.client.agents.threads.runs.submit_tool_results(
                    thread_id=thread.id,
                    run_id=run.id,
                    tool_results=tool_results
                )

            elif run.status == "failed":
                raise Exception(f"Agent运行失败:{run.last_error}")
            else:
                # 等待
                await asyncio.sleep(0.5)
                run = self.client.agents.threads.runs.retrieve(
                    thread_id=thread.id,
                    run_id=run.id
                )

        # 步骤5:获取最终响应
        messages = self.client.agents.threads.messages.list(
            thread_id=thread.id
        )

        return messages.data[-1].content[0].text

Subagent的编程定义

创建专业化Subagent

class SubagentManager:
    """Subagent管理器"""

    def __init__(self):
        self.client = Anthropic()

    def create_code_review_subagent(self):
        """创建代码审查Subagent"""

        return self.client.agents.create(
            model="claude-opus-4-5-20251101",
            name="code-review-specialist",
            instructions="你是代码审查专家...",
            tools=self._get_code_review_tools()
        )

    def create_security_subagent(self):
        """创建安全审查Subagent"""

        return self.client.agents.create(
            model="claude-opus-4-5-20251101",
            name="security-specialist",
            instructions="你是安全专家...",
            tools=self._get_security_tools()
        )

    def create_performance_subagent(self):
        """创建性能优化Subagent"""

        return self.client.agents.create(
            model="claude-opus-4-5-20251101",
            name="performance-specialist",
            instructions="你是性能优化专家...",
            tools=self._get_performance_tools()
        )

    def _get_code_review_tools(self):
        """获取代码审查工具"""

        return [
            {
                "name": "analyze_complexity",
                "description": "分析代码复杂度",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "code": {"type": "string"}
                    },
                    "required": ["code"]
                }
            }
        ]

    def _get_security_tools(self):
        """获取安全检查工具"""

        return [
            {
                "name": "scan_vulnerabilities",
                "description": "扫描安全漏洞",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "code": {"type": "string"}
                    },
                    "required": ["code"]
                }
            }
        ]

    def _get_performance_tools(self):
        """获取性能分析工具"""

        return [
            {
                "name": "analyze_performance",
                "description": "分析性能瓶颈",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "code": {"type": "string"}
                    },
                    "required": ["code"]
                }
            }
        ]

并行执行多个Subagent

import asyncio

class ParallelSubagentExecutor:
    """并行执行Subagent"""

    def __init__(self):
        self.client = Anthropic()
        self.subagent_manager = SubagentManager()

    async def execute_parallel_review(self, code):
        """并行执行多个审查Agent"""

        # 创建所有Agent
        code_review_agent = self.subagent_manager.create_code_review_subagent()
        security_agent = self.subagent_manager.create_security_subagent()
        performance_agent = self.subagent_manager.create_performance_subagent()

        # 并行执行
        tasks = [
            self._run_agent(code_review_agent.id, code, "代码审查"),
            self._run_agent(security_agent.id, code, "安全审查"),
            self._run_agent(performance_agent.id, code, "性能分析")
        ]

        results = await asyncio.gather(*tasks)

        # 聚合结果
        return {
            "code_review": results[0],
            "security": results[1],
            "performance": results[2]
        }

    async def _run_agent(self, agent_id, code, task_name):
        """运行单个Agent"""

        thread = self.client.agents.threads.create()

        self.client.agents.threads.messages.create(
            thread_id=thread.id,
            role="user",
            content=f"{task_name}:\n\n{code}"
        )

        run = self.client.agents.threads.runs.create(
            thread_id=thread.id,
            agent_id=agent_id
        )

        # 等待完成
        while run.status in ["in_progress", "requires_action"]:
            await asyncio.sleep(0.5)
            run = self.client.agents.threads.runs.retrieve(
                thread_id=thread.id,
                run_id=run.id
            )

        messages = self.client.agents.threads.messages.list(
            thread_id=thread.id
        )

        return messages.data[-1].content[0].text

MCP集成

在Agent中配置MCP

class AgentWithMCP:
    """集成MCP的Agent"""

    def __init__(self):
        self.client = Anthropic()

    def create_github_aware_agent(self):
        """创建可以访问GitHub的Agent"""

        agent = self.client.agents.create(
            model="claude-opus-4-5-20251101",
            name="github-pr-reviewer",
            instructions="你是GitHub PR审查专家...",
            tools=[
                {
                    "name": "fetch_pr",
                    "description": "从GitHub获取PR信息",
                    "input_schema": {
                        "type": "object",
                        "properties": {
                            "owner": {"type": "string"},
                            "repo": {"type": "string"},
                            "pr_number": {"type": "integer"}
                        },
                        "required": ["owner", "repo", "pr_number"]
                    }
                }
            ],
            # 配置MCP
            mcp_servers={
                "github": {
                    "command": "node",
                    "args": ["github-mcp/index.js"],
                    "env": {
                        "GITHUB_TOKEN": "${GITHUB_TOKEN}"
                    }
                }
            }
        )

        return agent

    def execute_pr_review(self, agent_id, owner, repo, pr_number):
        """执行PR审查"""

        thread = self.client.agents.threads.create()

        self.client.agents.threads.messages.create(
            thread_id=thread.id,
            role="user",
            content=f"请审查 {owner}/{repo}#{pr_number} 的PR"
        )

        run = self.client.agents.threads.runs.create(
            thread_id=thread.id,
            agent_id=agent_id
        )

        # 处理MCP调用
        while run.status != "completed":
            if run.status == "requires_action":
                # 处理工具调用(包括MCP调用)
                pass
            else:
                time.sleep(0.5)
                run = self.client.agents.threads.runs.retrieve(
                    thread_id=thread.id,
                    run_id=run.id
                )

        messages = self.client.agents.threads.messages.list(
            thread_id=thread.id
        )

        return messages.data[-1].content[0].text

会话恢复和状态管理

线程持久化

import json
from datetime import datetime

class SessionManager:
    """会话管理器"""

    def __init__(self, storage_path="sessions.json"):
        self.client = Anthropic()
        self.storage_path = storage_path
        self.sessions = self._load_sessions()

    def _load_sessions(self):
        """从存储加载会话"""

        try:
            with open(self.storage_path, "r") as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def _save_sessions(self):
        """保存会话到存储"""

        with open(self.storage_path, "w") as f:
            json.dump(self.sessions, f, indent=2)

    def create_session(self, session_id, agent_id, metadata=None):
        """创建新会话"""

        thread = self.client.agents.threads.create()

        self.sessions[session_id] = {
            "thread_id": thread.id,
            "agent_id": agent_id,
            "created_at": datetime.now().isoformat(),
            "metadata": metadata or {},
            "message_count": 0
        }

        self._save_sessions()
        return thread.id

    def get_session(self, session_id):
        """获取现有会话"""

        if session_id in self.sessions:
            return self.sessions[session_id]
        return None

    def continue_session(self, session_id, message):
        """继续现有会话"""

        session = self.get_session(session_id)
        if not session:
            raise ValueError(f"会话不存在:{session_id}")

        thread_id = session["thread_id"]
        agent_id = session["agent_id"]

        # 发送消息
        self.client.agents.threads.messages.create(
            thread_id=thread_id,
            role="user",
            content=message
        )

        # 执行Agent
        run = self.client.agents.threads.runs.create(
            thread_id=thread_id,
            agent_id=agent_id
        )

        # 等待完成
        while run.status == "in_progress":
            time.sleep(0.5)
            run = self.client.agents.threads.runs.retrieve(
                thread_id=thread_id,
                run_id=run.id
            )

        # 更新会话统计
        session["message_count"] += 1
        self._save_sessions()

        # 获取响应
        messages = self.client.agents.threads.messages.list(
            thread_id=thread_id
        )

        return messages.data[-1].content[0].text

    def get_session_history(self, session_id):
        """获取会话历史"""

        session = self.get_session(session_id)
        if not session:
            raise ValueError(f"会话不存在:{session_id}")

        thread_id = session["thread_id"]
        messages = self.client.agents.threads.messages.list(
            thread_id=thread_id
        )

        return messages.data

实战案例:自动化代码修复工具

完整系统设计

class AutomatedCodeFixSystem:
    """自动化代码修复系统"""

    def __init__(self):
        self.client = Anthropic()
        self.session_manager = SessionManager()

    def create_fix_agents(self):
        """创建修复所需的Agent"""

        # 分析Agent
        analyzer = self.client.agents.create(
            model="claude-opus-4-5-20251101",
            name="bug-analyzer",
            instructions="你是错误分析专家..."
        )

        # 修复Agent
        fixer = self.client.agents.create(
            model="claude-opus-4-5-20251101",
            name="code-fixer",
            instructions="你是代码修复专家..."
        )

        # 验证Agent
        verifier = self.client.agents.create(
            model="claude-opus-4-5-20251101",
            name="fix-verifier",
            instructions="你是修复验证专家..."
        )

        return {
            "analyzer": analyzer,
            "fixer": fixer,
            "verifier": verifier
        }

    async def fix_error(self, error_report, code):
        """修复单个错误"""

        agents = self.create_fix_agents()

        # 步骤1:分析错误
        print("步骤1:分析错误...")
        analysis = await self._run_agent(
            agents["analyzer"].id,
            f"分析这个错误:\n{error_report}\n\n代码:\n{code}"
        )

        # 步骤2:生成修复
        print("步骤2:生成修复方案...")
        fix = await self._run_agent(
            agents["fixer"].id,
            f"基于这个分析生成修复方案:\n{analysis}"
        )

        # 步骤3:验证修复
        print("步骤3:验证修复...")
        verification = await self._run_agent(
            agents["verifier"].id,
            f"验证这个修复方案的正确性和安全性:\n{fix}"
        )

        return {
            "analysis": analysis,
            "fix": fix,
            "verification": verification
        }

    async def _run_agent(self, agent_id, message):
        """运行单个Agent"""

        thread = self.client.agents.threads.create()

        self.client.agents.threads.messages.create(
            thread_id=thread.id,
            role="user",
            content=message
        )

        run = self.client.agents.threads.runs.create(
            thread_id=thread.id,
            agent_id=agent_id
        )

        while run.status == "in_progress":
            await asyncio.sleep(0.5)
            run = self.client.agents.threads.runs.retrieve(
                thread_id=thread.id,
                run_id=run.id
            )

        messages = self.client.agents.threads.messages.list(
            thread_id=thread.id
        )

        return messages.data[-1].content[0].text

    async def process_error_queue(self, errors):
        """批量处理错误队列"""

        results = []
        for error in errors:
            result = await self.fix_error(
                error["report"],
                error["code"]
            )
            results.append(result)

        return results

错误处理和日志记录

健壮的错误处理

import logging
from enum import Enum

class AgentStatus(Enum):
    """Agent状态枚举"""
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    CANCELLED = "cancelled"

class RobustAgent:
    """健壮的Agent包装器"""

    def __init__(self):
        self.client = Anthropic()
        self.logger = logging.getLogger(__name__)

    async def execute_with_retry(self, agent_id, message, max_retries=3):
        """带重试的执行"""

        for attempt in range(max_retries):
            try:
                return await self._execute_once(agent_id, message)

            except Exception as e:
                self.logger.warning(f"尝试 {attempt + 1} 失败:{e}")

                if attempt < max_retries - 1:
                    # 指数退避
                    wait_time = 2 ** attempt
                    await asyncio.sleep(wait_time)
                else:
                    self.logger.error(f"所有 {max_retries} 次尝试都失败")
                    raise

    async def _execute_once(self, agent_id, message):
        """执行一次"""

        try:
            thread = self.client.agents.threads.create()
            self.logger.debug(f"创建线程:{thread.id}")

            self.client.agents.threads.messages.create(
                thread_id=thread.id,
                role="user",
                content=message
            )
            self.logger.debug(f"发送消息到线程 {thread.id}")

            run = self.client.agents.threads.runs.create(
                thread_id=thread.id,
                agent_id=agent_id
            )
            self.logger.debug(f"启动运行:{run.id}")

            while run.status in ["in_progress", "requires_action"]:
                await asyncio.sleep(0.5)
                run = self.client.agents.threads.runs.retrieve(
                    thread_id=thread.id,
                    run_id=run.id
                )

            if run.status == "failed":
                raise Exception(f"Agent失败:{run.last_error}")

            self.logger.debug(f"运行完成:{run.id}")

            messages = self.client.agents.threads.messages.list(
                thread_id=thread.id
            )

            return messages.data[-1].content[0].text

        except Exception as e:
            self.logger.error(f"执行失败:{e}", exc_info=True)
            raise

性能优化和成本管理

成本监控

class CostMonitor:
    """成本监控"""

    def __init__(self, daily_budget=100):
        self.daily_budget = daily_budget
        self.spent_today = 0
        self.logger = logging.getLogger(__name__)

    def estimate_cost(self, model, input_tokens, output_tokens):
        """估计成本"""

        # Claude 3 Opus价格
        prices = {
            "claude-opus-4-5-20251101": {
                "input": 0.015 / 1000,
                "output": 0.045 / 1000
            }
        }

        if model not in prices:
            return None

        cost = (
            input_tokens * prices[model]["input"] +
            output_tokens * prices[model]["output"]
        )

        return cost

    def can_afford(self, estimated_cost):
        """检查是否负担得起"""

        if self.spent_today + estimated_cost > self.daily_budget:
            self.logger.warning(
                f"成本超过预算。已消耗:${self.spent_today},"
                f"预算:${self.daily_budget}"
            )
            return False

        return True

    def charge(self, cost):
        """计费"""

        self.spent_today += cost
        self.logger.info(f"已消耗:${self.spent_today:.2f}(预算:${self.daily_budget})")

批量处理优化

class BatchProcessor:
    """批量处理优化"""

    def __init__(self, client):
        self.client = client

    async def batch_execute(self, agent_id, messages, batch_size=10):
        """批量执行消息"""

        results = []

        for i in range(0, len(messages), batch_size):
            batch = messages[i:i + batch_size]

            # 并行处理批次
            tasks = [
                self._execute_single(agent_id, msg)
                for msg in batch
            ]

            batch_results = await asyncio.gather(*tasks)
            results.extend(batch_results)

            # 每个批次之间等待,避免速率限制
            if i + batch_size < len(messages):
                await asyncio.sleep(1)

        return results

    async def _execute_single(self, agent_id, message):
        """执行单条消息"""
        # 实现类似前面的执行逻辑
        pass

最佳实践总结

Agent SDK使用的黄金法则:

1. 正确的工具设计
   - 工具应该原子化(单一职责)
   - 提供清晰的输入/输出schema
   - 实现健壮的错误处理

2. 权限管理
   - 最小权限原则
   - 工具访问控制
   - 审计日志

3. 会话管理
   - 持久化线程ID
   - 管理会话生命周期
   - 清理过期会话

4. 错误处理
   - 实现重试机制
   - 详细的日志记录
   - 优雅的降级

5. 成本控制
   - 监控token使用
   - 设置预算限制
   - 批量处理优化

6. 测试和监控
   - 单元测试工具
   - 集成测试工作流
   - 生产监控

总结

Agent SDK是Claude从交互式工具升级到系统集成的关键:

  • 从CLI到API:从手动执行到自动化系统
  • 从对话到工作流:定义清晰的任务和执行过程
  • 从通用到专业:创建专业化的Subagent
  • 从手工到自动:完全自动化的生产系统

通过Agent SDK,你可以构建真正的AI驱动的自动化系统,而不仅仅是AI辅助工具。


下一篇预告

下一篇文章将介绍 在CI/CD中集成Claude - 如何将Claude无缝集成到你的CI/CD流水线中,实现自动化代码审查、测试诊断和文档生成。

相关阅读

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐