DeepLearning.AI × Anthropic 官方课程实战篇 | 自定义 Skills 开发 | API/CLI/SDK 全平台应用 | 附完整代码实现

关键词:Claude Skills、自定义开发、Claude API、Claude Code、Agent SDK、实战案例、跨平台部署


前言

在前两篇学习笔记中,我们深入理解了 Skills 的核心概念、技术架构,以及它与 MCP、Subagents 等技术的协同关系。现在,是时候将理论付诸实践了。

本文基于 DeepLearning.AI 与 Anthropic 课程第 6-10 节内容,聚焦于 Skills 的实战应用:如何从零创建自定义 Skills,如何在 Claude API、Claude Code、Claude Agent SDK 等不同平台中使用 Skills,以及如何构建生产级的智能体应用。

通过本文,你将掌握:

  • 创建符合生产标准的自定义 Skills
  • 在 Claude API 中集成 Skills 的完整流程
  • 使用 Claude Code 构建命令行智能体应用
  • 利用 Agent SDK 开发复杂的多智能体系统
  • 跨平台部署和维护 Skills 的最佳实践

目录

  1. 自定义 Skills 开发完全指南
  2. 实战案例一:营销活动分析 Skill
  3. 实战案例二:时间序列分析 Skill
  4. 实战案例三:练习题生成 Skill
  5. Claude API 中的 Skills 集成
  6. Claude Code 实战应用
  7. Claude Agent SDK 高级开发
  8. 跨平台部署策略
  9. 生产环境最佳实践
  10. 总结与展望

1. 自定义 Skills 开发完全指南

1.1 开发前的准备工作

在开始创建自定义 Skills 之前,需要明确以下几个关键问题:

需求分析框架
class SkillRequirementAnalysis:
    """Skill 需求分析框架"""
    
    def __init__(self, task_description):
        self.task = task_description
        
    def analyze(self):
        return {
            # 1. 任务特征
            'frequency': self.is_repetitive(),      # 是否重复执行
            'complexity': self.count_steps(),       # 步骤复杂度
            'standardization': self.has_standard(), # 是否有标准流程
            
            # 2. 技术需求
            'external_data': self.needs_external(), # 是否需要外部数据
            'parallel': self.can_parallel(),        # 是否可并行
            'isolation': self.needs_isolation(),    # 是否需要隔离
            
            # 3. 资源需求
            'scripts': self.list_scripts(),         # 需要的脚本
            'references': self.list_references(),   # 参考文档
            'assets': self.list_assets(),           # 资源文件
        }
决策树
是否创建 Skill?
├─ 任务重复频率 > 3 次/月?
│  ├─ 是 → 继续评估
│  └─ 否 → 使用直接 Prompt
│
├─ 步骤数量 > 5 个?
│  ├─ 是 → 继续评估
│  └─ 否 → 考虑简化流程
│
├─ 需要标准化输出?
│  ├─ 是 → 创建 Skill
│  └─ 否 → 评估其他因素
│
└─ 涉及团队协作?
   ├─ 是 → 强烈推荐创建 Skill
   └─ 否 → 根据个人需求决定

1.2 Skill 目录结构设计

标准结构模板
skill-name/
├── SKILL.md                    # 核心文档(必需)
├── README.md                   # 用户指南(推荐)
├── scripts/                    # 可执行脚本(可选)
│   ├── __init__.py
│   ├── main.py                # 主脚本
│   ├── utils.py               # 工具函数
│   ├── validators.py          # 数据验证
│   └── tests/                 # 测试文件
│       ├── __init__.py
│       ├── test_main.py
│       └── test_utils.py
├── references/                 # 参考文档(可选)
│   ├── methodology.md         # 方法论
│   ├── examples.md            # 示例
│   ├── glossary.md            # 术语表
│   └── best_practices.md      # 最佳实践
├── assets/                     # 资源文件(可选)
│   ├── templates/             # 模板
│   │   ├── input_template.xlsx
│   │   └── output_template.xlsx
│   ├── configs/               # 配置
│   │   └── default_config.json
│   └── samples/               # 示例数据
│       └── sample_data.csv
└── .skillignore               # 忽略文件(可选)
文件命名规范
文件类型 命名规范 示例
Skill 名称 kebab-case analyzing-marketing-campaign
Python 脚本 snake_case data_processor.py
配置文件 snake_case default_config.json
文档文件 kebab-case best-practices.md
测试文件 test_ 前缀 test_main.py

1.3 SKILL.md 编写规范

核心原则

一个好的 SKILL.md 应该:

  1. 描述清晰:让 Claude 能准确理解何时使用这个 Skill
  2. 步骤具体:提供明确的执行流程和操作指令
  3. 格式规范:定义清晰的输入输出格式和数据结构
  4. 可操作性强:包含具体的计算公式、阈值、判断标准
  5. 引用外部资源:通过 references/ 目录提供详细的参考文档
完整模板(基于实际案例)
---
# ============ 元数据部分(YAML Front Matter)============
name: skill-name
description: >
  详细描述这个 Skill 的功能和使用场景。
  使用 "Use when..." 句式明确触发条件。
  例如:Use when analyzing multi-channel marketing data to calculate 
  funnel metrics, compare to benchmarks, or get budget recommendations.
---

# Skill 标题

简短的一句话说明 Skill 的核心功能。

## Input Requirements

明确定义输入数据的格式和要求:

**数据格式**:CSV / Excel / JSON / 其他

**必需字段**:
- **field_name**: 字段说明(数据类型、格式要求)
- **date**: 日期字段(YYYY-MM-DD 格式)
- **value**: 数值字段(非负数)

**可选字段**:
- **optional_field**: 可选字段说明(默认值)

**数据质量要求**:
1. 检查缺失值处理规则
2. 验证数据类型和范围
3. 标注异常情况(如:转化量大于点击量)

## Workflow / 执行流程

**Step 1: 数据验证与预处理**

```bash
python scripts/validate.py input.csv --output-dir results/

具体操作:

  • 读取输入文件
  • 验证必需列是否存在
  • 检查数据类型和格式
  • 处理缺失值(说明处理策略)
  • 标记异常值

Step 2: 核心计算

详细说明计算逻辑和公式:

指标 1:指标名称

计算公式

指标名称 = (分子 / 分母) × 100

示例

CTR = (1000 clicks / 50000 impressions) × 100 = 2.0%

基准值(如果适用):

类别 基准值 来源
类别A 2.5% 行业标准
类别B 3.0% 历史数据

指标 2:指标名称

计算公式

指标名称 = 总收入 / 总成本

判断标准

  • 优秀:> 4.0
  • 良好:3.0 - 4.0
  • 需改进:< 3.0

Step 3: 结果分析与输出

生成标准化的输出格式:

输出表格结构

列名 说明 格式 示例
channel 渠道名称 字符串 Google Ads
metric_1 指标1 百分比 2.5%
status 状态标识 符号 ✓ / ✗

状态标识规则

  • [OK]:满足标准
  • [X]:未达标准
  • [!]:需要关注

解读说明
在表格后提供逐项解读:

  • 突出关键发现
  • 说明异常情况
  • 提供改进建议

Advanced Features / 高级功能

功能 1:预算重新分配建议

触发条件:用户明确请求预算优化建议

执行步骤

  1. 读取 references/budget_reallocation_rules.md 获取决策框架
  2. 应用规则识别高/低表现渠道
  3. 计算建议的预算调整金额
  4. 预测调整后的预期收益

输出格式

### 预算优化建议

**增加预算**:
- 渠道A:当前 ROAS 4.5,建议增加 20-30%
- 渠道B:当前 ROAS 3.8,建议增加 10-20%

**减少预算**:
- 渠道C:当前 ROAS 1.2,建议减少 50% 或暂停

功能 2:可视化生成(可选)

python scripts/visualize.py results/data.csv --output-dir results/plots/

生成图表:

  • metric_comparison.png:指标对比柱状图
  • trend_analysis.png:趋势分析折线图
  • distribution.png:分布直方图

Script Options / 脚本参数

所有脚本支持的通用参数:

  • --input PATH:输入文件路径(必需)
  • --output-dir PATH:输出目录(默认:results/
  • --config PATH:配置文件路径(可选)
  • --date-col NAME:日期列名(自动检测或手动指定)
  • --value-col NAME:数值列名(自动检测或手动指定)
  • --verbose:详细输出模式

Output Files / 输出文件

results/
├── analysis_report.xlsx      # 主分析报告(Excel格式)
├── summary.md                # 摘要报告(Markdown格式)
├── metrics.json              # 所有指标数据(JSON格式)
└── plots/                    # 可视化图表目录
    ├── comparison.png
    ├── trend.png
    └── distribution.png

Default Values & Thresholds / 默认值与阈值

如果用户未提供,使用以下默认值:

成本参数

  • 运输成本:$8 / 订单
  • 产品成本:收入的 35%

性能阈值

  • 目标 ROAS:≥ 4.0x
  • 最大 CPA:≤ $50
  • 最小转化率:≥ 2.0%

基准值(按类别):

类别 CTR CVR ROAS
类别A 2.5% 3.8% 4.2
类别B 5.0% 4.5% 3.8

References / 参考文档

Skill 可以引用 references/ 目录中的详细文档:

  • references/methodology.md:详细的方法论说明
  • references/benchmark_data.md:行业基准数据
  • references/decision_rules.md:决策规则和判断标准
  • references/examples.md:完整的使用示例

引用方式

如需了解预算重新分配的完整决策框架,
请参阅 `references/budget_reallocation_rules.md`

Error Handling / 错误处理

常见错误及解决方案

错误 1:缺少必需列

Error: Missing required column 'impressions'

解决方案

  • 检查输入文件是否包含所有必需列
  • 使用 --help 查看完整的列名要求
  • 某些渠道可能不需要特定列(如 Email 不需要 impressions)

错误 2:数据类型不匹配

Error: Column 'spend' contains non-numeric values

解决方案

  • 检查数值列是否包含文本或特殊字符
  • 移除货币符号($, ¥)和千位分隔符(,)
  • 确保使用小数点(.)而非逗号(,)作为小数分隔符

错误 3:异常数据

Warning: Conversions (150) > Clicks (100) for campaign X

解决方案

  • 验证数据源的准确性
  • 检查是否存在数据录入错误
  • 考虑不同的归因模型可能导致的差异

Dependencies / 依赖项

Python 版本:>= 3.8

必需包

pandas>=1.5.0
numpy>=1.20.0
openpyxl>=3.0.0      # Excel 文件支持
matplotlib>=3.5.0     # 可视化

可选包

seaborn>=0.12.0      # 高级可视化
scipy>=1.9.0         # 统计分析

安装命令

pip install -r requirements.txt

Best Practices / 最佳实践

  1. 数据准备

    • 在分析前备份原始数据
    • 使用标准化的列名
    • 确保日期格式一致
  2. 参数配置

    • 根据业务实际调整阈值
    • 定期更新基准值
    • 记录配置变更历史
  3. 结果验证

    • 交叉验证关键指标
    • 对比历史数据趋势
    • 人工审核异常结果
  4. 性能优化

    • 对大数据集使用批处理
    • 启用缓存避免重复计算
    • 并行处理多个渠道
关键要点
  1. 描述要具体:不要只说"分析数据",要说"分析多渠道营销数据,计算 CTR、CVR、ROAS 等指标"

  2. 公式要明确:提供完整的计算公式和示例,不要让 Claude 猜测

  3. 阈值要清晰:明确什么是"好"、什么是"差",提供具体的数值标准

  4. 步骤要可执行:每个步骤都应该有明确的输入、处理、输出

  5. 引用要合理:将详细的规则和数据放在 references/ 目录,SKILL.md 中只提供概要

1.4 脚本开发规范

Python 脚本模板
#!/usr/bin/env python3
"""
Skill Name - Main Script
Description: 脚本功能说明
Author: Your Name
Version: 1.0.0
"""

import sys
import json
import logging
from typing import Dict, List, Any, Optional
from pathlib import Path

# ============ 日志配置 ============
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# ============ 常量定义 ============
DEFAULT_CONFIG = {
    'option1': 'value1',
    'option2': 'value2',
}

# ============ 异常类定义 ============
class SkillError(Exception):
    """Skill 基础异常类"""
    pass

class ValidationError(SkillError):
    """数据验证异常"""
    pass

class ProcessingError(SkillError):
    """数据处理异常"""
    pass

# ============ 核心类定义 ============
class SkillProcessor:
    """Skill 处理器主类"""
    
    def __init__(self, config: Optional[Dict] = None):
        """
        初始化处理器
        
        Args:
            config: 配置字典
        """
        self.config = {**DEFAULT_CONFIG, **(config or {})}
        self.logger = logging.getLogger(self.__class__.__name__)
        
    def validate_input(self, data: Any) -> Dict[str, Any]:
        """
        验证输入数据
        
        Args:
            data: 输入数据
            
        Returns:
            验证后的数据
            
        Raises:
            ValidationError: 数据验证失败
        """
        errors = []
        
        # 实现验证逻辑
        if not data:
            errors.append("输入数据不能为空")
        
        if errors:
            raise ValidationError(f"数据验证失败: {', '.join(errors)}")
        
        return data
    
    def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        处理数据
        
        Args:
            data: 输入数据
            
        Returns:
            处理结果
            
        Raises:
            ProcessingError: 处理失败
        """
        try:
            # 实现处理逻辑
            result = self._do_process(data)
            return result
        except Exception as e:
            raise ProcessingError(f"数据处理失败: {str(e)}")
    
    def _do_process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        实际处理逻辑(私有方法)
        
        Args:
            data: 输入数据
            
        Returns:
            处理结果
        """
        # 实现具体处理逻辑
        return {'status': 'success', 'data': data}
    
    def generate_output(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """
        生成输出
        
        Args:
            result: 处理结果
            
        Returns:
            格式化的输出
        """
        return {
            'status': 'success',
            'result': result,
            'metadata': {
                'version': '1.0.0',
                'timestamp': self._get_timestamp()
            }
        }
    
    def _get_timestamp(self) -> str:
        """获取当前时间戳"""
        from datetime import datetime
        return datetime.now().isoformat()
    
    def execute(self, input_data: Any) -> Dict[str, Any]:
        """
        执行完整流程
        
        Args:
            input_data: 输入数据
            
        Returns:
            执行结果
        """
        try:
            # 1. 验证输入
            validated_data = self.validate_input(input_data)
            self.logger.info("输入验证通过")
            
            # 2. 处理数据
            processed_data = self.process(validated_data)
            self.logger.info("数据处理完成")
            
            # 3. 生成输出
            output = self.generate_output(processed_data)
            self.logger.info("输出生成完成")
            
            return output
            
        except SkillError as e:
            self.logger.error(f"执行失败: {str(e)}")
            return {
                'status': 'error',
                'error': str(e),
                'error_type': e.__class__.__name__
            }
        except Exception as e:
            self.logger.error(f"未知错误: {str(e)}")
            return {
                'status': 'error',
                'error': str(e),
                'error_type': 'UnknownError'
            }

# ============ 命令行接口 ============
def main():
    """命令行入口函数"""
    import argparse
    
    parser = argparse.ArgumentParser(description='Skill Name - Main Script')
    parser.add_argument('--input', required=True, help='输入文件路径')
    parser.add_argument('--output', required=True, help='输出文件路径')
    parser.add_argument('--config', help='配置文件路径')
    parser.add_argument('--verbose', action='store_true', help='详细输出')
    
    args = parser.parse_args()
    
    # 设置日志级别
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    
    # 加载配置
    config = None
    if args.config:
        with open(args.config, 'r') as f:
            config = json.load(f)
    
    # 读取输入
    input_path = Path(args.input)
    if not input_path.exists():
        logger.error(f"输入文件不存在: {args.input}")
        sys.exit(1)
    
    with open(input_path, 'r') as f:
        input_data = json.load(f)
    
    # 执行处理
    processor = SkillProcessor(config)
    result = processor.execute(input_data)
    
    # 保存输出
    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    
    with open(output_path, 'w') as f:
        json.dump(result, f, indent=2, ensure_ascii=False)
    
    # 输出结果
    if result['status'] == 'success':
        logger.info(f"执行成功,结果已保存到: {args.output}")
        sys.exit(0)
    else:
        logger.error(f"执行失败: {result.get('error', 'Unknown error')}")
        sys.exit(1)

if __name__ == '__main__':
    main()

1.5 测试策略

单元测试模板
#!/usr/bin/env python3
"""
Skill Name - Unit Tests
"""

import unittest
import json
from pathlib import Path
import sys

# 添加父目录到路径
sys.path.insert(0, str(Path(__file__).parent.parent))

from main import SkillProcessor, ValidationError, ProcessingError

class TestSkillProcessor(unittest.TestCase):
    """SkillProcessor 单元测试"""
    
    def setUp(self):
        """测试前准备"""
        self.processor = SkillProcessor()
        self.sample_data = {
            'field1': 'value1',
            'field2': 'value2'
        }
    
    def tearDown(self):
        """测试后清理"""
        pass
    
    # ============ 输入验证测试 ============
    def test_validate_input_success(self):
        """测试:输入验证成功"""
        result = self.processor.validate_input(self.sample_data)
        self.assertIsNotNone(result)
        self.assertEqual(result, self.sample_data)
    
    def test_validate_input_empty(self):
        """测试:空输入验证失败"""
        with self.assertRaises(ValidationError):
            self.processor.validate_input(None)
    
    def test_validate_input_invalid_type(self):
        """测试:无效类型验证失败"""
        with self.assertRaises(ValidationError):
            self.processor.validate_input("invalid")
    
    # ============ 数据处理测试 ============
    def test_process_success(self):
        """测试:数据处理成功"""
        result = self.processor.process(self.sample_data)
        self.assertIsNotNone(result)
        self.assertIn('status', result)
        self.assertEqual(result['status'], 'success')
    
    def test_process_with_config(self):
        """测试:使用配置处理数据"""
        config = {'option1': 'custom_value'}
        processor = SkillProcessor(config)
        result = processor.process(self.sample_data)
        self.assertIsNotNone(result)
    
    # ============ 输出生成测试 ============
    def test_generate_output(self):
        """测试:输出生成"""
        result = {'data': 'test'}
        output = self.processor.generate_output(result)
        self.assertIn('status', output)
        self.assertIn('result', output)
        self.assertIn('metadata', output)
    
    # ============ 完整流程测试 ============
    def test_execute_success(self):
        """测试:完整执行成功"""
        result = self.processor.execute(self.sample_data)
        self.assertEqual(result['status'], 'success')
    
    def test_execute_with_invalid_input(self):
        """测试:无效输入执行失败"""
        result = self.processor.execute(None)
        self.assertEqual(result['status'], 'error')
        self.assertIn('error', result)
    
    # ============ 边界情况测试 ============
    def test_empty_data(self):
        """测试:空数据处理"""
        result = self.processor.execute({})
        self.assertIsNotNone(result)
    
    def test_large_data(self):
        """测试:大数据处理"""
        large_data = {f'field{i}': f'value{i}' for i in range(1000)}
        result = self.processor.execute(large_data)
        self.assertEqual(result['status'], 'success')

# ============ 集成测试 ============
class TestIntegration(unittest.TestCase):
    """集成测试"""
    
    def test_end_to_end(self):
        """测试:端到端流程"""
        # 准备测试数据
        input_data = {
            'field1': 'value1',
            'field2': 'value2'
        }
        
        # 执行处理
        processor = SkillProcessor()
        result = processor.execute(input_data)
        
        # 验证结果
        self.assertEqual(result['status'], 'success')
        self.assertIn('result', result)
        self.assertIn('metadata', result)

if __name__ == '__main__':
    unittest.main()

2. 实战案例一:营销活动分析 Skill

2.1 业务需求分析

背景

某公司的营销团队每周需要分析多渠道的营销活动数据,包括:

  • Google Ads 广告投放数据
  • Facebook Ads 社交媒体数据
  • Email 营销数据

痛点

  • 手动计算指标耗时长(每次 2-3 小时)
  • 不同分析师的方法不一致
  • 容易出现计算错误
  • 难以追溯历史分析逻辑

目标

  • 自动化数据分析流程
  • 标准化指标计算方法
  • 生成统一格式的报告
  • 提供预算优化建议

2.2 Skill 设计

目录结构
analyzing-marketing-campaign/
├── SKILL.md
├── scripts/
│   ├── data_processor.py      # 数据处理
│   ├── metrics_calculator.py  # 指标计算
│   ├── report_generator.py    # 报告生成
│   └── tests/
│       ├── test_processor.py
│       └── test_calculator.py
├── references/
│   ├── budget_reallocation_rules.md
│   ├── channel_benchmarks.md
│   └── metric_definitions.md
└── assets/
    ├── templates/
    │   ├── input_template.xlsx
    │   └── output_template.xlsx
    └── samples/
        └── sample_campaign_data.csv
SKILL.md 核心内容
---
name: analyzing-marketing-campaign
description: 分析多渠道数字营销数据,计算转化漏斗、效率指标,并给出预算调整建议
version: 1.0.0
tags: [marketing, data-analysis, excel, automation]

inputs:
  - name: campaign_data
    type: file
    format: Excel/CSV
    required: true
    description: 包含营销活动数据的文件
    required_columns:
      - Date: 日期(YYYY-MM-DD)
      - Campaign_Name: 活动名称
      - Channel: 渠道(Google Ads/Facebook/Email)
      - Impressions: 曝光量
      - Clicks: 点击量
      - Conversions: 转化量
      - Spend: 花费(美元)
      - Revenue: 收入(美元)

outputs:
  - name: analysis_report
    type: file
    format: Excel
    description: 包含所有分析表格和图表
  - name: summary
    type: file
    format: Markdown
    description: 分析总结和建议

dependencies:
  python: ">=3.8"
  packages:
    - pandas>=1.5.0
    - openpyxl>=3.0.0
    - numpy>=1.20.0
---

## 执行流程

### 1. 数据读取与验证
- 读取 Excel/CSV 文件
- 验证必需列是否存在
- 检查数据类型和格式
- 处理缺失值和异常值

### 2. 计算漏斗指标(按渠道)

#### CTR(点击率)

CTR% = (Clicks / Impressions) × 100


#### CVR(转化率)

CVR% = (Conversions / Clicks) × 100


#### 整体转化率

Overall CVR% = (Conversions / Impressions) × 100


### 3. 计算效率指标(按渠道)

#### ROAS(广告回报率)

ROAS = Revenue / Spend


#### CPA(获客成本)

CPA = Spend / Conversions


#### 净利润

Net Profit = Revenue - Spend


#### ROI(投资回报率)

ROI% = ((Revenue - Spend) / Spend) × 100


### 4. 生成对比表格

创建包含以下内容的表格:
- 各渠道的所有指标
- 按 ROAS 降序排列
- 高亮表现最佳和最差的渠道
- 计算各指标的平均值

### 5. 预算重新分配建议

根据 `references/budget_reallocation_rules.md` 中的规则:
- 识别高 ROAS 渠道(建议增加预算)
- 识别低 ROAS 渠道(建议减少预算)
- 计算建议的预算调整金额
- 预测调整后的预期收益

2.3 核心脚本实现

metrics_calculator.py
#!/usr/bin/env python3
"""
营销指标计算器
"""

import pandas as pd
import numpy as np
from typing import Dict, List
import logging

logger = logging.getLogger(__name__)

class MetricsCalculator:
    """营销指标计算器"""
    
    def __init__(self, decimal_places: int = 2):
        """
        初始化计算器
        
        Args:
            decimal_places: 小数位数
        """
        self.decimal_places = decimal_places
    
    def calculate_funnel_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        计算漏斗指标
        
        Args:
            df: 包含营销数据的 DataFrame
            
        Returns:
            添加了漏斗指标的 DataFrame
        """
        # 按渠道分组
        grouped = df.groupby('Channel').agg({
            'Impressions': 'sum',
            'Clicks': 'sum',
            'Conversions': 'sum',
            'Spend': 'sum',
            'Revenue': 'sum'
        }).reset_index()
        
        # 计算 CTR(点击率)
        grouped['CTR%'] = (
            grouped['Clicks'] / grouped['Impressions'] * 100
        ).round(self.decimal_places)
        
        # 计算 CVR(转化率)
        grouped['CVR%'] = (
            grouped['Conversions'] / grouped['Clicks'] * 100
        ).round(self.decimal_places)
        
        # 计算整体转化率
        grouped['Overall_CVR%'] = (
            grouped['Conversions'] / grouped['Impressions'] * 100
        ).round(self.decimal_places)
        
        return grouped
    
    def calculate_efficiency_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        计算效率指标
        
        Args:
            df: 包含基础数据的 DataFrame
            
        Returns:
            添加了效率指标的 DataFrame
        """
        # 计算 ROAS(广告回报率)
        df['ROAS'] = (
            df['Revenue'] / df['Spend']
        ).round(self.decimal_places)
        
        # 计算 CPA(获客成本)
        df['CPA'] = (
            df['Spend'] / df['Conversions']
        ).round(self.decimal_places)
        
        # 计算净利润
        df['Net_Profit'] = (
            df['Revenue'] - df['Spend']
        ).round(self.decimal_places)
        
        # 计算 ROI(投资回报率)
        df['ROI%'] = (
            (df['Revenue'] - df['Spend']) / df['Spend'] * 100
        ).round(self.decimal_places)
        
        return df
    
    def calculate_all_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        计算所有指标
        
        Args:
            df: 原始数据 DataFrame
            
        Returns:
            包含所有指标的 DataFrame
        """
        # 计算漏斗指标
        result = self.calculate_funnel_metrics(df)
        
        # 计算效率指标
        result = self.calculate_efficiency_metrics(result)
        
        # 按 ROAS 降序排列
        result = result.sort_values('ROAS', ascending=False)
        
        return result
    
    def identify_top_performers(
        self, 
        df: pd.DataFrame, 
        metric: str = 'ROAS',
        top_n: int = 3
    ) -> List[str]:
        """
        识别表现最佳的渠道
        
        Args:
            df: 包含指标的 DataFrame
            metric: 评估指标
            top_n: 返回前 N 个
            
        Returns:
            表现最佳的渠道列表
        """
        return df.nlargest(top_n, metric)['Channel'].tolist()
    
    def identify_underperformers(
        self, 
        df: pd.DataFrame, 
        metric: str = 'ROAS',
        bottom_n: int = 3
    ) -> List[str]:
        """
        识别表现最差的渠道
        
        Args:
            df: 包含指标的 DataFrame
            metric: 评估指标
            bottom_n: 返回后 N 个
            
        Returns:
            表现最差的渠道列表
        """
        return df.nsmallest(bottom_n, metric)['Channel'].tolist()
    
    def calculate_benchmarks(self, df: pd.DataFrame) -> Dict[str, float]:
        """
        计算基准指标
        
        Args:
            df: 包含指标的 DataFrame
            
        Returns:
            基准指标字典
        """
        return {
            'avg_ctr': df['CTR%'].mean(),
            'avg_cvr': df['CVR%'].mean(),
            'avg_roas': df['ROAS'].mean(),
            'avg_cpa': df['CPA'].mean(),
            'avg_roi': df['ROI%'].mean(),
            'total_spend': df['Spend'].sum(),
            'total_revenue': df['Revenue'].sum(),
            'total_profit': df['Net_Profit'].sum()
        }
report_generator.py
#!/usr/bin/env python3
"""
报告生成器
"""

import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment
from openpyxl.chart import BarChart, Reference
from typing import Dict, List
import logging

logger = logging.getLogger(__name__)

class ReportGenerator:
    """报告生成器"""
    
    def __init__(self):
        """初始化生成器"""
        self.wb = Workbook()
        self.ws = self.wb.active
    
    def generate_excel_report(
        self, 
        df: pd.DataFrame, 
        output_path: str,
        include_charts: bool = True
    ):
        """
        生成 Excel 报告
        
        Args:
            df: 数据 DataFrame
            output_path: 输出路径
            include_charts: 是否包含图表
        """
        # 创建工作表
        ws = self.wb.active
        ws.title = "Channel Analysis"
        
        # 写入数据
        for r_idx, row in enumerate(df.itertuples(index=False), start=1):
            for c_idx, value in enumerate(row, start=1):
                cell = ws.cell(row=r_idx, column=c_idx, value=value)
                
                # 设置标题行样式
                if r_idx == 1:
                    cell.font = Font(bold=True, color="FFFFFF")
                    cell.fill = PatternFill(
                        start_color="4472C4",
                        end_color="4472C4",
                        fill_type="solid"
                    )
                    cell.alignment = Alignment(horizontal="center")
        
        # 高亮最佳和最差渠道
        self._highlight_performers(ws, df)
        
        # 添加图表
        if include_charts:
            self._add_charts(ws, df)
        
        # 保存文件
        self.wb.save(output_path)
        logger.info(f"Excel 报告已保存到: {output_path}")
    
    def _highlight_performers(self, ws, df: pd.DataFrame):
        """高亮表现最佳和最差的渠道"""
        # 找到 ROAS 列
        roas_col = df.columns.get_loc('ROAS') + 1
        
        # 高亮最高 ROAS(绿色)
        max_roas_row = df['ROAS'].idxmax() + 2  # +2 因为有标题行
        ws.cell(row=max_roas_row, column=roas_col).fill = PatternFill(
            start_color="C6EFCE",
            end_color="C6EFCE",
            fill_type="solid"
        )
        
        # 高亮最低 ROAS(红色)
        min_roas_row = df['ROAS'].idxmin() + 2
        ws.cell(row=min_roas_row, column=roas_col).fill = PatternFill(
            start_color="FFC7CE",
            end_color="FFC7CE",
            fill_type="solid"
        )
    
    def _add_charts(self, ws, df: pd.DataFrame):
        """添加图表"""
        # 创建柱状图
        chart = BarChart()
        chart.title = "ROAS by Channel"
        chart.x_axis.title = "Channel"
        chart.y_axis.title = "ROAS"
        
        # 设置数据范围
        data = Reference(
            ws,
            min_col=df.columns.get_loc('ROAS') + 1,
            min_row=1,
            max_row=len(df) + 1
        )
        cats = Reference(
            ws,
            min_col=1,
            min_row=2,
            max_row=len(df) + 1
        )
        
        chart.add_data(data, titles_from_data=True)
        chart.set_categories(cats)
        
        # 添加图表到工作表
        ws.add_chart(chart, "J2")
    
    def generate_markdown_summary(
        self, 
        df: pd.DataFrame,
        benchmarks: Dict[str, float],
        output_path: str
    ):
        """
        生成 Markdown 总结
        
        Args:
            df: 数据 DataFrame
            benchmarks: 基准指标
            output_path: 输出路径
        """
        summary = []
        
        # 标题
        summary.append("# 营销活动分析报告\n")
        
        # 概览
        summary.append("## 概览\n")
        summary.append(f"- 总花费: ${benchmarks['total_spend']:,.2f}")
        summary.append(f"- 总收入: ${benchmarks['total_revenue']:,.2f}")
        summary.append(f"- 总利润: ${benchmarks['total_profit']:,.2f}")
        summary.append(f"- 平均 ROAS: {benchmarks['avg_roas']:.2f}\n")
        
        # 表现最佳的渠道
        summary.append("## 表现最佳的渠道\n")
        top_channels = df.nlargest(3, 'ROAS')
        for _, row in top_channels.iterrows():
            summary.append(
                f"- **{row['Channel']}**: "
                f"ROAS {row['ROAS']:.2f}, "
                f"ROI {row['ROI%']:.1f}%"
            )
        summary.append("")
        
        # 需要优化的渠道
        summary.append("## 需要优化的渠道\n")
        bottom_channels = df.nsmallest(3, 'ROAS')
        for _, row in bottom_channels.iterrows():
            summary.append(
                f"- **{row['Channel']}**: "
                f"ROAS {row['ROAS']:.2f}, "
                f"CPA ${row['CPA']:.2f}"
            )
        summary.append("")
        
        # 建议
        summary.append("## 预算优化建议\n")
        summary.append(self._generate_recommendations(df))
        
        # 保存文件
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(summary))
        
        logger.info(f"Markdown 总结已保存到: {output_path}")
    
    def _generate_recommendations(self, df: pd.DataFrame) -> str:
        """生成预算优化建议"""
        recommendations = []
        
        # 识别高 ROAS 渠道
        high_roas = df[df['ROAS'] > df['ROAS'].mean()]
        if not high_roas.empty:
            recommendations.append("### 增加预算")
            for _, row in high_roas.iterrows():
                recommendations.append(
                    f"- **{row['Channel']}**: "
                    f"当前 ROAS {row['ROAS']:.2f},"
                    f"建议增加预算 20-30%"
                )
        
        # 识别低 ROAS 渠道
        low_roas = df[df['ROAS'] < df['ROAS'].mean() * 0.7]
        if not low_roas.empty:
            recommendations.append("\n### 减少预算")
            for _, row in low_roas.iterrows():
                recommendations.append(
                    f"- **{row['Channel']}**: "
                    f"当前 ROAS {row['ROAS']:.2f},"
                    f"建议减少预算 30-50% 或暂停"
                )
        
        return '\n'.join(recommendations)

2.4 使用示例

命令行使用
# 基础用法
python scripts/data_processor.py \
  --input data/campaign_data.xlsx \
  --output results/analysis_report.xlsx

# 带配置文件
python scripts/data_processor.py \
  --input data/campaign_data.xlsx \
  --output results/analysis_report.xlsx \
  --config config/custom_config.json \
  --verbose
Python 代码使用
from analyzing_marketing_campaign.scripts.data_processor import SkillProcessor
from analyzing_marketing_campaign.scripts.metrics_calculator import MetricsCalculator
from analyzing_marketing_campaign.scripts.report_generator import ReportGenerator
import pandas as pd

# 读取数据
df = pd.read_excel('data/campaign_data.xlsx')

# 计算指标
calculator = MetricsCalculator(decimal_places=2)
metrics_df = calculator.calculate_all_metrics(df)

# 生成报告
generator = ReportGenerator()
generator.generate_excel_report(
    metrics_df,
    'results/analysis_report.xlsx',
    include_charts=True
)

# 生成 Markdown 总结
benchmarks = calculator.calculate_benchmarks(metrics_df)
generator.generate_markdown_summary(
    metrics_df,
    benchmarks,
    'results/summary.md'
)

2.5 实际效果

效率提升
指标 手动分析 使用 Skill 提升
时间成本 2-3 小时 5-10 分钟 95% ↓
错误率 5-10% <1% 90% ↓
一致性 显著提升
可追溯性 困难 容易 显著提升
业务价值
  • 节省时间:每周节省 2-3 小时分析时间
  • 提高准确性:消除人为计算错误
  • 标准化流程:确保分析方法一致
  • 数据驱动决策:提供清晰的预算优化建议

3. 实战案例二:时间序列分析 Skill

3.1 业务需求

背景

数据科学团队需要定期分析时间序列数据,包括:

  • 销售数据趋势分析
  • 用户行为模式识别
  • 异常检测和预警
  • 季节性分析

目标

  • 自动化时间序列分析流程
  • 提供标准化的诊断报告
  • 生成可视化图表
  • 识别异常和趋势

3.2 Skill 设计

目录结构
analyzing-time-series/
├── SKILL.md
├── scripts/
│   ├── ts_utils.py           # 时间序列工具
│   ├── diagnose.py           # 诊断分析
│   ├── visualize.py          # 可视化
│   └── tests/
│       └── test_ts_utils.py
├── references/
│   └── interpretation.md     # 结果解读指南
└── assets/
    └── samples/
        └── sample_ts_data.csv
核心功能
---
name: analyzing-time-series
description: 分析时间序列数据,识别趋势、季节性和异常,生成诊断报告和可视化
version: 1.0.0
tags: [data-analysis, time-series, statistics, visualization]

inputs:
  - name: time_series_data
    type: file
    format: CSV
    required: true
    description: 时间序列数据文件
    required_columns:
      - date: 日期列
      - value: 数值列

outputs:
  - name: diagnostic_report
    type: file
    format: Markdown
    description: 诊断报告
  - name: visualizations
    type: directory
    description: 可视化图表目录

dependencies:
  python: ">=3.8"
  packages:
    - pandas>=1.5.0
    - numpy>=1.20.0
    - matplotlib>=3.5.0
    - statsmodels>=0.13.0
---

## 分析流程

### 1. 数据预处理
- 读取时间序列数据
- 处理缺失值
- 检测异常值
- 数据标准化

### 2. 趋势分析
- 移动平均
- 趋势线拟合
- 趋势强度评估

### 3. 季节性分析
- 季节性分解
- 周期性检测
- 季节性强度评估

### 4. 异常检测
- 统计方法检测异常
- 识别突变点
- 异常严重程度评估

### 5. 预测分析
- 短期预测
- 置信区间计算
- 预测准确度评估

3.3 核心实现

ts_utils.py
#!/usr/bin/env python3
"""
时间序列分析工具
"""

import pandas as pd
import numpy as np
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller
from typing import Dict, Tuple, List
import logging

logger = logging.getLogger(__name__)

class TimeSeriesAnalyzer:
    """时间序列分析器"""
    
    def __init__(self, data: pd.Series, freq: str = 'D'):
        """
        初始化分析器
        
        Args:
            data: 时间序列数据
            freq: 频率(D=日, W=周, M=月)
        """
        self.data = data
        self.freq = freq
        self.decomposition = None
    
    def detect_trend(self) -> Dict[str, any]:
        """
        检测趋势
        
        Returns:
            趋势分析结果
        """
        # 计算移动平均
        ma_7 = self.data.rolling(window=7).mean()
        ma_30 = self.data.rolling(window=30).mean()
        
        # 计算趋势强度
        trend_strength = self._calculate_trend_strength()
        
        # 判断趋势方向
        recent_trend = self._determine_trend_direction()
        
        return {
            'ma_7': ma_7,
            'ma_30': ma_30,
            'trend_strength': trend_strength,
            'trend_direction': recent_trend,
            'is_stationary': self._test_stationarity()
        }
    
    def detect_seasonality(self, period: int = None) -> Dict[str, any]:
        """
        检测季节性
        
        Args:
            period: 季节周期(自动检测如果为 None)
            
        Returns:
            季节性分析结果
        """
        if period is None:
            period = self._detect_period()
        
        # 季节性分解
        self.decomposition = seasonal_decompose(
            self.data,
            model='additive',
            period=period
        )
        
        # 计算季节性强度
        seasonal_strength = self._calculate_seasonal_strength()
        
        return {
            'period': period,
            'seasonal_strength': seasonal_strength,
            'has_seasonality': seasonal_strength > 0.3,
            'trend': self.decomposition.trend,
            'seasonal': self.decomposition.seasonal,
            'residual': self.decomposition.resid
        }
    
    def detect_anomalies(
        self, 
        method: str = 'zscore',
        threshold: float = 3.0
    ) -> Dict[str, any]:
        """
        检测异常值
        
        Args:
            method: 检测方法(zscore/iqr)
            threshold: 阈值
            
        Returns:
            异常检测结果
        """
        if method == 'zscore':
            anomalies = self._detect_anomalies_zscore(threshold)
        elif method == 'iqr':
            anomalies = self._detect_anomalies_iqr()
        else:
            raise ValueError(f"Unknown method: {method}")
        
        return {
            'anomaly_indices': anomalies,
            'anomaly_count': len(anomalies),
            'anomaly_percentage': len(anomalies) / len(self.data) * 100,
            'anomaly_values': self.data.iloc[anomalies].tolist()
        }
    
    def _calculate_trend_strength(self) -> float:
        """计算趋势强度"""
        if self.decomposition is None:
            self.detect_seasonality()
        
        var_residual = np.var(self.decomposition.resid.dropna())
        var_trend = np.var(self.decomposition.trend.dropna())
        
        if var_residual + var_trend == 0:
            return 0.0
        
        return max(0, 1 - var_residual / (var_residual + var_trend))
    
    def _calculate_seasonal_strength(self) -> float:
        """计算季节性强度"""
        if self.decomposition is None:
            return 0.0
        
        var_residual = np.var(self.decomposition.resid.dropna())
        var_seasonal = np.var(self.decomposition.seasonal.dropna())
        
        if var_residual + var_seasonal == 0:
            return 0.0
        
        return max(0, 1 - var_residual / (var_residual + var_seasonal))
    
    def _determine_trend_direction(self) -> str:
        """判断趋势方向"""
        recent_data = self.data.tail(30)
        slope = np.polyfit(range(len(recent_data)), recent_data, 1)[0]
        
        if slope > 0.01:
            return 'increasing'
        elif slope < -0.01:
            return 'decreasing'
        else:
            return 'stable'
    
    def _test_stationarity(self) -> bool:
        """测试平稳性"""
        result = adfuller(self.data.dropna())
        return result[1] < 0.05  # p-value < 0.05 表示平稳
    
    def _detect_period(self) -> int:
        """自动检测周期"""
        # 简化实现:根据数据频率推断
        if self.freq == 'D':
            return 7  # 周周期
        elif self.freq == 'W':
            return 4  # 月周期
        elif self.freq == 'M':
            return 12  # 年周期
        else:
            return 7
    
    def _detect_anomalies_zscore(self, threshold: float) -> List[int]:
        """使用 Z-score 方法检测异常"""
        mean = self.data.mean()
        std = self.data.std()
        z_scores = np.abs((self.data - mean) / std)
        return np.where(z_scores > threshold)[0].tolist()
    
    def _detect_anomalies_iqr(self) -> List[int]:
        """使用 IQR 方法检测异常"""
        Q1 = self.data.quantile(0.25)
        Q3 = self.data.quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        return np.where(
            (self.data < lower_bound) | (self.data > upper_bound)
        )[0].tolist()
diagnose.py
#!/usr/bin/env python3
"""
时间序列诊断
"""

import pandas as pd
from ts_utils import TimeSeriesAnalyzer
from typing import Dict
import logging

logger = logging.getLogger(__name__)

class TimeSeriesDiagnostic:
    """时间序列诊断器"""
    
    def __init__(self, data: pd.Series):
        """
        初始化诊断器
        
        Args:
            data: 时间序列数据
        """
        self.data = data
        self.analyzer = TimeSeriesAnalyzer(data)
        self.results = {}
    
    def run_full_diagnostic(self) -> Dict[str, any]:
        """
        运行完整诊断
        
        Returns:
            诊断结果字典
        """
        logger.info("开始时间序列诊断...")
        
        # 1. 基础统计
        self.results['basic_stats'] = self._calculate_basic_stats()
        
        # 2. 趋势分析
        self.results['trend'] = self.analyzer.detect_trend()
        
        # 3. 季节性分析
        self.results['seasonality'] = self.analyzer.detect_seasonality()
        
        # 4. 异常检测
        self.results['anomalies'] = self.analyzer.detect_anomalies()
        
        # 5. 生成诊断报告
        self.results['diagnosis'] = self._generate_diagnosis()
        
        logger.info("诊断完成")
        return self.results
    
    def _calculate_basic_stats(self) -> Dict[str, float]:
        """计算基础统计量"""
        return {
            'count': len(self.data),
            'mean': self.data.mean(),
            'std': self.data.std(),
            'min': self.data.min(),
            'max': self.data.max(),
            'median': self.data.median(),
            'q25': self.data.quantile(0.25),
            'q75': self.data.quantile(0.75)
        }
    
    def _generate_diagnosis(self) -> Dict[str, str]:
        """生成诊断结论"""
        diagnosis = {
            'overall': '',
            'trend': '',
            'seasonality': '',
            'anomalies': '',
            'recommendations': []
        }
        
        # 趋势诊断
        trend_info = self.results['trend']
        if trend_info['trend_direction'] == 'increasing':
            diagnosis['trend'] = "数据呈上升趋势"
            diagnosis['recommendations'].append("关注增长的可持续性")
        elif trend_info['trend_direction'] == 'decreasing':
            diagnosis['trend'] = "数据呈下降趋势"
            diagnosis['recommendations'].append("分析下降原因并采取措施")
        else:
            diagnosis['trend'] = "数据相对稳定"
        
        # 季节性诊断
        seasonal_info = self.results['seasonality']
        if seasonal_info['has_seasonality']:
            diagnosis['seasonality'] = (
                f"检测到明显的季节性模式,"
                f"周期为 {seasonal_info['period']} 个时间单位"
            )
            diagnosis['recommendations'].append("考虑季节性因素进行预测")
        else:
            diagnosis['seasonality'] = "未检测到明显的季节性模式"
        
        # 异常诊断
        anomaly_info = self.results['anomalies']
        if anomaly_info['anomaly_percentage'] > 5:
            diagnosis['anomalies'] = (
                f"检测到 {anomaly_info['anomaly_count']} 个异常值 "
                f"({anomaly_info['anomaly_percentage']:.1f}%)"
            )
            diagnosis['recommendations'].append("调查异常值的原因")
        else:
            diagnosis['anomalies'] = "异常值数量在正常范围内"
        
        # 总体诊断
        diagnosis['overall'] = self._generate_overall_diagnosis()
        
        return diagnosis
    
    def _generate_overall_diagnosis(self) -> str:
        """生成总体诊断"""
        trend = self.results['trend']['trend_direction']
        has_seasonality = self.results['seasonality']['has_seasonality']
        anomaly_pct = self.results['anomalies']['anomaly_percentage']
        
        if trend == 'increasing' and not has_seasonality and anomaly_pct < 5:
            return "数据质量良好,呈稳定增长趋势"
        elif trend == 'decreasing' and anomaly_pct > 5:
            return "数据呈下降趋势且存在较多异常,需要重点关注"
        elif has_seasonality:
            return "数据具有明显的季节性特征,建议使用季节性模型"
        else:
            return "数据特征复杂,建议进行深入分析"
    
    def generate_markdown_report(self, output_path: str):
        """
        生成 Markdown 诊断报告
        
        Args:
            output_path: 输出路径
        """
        report = []
        
        # 标题
        report.append("# 时间序列诊断报告\n")
        
        # 基础统计
        report.append("## 基础统计\n")
        stats = self.results['basic_stats']
        report.append(f"- 数据点数: {stats['count']}")
        report.append(f"- 平均值: {stats['mean']:.2f}")
        report.append(f"- 标准差: {stats['std']:.2f}")
        report.append(f"- 最小值: {stats['min']:.2f}")
        report.append(f"- 最大值: {stats['max']:.2f}")
        report.append(f"- 中位数: {stats['median']:.2f}\n")
        
        # 趋势分析
        report.append("## 趋势分析\n")
        trend = self.results['trend']
        report.append(f"- 趋势方向: {trend['trend_direction']}")
        report.append(f"- 趋势强度: {trend['trend_strength']:.2f}")
        report.append(f"- 平稳性: {'是' if trend['is_stationary'] else '否'}\n")
        
        # 季节性分析
        report.append("## 季节性分析\n")
        seasonal = self.results['seasonality']
        report.append(f"- 季节周期: {seasonal['period']}")
        report.append(f"- 季节性强度: {seasonal['seasonal_strength']:.2f}")
        report.append(
            f"- 是否有季节性: {'是' if seasonal['has_seasonality'] else '否'}\n"
        )
        
        # 异常检测
        report.append("## 异常检测\n")
        anomalies = self.results['anomalies']
        report.append(f"- 异常值数量: {anomalies['anomaly_count']}")
        report.append(f"- 异常值比例: {anomalies['anomaly_percentage']:.2f}%\n")
        
        # 诊断结论
        report.append("## 诊断结论\n")
        diagnosis = self.results['diagnosis']
        report.append(f"**总体评估**: {diagnosis['overall']}\n")
        report.append(f"**趋势**: {diagnosis['trend']}\n")
        report.append(f"**季节性**: {diagnosis['seasonality']}\n")
        report.append(f"**异常**: {diagnosis['anomalies']}\n")
        
        # 建议
        if diagnosis['recommendations']:
            report.append("## 建议\n")
            for i, rec in enumerate(diagnosis['recommendations'], 1):
                report.append(f"{i}. {rec}")
        
        # 保存报告
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(report))
        
        logger.info(f"诊断报告已保存到: {output_path}")

3.4 使用示例

import pandas as pd
from analyzing_time_series.scripts.diagnose import TimeSeriesDiagnostic
from analyzing_time_series.scripts.visualize import TimeSeriesVisualizer

# 读取数据
df = pd.read_csv('data/sales_data.csv', parse_dates=['date'])
ts_data = df.set_index('date')['value']

# 运行诊断
diagnostic = TimeSeriesDiagnostic(ts_data)
results = diagnostic.run_full_diagnostic()

# 生成报告
diagnostic.generate_markdown_report('results/diagnostic_report.md')

# 生成可视化
visualizer = TimeSeriesVisualizer(ts_data, results)
visualizer.plot_all('results/visualizations/')

4. 实战案例三:练习题生成 Skill

4.1 业务需求

背景

教育机构需要根据课程笔记自动生成练习题,包括:

  • 多种题型(选择题、填空题、简答题)
  • 不同难度级别
  • LaTeX 格式输出
  • 答案和解析

4.2 Skill 设计

generating-practice-questions/
├── SKILL.md
├── scripts/
│   ├── question_generator.py
│   ├── latex_formatter.py
│   └── tests/
├── references/
│   └── examples_by_topic.md
└── assets/
    ├── templates/
    │   ├── markdown_template.md
    │   └── questions_template.tex
    └── samples/

4.3 核心实现

#!/usr/bin/env python3
"""
练习题生成器
"""

from typing import List, Dict
import random

class QuestionGenerator:
    """练习题生成器"""
    
    def __init__(self, notes_content: str):
        """
        初始化生成器
        
        Args:
            notes_content: 课程笔记内容
        """
        self.notes = notes_content
        self.questions = []
    
    def generate_multiple_choice(
        self, 
        count: int = 5,
        difficulty: str = 'medium'
    ) -> List[Dict]:
        """
        生成选择题
        
        Args:
            count: 题目数量
            difficulty: 难度级别
            
        Returns:
            选择题列表
        """
        questions = []
        
        # 从笔记中提取关键概念
        concepts = self._extract_concepts()
        
        for i in range(count):
            concept = random.choice(concepts)
            question = {
                'type': 'multiple_choice',
                'difficulty': difficulty,
                'question': f"关于 {concept['name']} 的描述,以下哪项是正确的?",
                'options': self._generate_options(concept),
                'correct_answer': 'A',
                'explanation': concept['explanation']
            }
            questions.append(question)
        
        return questions
    
    def generate_fill_in_blank(
        self, 
        count: int = 5
    ) -> List[Dict]:
        """生成填空题"""
        # 实现填空题生成逻辑
        pass
    
    def generate_short_answer(
        self, 
        count: int = 3
    ) -> List[Dict]:
        """生成简答题"""
        # 实现简答题生成逻辑
        pass
    
    def _extract_concepts(self) -> List[Dict]:
        """从笔记中提取关键概念"""
        # 实现概念提取逻辑
        pass
    
    def _generate_options(self, concept: Dict) -> List[str]:
        """生成选项"""
        # 实现选项生成逻辑
        pass

5. Claude API 中的 Skills 集成

5.1 核心概念

在 Claude API 中使用 Skills 需要理解三个关键组件:

代码执行工具(Code Execution Tool)
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# 启用代码执行工具
response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=4096,
    tools=[{
        "type": "code_execution",
        "name": "bash",
        "description": "Execute bash commands"
    }],
    messages=[{
        "role": "user",
        "content": "Run a Python script to analyze data"
    }]
)

特点

  • 在沙箱环境中执行代码
  • 无互联网连接(安全限制)
  • 预装常用 Python 库
  • 资源限制(内存、CPU、磁盘)
Files API
# 上传文件到容器
with open("skill_file.py", "rb") as f:
    file_response = client.files.create(
        file=f,
        purpose="code_execution"
    )

file_id = file_response.id

# 在消息中引用文件
response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=4096,
    tools=[{"type": "code_execution"}],
    messages=[{
        "role": "user",
        "content": f"Execute the script in file {file_id}"
    }]
)

# 下载生成的文件
output_file = client.files.content(file_id="output_file_id")
Skills 目录结构
容器内的 Skills 目录:
/tmp/skills/
├── analyzing-marketing-campaign/
│   ├── SKILL.md
│   ├── scripts/
│   └── references/
└── analyzing-time-series/
    ├── SKILL.md
    └── scripts/

5.2 完整集成示例

项目结构
api-skills-project/
├── .env                      # API 密钥
├── main.py                   # 主程序
├── skills/                   # Skills 目录
│   ├── analyzing-marketing-campaign/
│   └── analyzing-time-series/
└── data/                     # 数据文件
    └── campaign_data.xlsx
main.py 实现
#!/usr/bin/env python3
"""
Claude API Skills 集成示例
"""

import os
import json
from pathlib import Path
from anthropic import Anthropic
from dotenv import load_dotenv
import logging

# 加载环境变量
load_dotenv()

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SkillsAPIClient:
    """Skills API 客户端"""
    
    def __init__(self, api_key: str = None):
        """
        初始化客户端
        
        Args:
            api_key: Anthropic API 密钥
        """
        self.api_key = api_key or os.getenv("ANTHROPIC_API_KEY")
        self.client = Anthropic(api_key=self.api_key)
        self.skills_dir = Path("skills")
        self.uploaded_files = {}
    
    def upload_skills(self):
        """上传所有 Skills 到容器"""
        logger.info("开始上传 Skills...")
        
        for skill_dir in self.skills_dir.iterdir():
            if skill_dir.is_dir():
                self._upload_skill_directory(skill_dir)
        
        logger.info(f"已上传 {len(self.uploaded_files)} 个文件")
    
    def _upload_skill_directory(self, skill_dir: Path):
        """
        上传单个 Skill 目录
        
        Args:
            skill_dir: Skill 目录路径
        """
        skill_name = skill_dir.name
        logger.info(f"上传 Skill: {skill_name}")
        
        # 遍历目录中的所有文件
        for file_path in skill_dir.rglob("*"):
            if file_path.is_file():
                relative_path = file_path.relative_to(self.skills_dir)
                self._upload_file(file_path, str(relative_path))
    
    def _upload_file(self, file_path: Path, remote_path: str):
        """
        上传单个文件
        
        Args:
            file_path: 本地文件路径
            remote_path: 远程路径
        """
        try:
            with open(file_path, "rb") as f:
                response = self.client.files.create(
                    file=f,
                    purpose="code_execution"
                )
            
            self.uploaded_files[remote_path] = response.id
            logger.debug(f"已上传: {remote_path} -> {response.id}")
            
        except Exception as e:
            logger.error(f"上传失败 {file_path}: {str(e)}")
    
    def execute_skill(
        self, 
        skill_name: str,
        input_data: dict,
        max_tokens: int = 4096
    ) -> dict:
        """
        执行 Skill
        
        Args:
            skill_name: Skill 名称
            input_data: 输入数据
            max_tokens: 最大 token 数
            
        Returns:
            执行结果
        """
        logger.info(f"执行 Skill: {skill_name}")
        
        # 构建提示
        prompt = self._build_prompt(skill_name, input_data)
        
        # 调用 API
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=max_tokens,
            tools=[{
                "type": "code_execution",
                "name": "bash"
            }],
            messages=[{
                "role": "user",
                "content": prompt
            }]
        )
        
        # 处理响应
        return self._process_response(response)
    
    def _build_prompt(self, skill_name: str, input_data: dict) -> str:
        """
        构建提示
        
        Args:
            skill_name: Skill 名称
            input_data: 输入数据
            
        Returns:
            提示文本
        """
        prompt = f"""
请使用 {skill_name} Skill 处理以下数据:

输入数据:
{json.dumps(input_data, indent=2, ensure_ascii=False)}

执行步骤:
1. 读取 /tmp/skills/{skill_name}/SKILL.md 了解 Skill 的使用方法
2. 根据 SKILL.md 中的指令执行相应的脚本
3. 处理输入数据并生成结果
4. 返回处理结果

请开始执行。
"""
        return prompt
    
    def _process_response(self, response) -> dict:
        """
        处理 API 响应
        
        Args:
            response: API 响应
            
        Returns:
            处理后的结果
        """
        result = {
            'status': 'success',
            'content': [],
            'tool_uses': []
        }
        
        for block in response.content:
            if block.type == 'text':
                result['content'].append(block.text)
            elif block.type == 'tool_use':
                result['tool_uses'].append({
                    'name': block.name,
                    'input': block.input
                })
        
        return result
    
    def download_results(self, file_ids: list, output_dir: Path):
        """
        下载结果文件
        
        Args:
            file_ids: 文件 ID 列表
            output_dir: 输出目录
        """
        output_dir.mkdir(parents=True, exist_ok=True)
        
        for file_id in file_ids:
            try:
                content = self.client.files.content(file_id)
                output_path = output_dir / f"{file_id}.dat"
                
                with open(output_path, 'wb') as f:
                    f.write(content)
                
                logger.info(f"已下载: {output_path}")
                
            except Exception as e:
                logger.error(f"下载失败 {file_id}: {str(e)}")

def main():
    """主函数"""
    # 初始化客户端
    client = SkillsAPIClient()
    
    # 上传 Skills
    client.upload_skills()
    
    # 准备输入数据
    input_data = {
        'data_file': 'data/campaign_data.xlsx',
        'output_file': 'results/analysis_report.xlsx'
    }
    
    # 执行 Skill
    result = client.execute_skill(
        'analyzing-marketing-campaign',
        input_data
    )
    
    # 输出结果
    print("执行结果:")
    print(json.dumps(result, indent=2, ensure_ascii=False))
    
    # 下载结果文件(如果有)
    if result.get('file_ids'):
        client.download_results(
            result['file_ids'],
            Path('results')
        )

if __name__ == '__main__':
    main()

5.3 最佳实践

错误处理
class SkillsAPIClient:
    def execute_skill_with_retry(
        self,
        skill_name: str,
        input_data: dict,
        max_retries: int = 3
    ) -> dict:
        """
        带重试的 Skill 执行
        
        Args:
            skill_name: Skill 名称
            input_data: 输入数据
            max_retries: 最大重试次数
            
        Returns:
            执行结果
        """
        for attempt in range(max_retries):
            try:
                return self.execute_skill(skill_name, input_data)
            except Exception as e:
                logger.warning(
                    f"执行失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}"
                )
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # 指数退避
批量处理
def batch_execute_skills(
    self,
    tasks: List[Dict],
    batch_size: int = 5
) -> List[Dict]:
    """
    批量执行 Skills
    
    Args:
        tasks: 任务列表
        batch_size: 批次大小
        
    Returns:
        结果列表
    """
    results = []
    
    for i in range(0, len(tasks), batch_size):
        batch = tasks[i:i + batch_size]
        batch_results = []
        
        for task in batch:
            result = self.execute_skill(
                task['skill_name'],
                task['input_data']
            )
            batch_results.append(result)
        
        results.extend(batch_results)
        
        # 批次间延迟,避免速率限制
        if i + batch_size < len(tasks):
            time.sleep(1)
    
    return results

5.4 性能优化

缓存机制
import hashlib
import pickle
from functools import lru_cache

class CachedSkillsClient(SkillsAPIClient):
    """带缓存的 Skills 客户端"""
    
    def __init__(self, *args, cache_dir: Path = None, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache_dir = cache_dir or Path('.cache')
        self.cache_dir.mkdir(exist_ok=True)
    
    def execute_skill(
        self,
        skill_name: str,
        input_data: dict,
        use_cache: bool = True
    ) -> dict:
        """
        执行 Skill(带缓存)
        
        Args:
            skill_name: Skill 名称
            input_data: 输入数据
            use_cache: 是否使用缓存
            
        Returns:
            执行结果
        """
        if use_cache:
            # 计算缓存键
            cache_key = self._compute_cache_key(skill_name, input_data)
            cache_file = self.cache_dir / f"{cache_key}.pkl"
            
            # 检查缓存
            if cache_file.exists():
                logger.info(f"使用缓存结果: {cache_key}")
                with open(cache_file, 'rb') as f:
                    return pickle.load(f)
        
        # 执行 Skill
        result = super().execute_skill(skill_name, input_data)
        
        # 保存到缓存
        if use_cache:
            with open(cache_file, 'wb') as f:
                pickle.dump(result, f)
        
        return result
    
    def _compute_cache_key(
        self,
        skill_name: str,
        input_data: dict
    ) -> str:
        """计算缓存键"""
        data_str = json.dumps(
            {'skill': skill_name, 'input': input_data},
            sort_keys=True
        )
        return hashlib.md5(data_str.encode()).hexdigest()

6. Claude Code 实战应用

6.1 Claude Code 工作原理

Claude Code 是一个强大的命令行智能体,它通过三个阶段完成任务:

智能体循环:
┌─────────────────────────────────────┐
│ 1. 收集上下文 (Gather Context)      │
│    - 搜索相关文件                    │
│    - 读取代码和文档                  │
│    - 理解项目结构                    │
└─────────────────────────────────────┘
              ↓
┌─────────────────────────────────────┐
│ 2. 采取行动 (Take Action)           │
│    - 编辑文件                        │
│    - 执行命令                        │
│    - 运行测试                        │
└─────────────────────────────────────┘
              ↓
┌─────────────────────────────────────┐
│ 3. 验证结果 (Verify Results)        │
│    - 检查输出                        │
│    - 运行测试                        │
│    - 确认更改                        │
└─────────────────────────────────────┘

6.2 Skills 在 Claude Code 中的使用

目录结构
project/
├── .claude/
│   └── skills/                # Skills 目录
│       ├── analyzing-marketing-campaign/
│       └── code-review/
├── CLAUDE.md                  # 项目级配置
├── src/
└── tests/
CLAUDE.md 配置
# 项目配置

## 开发规范

- 使用 Python 3.8+
- 遵循 PEP 8 代码规范
- 所有函数必须有类型注解
- 测试覆盖率 > 80%

## 工作流程

### 代码审查
使用 `/code-review` Skill 进行代码审查

### 数据分析
使用 `/analyzing-marketing-campaign` Skill 分析营销数据

## 命令

- `npm test`: 运行测试
- `npm run lint`: 代码检查
- `npm run build`: 构建项目

6.3 创建自定义 Skill

代码审查 Skill
---
name: code-review
description: 执行标准化的代码审查流程
context: fork  # 在隔离上下文中运行
---

## 审查清单

### 1. 代码质量
- [ ] 命名是否清晰且符合规范
- [ ] 是否有适当的注释
- [ ] 是否遵循 DRY 原则
- [ ] 是否有重复代码

### 2. 安全性
- [ ] 是否有 SQL 注入风险
- [ ] 是否有 XSS 漏洞
- [ ] 敏感信息是否加密
- [ ] 输入验证是否完善

### 3. 性能
- [ ] 是否有 N+1 查询问题
- [ ] 是否有不必要的循环
- [ ] 是否有内存泄漏风险
- [ ] 算法复杂度是否合理

### 4. 测试
- [ ] 是否有单元测试
- [ ] 测试覆盖率是否达标
- [ ] 边界情况是否测试
- [ ] 错误处理是否测试

## 执行流程

1. 读取要审查的文件
2. 逐项检查审查清单
3. 记录发现的问题
4. 按优先级排序
5. 生成审查报告

## 输出格式

生成包含以下内容的审查报告:
1. 总体评分(1-10)
2. 发现的问题列表(按优先级)
3. 改进建议
4. 代码示例(如果需要)

6.4 使用示例

命令行使用
# 启动 Claude Code
claude

# 使用 Skill
> 使用 /code-review Skill 审查 src/main.py

# 查看可用 Skills
> /skills

# 加载特定 Skill
> /load code-review

# 在隔离上下文中运行
> /fork analyzing-marketing-campaign
VS Code 集成
1. 安装 Claude Code 扩展
2. 打开项目
3. 点击 Spark 图标
4. 在提示框中输入:
   "使用 code-review Skill 审查当前文件"
5. Claude 会自动加载 Skill 并执行审查

6.5 高级功能

Subagents 集成
---
name: parallel-research
description: 并行研究多个主题
---

## 执行步骤

1. 将研究主题分解为子主题
2. 为每个子主题创建 Subagent
3. 并行执行所有 Subagents
4. 汇总结果

## Subagent 配置

```yaml
subagents:
  - name: docs-researcher
    tools: [web_search, web_fetch]
    skills: [research-methodology]
  
  - name: code-analyzer
    tools: [bash, read_file, grep]
    skills: [code-analysis]
  
  - name: web-researcher
    tools: [web_search, web_fetch]
    skills: [web-research]

使用方法

> 使用 /parallel-research Skill 研究 "MinerU PDF 提取工具"
MCP 集成
---
name: notion-sync
description: 将分析结果同步到 Notion
---

## 前置条件

需要配置 Notion MCP 服务器:

```json
{
  "mcpServers": {
    "notion": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-notion"],
      "env": {
        "NOTION_API_TOKEN": "${NOTION_API_TOKEN}"
      }
    }
  }
}

执行流程

  1. 读取本地分析结果
  2. 格式化为 Notion 块
  3. 调用 Notion MCP 工具
  4. 创建或更新 Notion 页面

使用方法

> 使用 /notion-sync Skill 将 results/report.md 同步到 Notion

6.6 最佳实践

Skill 命名规范
推荐:
- /code-review
- /analyze-data
- /generate-report

避免:
- /cr (太简短)
- /code_review_with_security_check (太长)
- /CodeReview (不使用驼峰命名)
上下文管理
---
name: large-file-processor
context: fork  # 在隔离上下文中运行,避免污染主上下文
---

## 说明

此 Skill 会处理大量文件,使用 fork 上下文可以:
- 避免主上下文被大量文件内容填满
- 提高性能
- 保持主对话清洁
权限控制
---
name: safe-analyzer
disable-model-invocation: true  # 禁止自动触发,必须手动调用
---

## 说明

此 Skill 包含敏感操作,设置为手动调用模式以确保安全。

description: 使用多个子智能体并行研究不同主题
context: fork

概述

当需要研究多个相关但独立的主题时,使用并行子智能体可以显著提高效率。

执行流程

1. 分解任务

将大任务分解为多个独立的子任务

2. 创建子智能体

为每个子任务创建专门的子智能体

3. 并行执行

同时启动所有子智能体

4. 汇总结果

收集并整合所有子智能体的结果

示例

用户:研究 React、Vue 和 Angular 三个框架

Claude:
1. 创建三个子智能体
2. 每个子智能体研究一个框架
3. 并行执行研究任务
4. 汇总生成对比报告

7. Claude Agent SDK 高级开发

7.1 核心架构:指挥官与专家团队

Claude Agent SDK 允许我们以编程方式构建复杂的智能体应用。其核心架构可以映射为企业组织结构:

架构层级
架构层级 概念隐喻 SDK 组件 具体实现
The Brain 指挥官/编排者 Main Agent + TaskTool 理解需求、制定计划、分派任务、合成报告
The Arms 专家/执行者 AgentDefinition (Sub-agents) Docs Researcher、Repo Analyzer、Web Researcher
Process 标准作业程序 SkillTool learning-a-tool: 定义学习新工具的标准流程
Connectivity 外部连接器 MCP Server Notion MCP: 将报告写入 Notion
智能体循环:
┌─────────────────────────────────────┐
│ 1. 收集上下文 (Gather Context)      │
│    - 搜索相关文件                    │
│    - 读取代码和文档                  │
│    - 理解项目结构                    │
└─────────────────────────────────────┘
              ↓
┌─────────────────────────────────────┐
│ 2. 采取行动 (Take Action)           │
│    - 编辑文件                        │
│    - 执行命令                        │
│    - 运行测试                        │
└─────────────────────────────────────┘
              ↓
┌─────────────────────────────────────┐
│ 3. 验证结果 (Verify Results)        │
│    - 检查输出                        │
│    - 运行测试                        │
│    - 确认更改                        │
└─────────────────────────────────────┘

关键区别

  • Skills 专门给主智能体使用,作为规划指导手册
  • Sub-agents 执行具体任务
  • 体现了最小权限原则

7.2 环境搭建与工程结构

项目目录结构
research-agent/
├── .env                  # 存放 API 密钥
├── agent.py              # 主程序入口
├── agents/               # 子智能体定义
│   ├── docs_researcher.md
│   ├── repo_analyzer.md
│   └── web_researcher.md
└── .claude/              # 技能库
    └── skills/           # 必须命名为 'skills'(复数)
        └── learning-a-tool/
            ├── SKILL.md
            └── progressive_learning.md
初始化与依赖安装
# 初始化 uv 项目
uv init

# 安装 Claude Agent SDK 及相关工具
uv add claude-agent-sdk python-dotenv asyncio

7.3 核心代码实现

第一步:导入与辅助函数
import asyncio
import os
from dotenv import load_dotenv
from claude_agent_sdk.utils import display_message
from claude_agent_sdk.tools import (
    Bash, Write, WebSearch, WebFetch,
    SkillTool, TaskTool
)
from claude_agent_sdk import Agent, AgentDefinition

# 加载环境变量
load_dotenv()
第二步:配置工具与权限

SDK 默认是只读且安全的。为了让智能体能写代码和上网,必须显式授权。

# 显式允许高权限工具
allowed_tools = [
    Write,      # 允许写文件
    Bash,       # 允许执行 Shell 命令
    WebSearch,  # 允许联网搜索
    WebFetch    # 允许抓取网页内容
]
第三步:集成 MCP
# 定义 MCP Server 配置
mcp_servers = {
    "notion": {
        "command": "uv",
        "args": ["run", "mcp-server-notion"],
        "env": {"NOTION_API_TOKEN": os.getenv("NOTION_API_TOKEN")}
    }
}

# 关键:必须将 MCP 工具加入允许列表
# 使用通配符允许所有 Notion 相关操作
allowed_tools.append("mcp_notion_*")
第四步:加载技能与任务调度
# 1. 配置 SkillTool
# setting_sources=["project"] 告诉 SDK 在当前项目的 .claude/skills 中查找
skill_tool = SkillTool(setting_sources=["project", "user"])

# 2. 配置 TaskTool(需要先加载子智能体定义)
task_tool = TaskTool(agents=sub_agent_definitions)

# 将它们加入工具集
allowed_tools.extend([skill_tool, task_tool])

7.4 角色定义与分工

角色 核心职责 关键工具配置
Main Agent 编排与合成。加载技能制定计划,分派任务,汇总结果并写入 Notion TaskTool, SkillTool, mcp_notion_*
Docs Researcher 文档研究。查找和阅读官方文档 WebSearch, WebFetch
Repo Analyzer 代码分析。克隆 GitHub 仓库,分析文件结构和代码逻辑 Bash, Read, Grep
Web Researcher 广泛搜索。查找教程、视频和社区讨论 WebSearch, WebFetch
子智能体定义示例
# docs_researcher.md
docs_researcher = AgentDefinition(
    name="docs_researcher",
    description="专门研究官方文档的智能体",
    instructions="""
你是一个文档研究专家。你的任务是:
1. 查找官方文档
2. 阅读并理解核心概念
3. 提取关键信息
4. 整理成结构化的笔记
    """,
    allowed_tools=[WebSearch, WebFetch]
)

# repo_analyzer.md
repo_analyzer = AgentDefinition(
    name="repo_analyzer",
    description="专门分析代码仓库的智能体",
    instructions="""
你是一个代码分析专家。你的任务是:
1. 克隆 GitHub 仓库
2. 分析项目结构
3. 阅读关键代码文件
4. 理解实现逻辑
    """,
    allowed_tools=[Bash, "read", "grep"]
)

# web_researcher.md
web_researcher = AgentDefinition(
    name="web_researcher",
    description="专门进行网络研究的智能体",
    instructions="""
你是一个网络研究专家。你的任务是:
1. 搜索教程和视频
2. 查找社区讨论
3. 收集最佳实践
4. 整理学习资源
    """,
    allowed_tools=[WebSearch, WebFetch]
)

7.5 工作流演示:研究 “MinerU”

课程演示了从零开始研究开源 PDF 提取工具 “MinerU” 的完整生命周期。

阶段 1:规划(Planning)& 渐进式披露
# 用户指令
user_input = "Create a learning guide for MinerU, show me the plan first."

# 主智能体加载 learning-a-tool 技能
# SDK 最初只加载技能的名称
# 当技能被触发时,加载 SKILL.md
# 需要时进一步加载引用的 progressive_learning.md
# 这保护了上下文窗口,避免一次性加载过多 Token

渐进式披露(Progressive Disclosure)

  • 第一层:技能名称和描述
  • 第二层:SKILL.md 核心内容
  • 第三层:引用的详细文档
  • 优势:节省 Token,提高效率
阶段 2:并行执行(Parallel Execution)
async def research_tool(tool_name: str):
    """研究工具的主函数"""
    
    # 主智能体根据计划,使用 TaskTool 同时调度三个子智能体
    tasks = [
        # Docs Researcher -> 阅读官方文档
        agent.delegate_task(
            "docs_researcher",
            f"Research official documentation for {tool_name}"
        ),
        
        # Repo Analyzer -> 执行 git clone 拉取代码并分析结构
        agent.delegate_task(
            "repo_analyzer",
            f"Clone and analyze the {tool_name} repository"
        ),
        
        # Web Researcher -> 在 YouTube 搜索视频教程
        agent.delegate_task(
            "web_researcher",
            f"Find tutorials and videos about {tool_name}"
        )
    ]
    
    # 并行执行所有任务
    results = await asyncio.gather(*tasks)
    
    return results

优势

  • 互不阻塞
  • 极大提高复杂任务的执行效率
  • 充分利用并发能力
阶段 3:合成(Synthesis)
def synthesize_results(results: list) -> dict:
    """合成子智能体的结果"""
    
    # 主智能体在本地文件系统生成标准化的目录结构
    output_structure = {
        'README.md': generate_readme(results),      # 学习路径和时间预估
        'resources.md': generate_resources(results), # 整理好的链接和参考文献
        'examples/': generate_examples(results)      # Hello World 代码示例
    }
    
    return output_structure
阶段 4:交付(Delivery via MCP)
async def deliver_to_notion(content: str, page_id: str):
    """将内容交付到 Notion"""
    
    # 用户指令:"Write the resources file to Notion."
    
    # 主智能体调用 Notion MCP 工具
    result = await agent.use_tool(
        "mcp_notion_append_block_children",
        {
            "page_id": page_id,
            "content": content
        }
    )
    
    # 结果:读取本地的 resources.md
    # 转换为 Notion 的 Rich Blocks(富文本块)
    # 自动写入到云端的 Notion 页面中
    
    return result

7.6 完整示例代码

#!/usr/bin/env python3
"""
研究智能体完整实现
"""

import asyncio
import os
from dotenv import load_dotenv
from claude_agent_sdk import Agent, AgentDefinition
from claude_agent_sdk.tools import (
    Bash, Write, WebSearch, WebFetch,
    SkillTool, TaskTool
)
from claude_agent_sdk.utils import display_message

# 加载环境变量
load_dotenv()

class ResearchAgent:
    """研究智能体"""
    
    def __init__(self):
        """初始化智能体"""
        self.api_key = os.getenv("ANTHROPIC_API_KEY")
        self.notion_token = os.getenv("NOTION_API_TOKEN")
        
        # 配置工具
        self.allowed_tools = [
            Write, Bash, WebSearch, WebFetch,
            "mcp_notion_*"
        ]
        
        # 配置 MCP
        self.mcp_servers = {
            "notion": {
                "command": "uv",
                "args": ["run", "mcp-server-notion"],
                "env": {"NOTION_API_TOKEN": self.notion_token}
            }
        }
        
        # 加载子智能体
        self.sub_agents = self._load_sub_agents()
        
        # 配置 Skills 和 Tasks
        self.skill_tool = SkillTool(setting_sources=["project", "user"])
        self.task_tool = TaskTool(agents=self.sub_agents)
        
        self.allowed_tools.extend([self.skill_tool, self.task_tool])
        
        # 创建主智能体
        self.agent = Agent(
            api_key=self.api_key,
            allowed_tools=self.allowed_tools,
            mcp_servers=self.mcp_servers
        )
    
    def _load_sub_agents(self) -> list:
        """加载子智能体定义"""
        return [
            AgentDefinition(
                name="docs_researcher",
                description="研究官方文档",
                instructions="查找并阅读官方文档,提取关键信息",
                allowed_tools=[WebSearch, WebFetch]
            ),
            AgentDefinition(
                name="repo_analyzer",
                description="分析代码仓库",
                instructions="克隆仓库,分析项目结构和代码",
                allowed_tools=[Bash, "read", "grep"]
            ),
            AgentDefinition(
                name="web_researcher",
                description="网络研究",
                instructions="搜索教程、视频和社区讨论",
                allowed_tools=[WebSearch, WebFetch]
            )
        ]
    
    async def research(self, topic: str) -> dict:
        """
        研究指定主题
        
        Args:
            topic: 研究主题
            
        Returns:
            研究结果
        """
        print(f"开始研究: {topic}")
        
        # 1. 制定计划
        plan = await self.agent.chat(
            f"Create a learning guide for {topic}, show me the plan first."
        )
        display_message(plan)
        
        # 2. 并行执行研究
        results = await self._parallel_research(topic)
        
        # 3. 合成结果
        synthesis = await self._synthesize_results(results)
        
        # 4. 交付到 Notion(可选)
        if self.notion_token:
            await self._deliver_to_notion(synthesis)
        
        return synthesis
    
    async def _parallel_research(self, topic: str) -> list:
        """并行研究"""
        tasks = [
            self.agent.delegate_task(
                "docs_researcher",
                f"Research official documentation for {topic}"
            ),
            self.agent.delegate_task(
                "repo_analyzer",
                f"Clone and analyze the {topic} repository"
            ),
            self.agent.delegate_task(
                "web_researcher",
                f"Find tutorials and videos about {topic}"
            )
        ]
        
        return await asyncio.gather(*tasks)
    
    async def _synthesize_results(self, results: list) -> dict:
        """合成结果"""
        synthesis_prompt = f"""
基于以下研究结果,生成完整的学习指南:

文档研究结果:
{results[0]}

代码分析结果:
{results[1]}

网络研究结果:
{results[2]}

请生成:
1. README.md - 学习路径和时间预估
2. resources.md - 整理好的链接和参考文献
3. examples/ - Hello World 代码示例
"""
        
        response = await self.agent.chat(synthesis_prompt)
        return response
    
    async def _deliver_to_notion(self, content: dict):
        """交付到 Notion"""
        delivery_prompt = f"""
将以下内容写入 Notion:

{content}

请使用 Notion MCP 工具创建新页面并写入内容。
"""
        
        response = await self.agent.chat(delivery_prompt)
        display_message(response)

async def main():
    """主函数"""
    # 创建研究智能体
    agent = ResearchAgent()
    
    # 研究 MinerU
    result = await agent.research("MinerU")
    
    print("\n研究完成!")
    print(result)

if __name__ == '__main__':
    asyncio.run(main())

7.7 关键启示与最佳实践

SDK vs Claude Code (CLI)
维度 Claude Code (CLI) Claude Agent SDK (Python)
定位 交互式工具,用于辅助编程和一次性任务 编程框架,用于构建可扩展的 AI 应用程序
控制力 由系统接管大部分决策 开发者可精细控制 System Prompt、工具集和错误处理逻辑
集成性 独立运行 可集成到现有的后端服务或产品中
适用场景 快速原型、个人开发 生产环境、企业应用
安全性警告

⚠️ Human-in-the-loop(人机回环)

在演示代码中,为了流程顺畅,直接允许了 BashWrite 的自动执行。

生产环境风险

  • Agent 可能会执行危险命令(如删除文件)
  • 可能覆盖重要数据
  • 可能产生意外的副作用

最佳实践

class SafeAgent(Agent):
    """安全的智能体实现"""
    
    async def use_tool(self, tool_name: str, params: dict):
        """使用工具(带安全检查)"""
        
        # 高风险工具列表
        high_risk_tools = ['bash', 'write', 'delete']
        
        if tool_name.lower() in high_risk_tools:
            # 暂停执行,请求用户确认
            print(f"\n⚠️ 智能体请求使用高风险工具: {tool_name}")
            print(f"参数: {params}")
            
            confirmation = input("是否允许执行?(y/n): ")
            
            if confirmation.lower() != 'y':
                return {"status": "denied", "message": "用户拒绝执行"}
        
        # 执行工具
        return await super().use_tool(tool_name, params)
技能的可移植性

.claude/skills 中编写的技能文件符合 Open Standard

  • 可以在 Python SDK 中使用
  • 可以直接拖入 Claude Desktop 使用
  • 可以被其他支持该标准的 Agent 框架复用
  • 无需修改代码

8. 跨平台部署策略

8.1 部署架构概览

Skills 可以在多个平台上部署和使用,每个平台都有其特定的优势和适用场景。

部署架构:
┌─────────────────────────────────────────────────────────┐
│                    Skills 核心层                         │
│              (.claude/skills/*.md)                      │
└─────────────────────────────────────────────────────────┘
                          ↓
        ┌─────────────────┼─────────────────┐
        ↓                 ↓                 ↓
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│ Claude       │  │ Claude       │  │ Claude       │
│ Desktop      │  │ Code (CLI)   │  │ Agent SDK    │
│              │  │              │  │              │
│ GUI 交互     │  │ 终端集成     │  │ 编程集成     │
└──────────────┘  └──────────────┘  └──────────────┘

8.2 平台特性对比

特性 Claude Desktop Claude Code Claude Agent SDK
使用方式 GUI 界面 命令行 Python 代码
技能加载 自动扫描 项目级配置 编程式加载
适用场景 个人使用、快速验证 开发辅助、项目协作 生产部署、系统集成
权限控制 用户确认 配置文件 代码控制
扩展性
学习曲线

8.3 Claude Desktop 部署

目录结构
~/.claude/
└── skills/
    ├── analyzing-marketing-campaign/
    │   ├── SKILL.md
    │   ├── scripts/
    │   └── references/
    └── code-review/
        └── SKILL.md
部署步骤
  1. 创建 Skills 目录
mkdir -p ~/.claude/skills
  1. 复制 Skill 文件
cp -r my-skill ~/.claude/skills/
  1. 重启 Claude Desktop
  • Skills 会自动被扫描和加载
  • 在对话中可以直接使用
使用示例
用户:使用 analyzing-marketing-campaign Skill 分析这个数据文件

Claude:好的,我将使用营销活动分析 Skill 来处理您的数据...
[自动加载 Skill 并执行]

8.4 Claude Code 部署

项目结构
project/
├── .claude/
│   ├── skills/              # 项目级 Skills
│   │   └── custom-skill/
│   └── settings.json        # 项目配置
├── CLAUDE.md                # 项目文档
└── src/
CLAUDE.md 配置
# 项目配置

## Skills

本项目使用以下 Skills:

### custom-skill
用于执行项目特定的任务

使用方法:
```bash
/custom-skill [参数]

开发规范

  • 遵循 PEP 8
  • 所有函数必须有类型注解
  • 测试覆盖率 > 80%

#### 部署步骤

1. **创建项目 Skills 目录**
```bash
mkdir -p .claude/skills
  1. 添加 Skill
cp -r ~/skills/my-skill .claude/skills/
  1. 配置 CLAUDE.md
  • 添加 Skill 说明
  • 定义使用规范
  1. 使用 Skill
claude
> /my-skill

8.5 Claude Agent SDK 部署

生产环境结构
production-app/
├── .env                     # 环境变量
├── requirements.txt         # Python 依赖
├── app/
│   ├── __init__.py
│   ├── agent.py            # 智能体实现
│   ├── skills/             # Skills 目录
│   │   └── custom-skill/
│   └── agents/             # 子智能体定义
│       └── researcher.md
├── tests/                  # 测试
│   └── test_agent.py
└── docker/                 # Docker 配置
    ├── Dockerfile
    └── docker-compose.yml
Dockerfile 示例
FROM python:3.11-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY app/ ./app/

# 设置环境变量
ENV PYTHONUNBUFFERED=1

# 运行应用
CMD ["python", "-m", "app.agent"]
docker-compose.yml
version: '3.8'

services:
  agent:
    build:
      context: .
      dockerfile: docker/Dockerfile
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - NOTION_API_TOKEN=${NOTION_API_TOKEN}
    volumes:
      - ./app/skills:/app/app/skills:ro
      - ./data:/app/data
    restart: unless-stopped
部署脚本
#!/bin/bash
# deploy.sh

set -e

echo "开始部署 Agent 应用..."

# 1. 构建 Docker 镜像
echo "构建 Docker 镜像..."
docker-compose build

# 2. 运行测试
echo "运行测试..."
docker-compose run --rm agent python -m pytest tests/

# 3. 启动服务
echo "启动服务..."
docker-compose up -d

# 4. 检查健康状态
echo "检查服务状态..."
docker-compose ps

echo "部署完成!"

8.6 CI/CD 集成

GitHub Actions 示例
name: Deploy Agent

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    
    steps:
    - uses: actions/checkout@v2
    
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.11'
    
    - name: Install dependencies
      run: |
        pip install -r requirements.txt
    
    - name: Run tests
      run: |
        pytest tests/
      env:
        ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
    
    - name: Lint Skills
      run: |
        python scripts/validate_skills.py app/skills/

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    
    steps:
    - uses: actions/checkout@v2
    
    - name: Build and push Docker image
      run: |
        docker build -t my-agent:latest -f docker/Dockerfile .
        docker push my-agent:latest
    
    - name: Deploy to production
      run: |
        ssh user@server 'cd /app && docker-compose pull && docker-compose up -d'

8.7 监控与日志

日志配置
import logging
from logging.handlers import RotatingFileHandler

def setup_logging():
    """配置日志系统"""
    
    # 创建日志器
    logger = logging.getLogger('agent')
    logger.setLevel(logging.INFO)
    
    # 文件处理器(带轮转)
    file_handler = RotatingFileHandler(
        'logs/agent.log',
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5
    )
    file_handler.setLevel(logging.INFO)
    
    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    
    # 格式化器
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    
    # 添加处理器
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger
性能监控
import time
from functools import wraps

def monitor_performance(func):
    """性能监控装饰器"""
    
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        
        try:
            result = await func(*args, **kwargs)
            
            # 记录成功执行
            duration = time.time() - start_time
            logger.info(
                f"{func.__name__} 执行成功,"
                f"耗时: {duration:.2f}秒"
            )
            
            return result
            
        except Exception as e:
            # 记录错误
            duration = time.time() - start_time
            logger.error(
                f"{func.__name__} 执行失败,"
                f"耗时: {duration:.2f}秒,"
                f"错误: {str(e)}"
            )
            raise
    
    return wrapper

# 使用示例
class MonitoredAgent(Agent):
    
    @monitor_performance
    async def research(self, topic: str):
        """研究主题(带监控)"""
        return await super().research(topic)

8.8 版本管理

Skills 版本控制
---
name: analyzing-marketing-campaign
version: 2.1.0
changelog:
  - version: 2.1.0
    date: 2024-02-13
    changes:
      - 添加了 ROI 计算功能
      - 优化了报告生成速度
      - 修复了数据验证 bug
  - version: 2.0.0
    date: 2024-02-01
    changes:
      - 重构了核心计算逻辑
      - 添加了批量处理功能
  - version: 1.0.0
    date: 2024-01-15
    changes:
      - 初始版本发布
---
兼容性检查
from packaging import version

def check_skill_compatibility(skill_version: str, min_version: str) -> bool:
    """
    检查 Skill 版本兼容性
    
    Args:
        skill_version: Skill 版本
        min_version: 最小要求版本
        
    Returns:
        是否兼容
    """
    return version.parse(skill_version) >= version.parse(min_version)

# 使用示例
if not check_skill_compatibility(skill.version, "2.0.0"):
    raise ValueError(
        f"Skill 版本 {skill.version} 不兼容,"
        f"需要 >= 2.0.0"
    )

9. 生产环境最佳实践

9.1 安全性

API 密钥管理
import os
from pathlib import Path
from cryptography.fernet import Fernet

class SecureConfig:
    """安全配置管理"""
    
    def __init__(self, config_dir: Path = None):
        """
        初始化安全配置
        
        Args:
            config_dir: 配置目录
        """
        self.config_dir = config_dir or Path.home() / '.agent_config'
        self.config_dir.mkdir(exist_ok=True)
        
        # 加载或生成加密密钥
        self.key = self._load_or_generate_key()
        self.cipher = Fernet(self.key)
    
    def _load_or_generate_key(self) -> bytes:
        """加载或生成加密密钥"""
        key_file = self.config_dir / 'secret.key'
        
        if key_file.exists():
            with open(key_file, 'rb') as f:
                return f.read()
        else:
            key = Fernet.generate_key()
            with open(key_file, 'wb') as f:
                f.write(key)
            # 设置文件权限为仅所有者可读写
            key_file.chmod(0o600)
            return key
    
    def save_api_key(self, service: str, api_key: str):
        """
        保存 API 密钥(加密)
        
        Args:
            service: 服务名称
            api_key: API 密钥
        """
        encrypted = self.cipher.encrypt(api_key.encode())
        
        key_file = self.config_dir / f'{service}.enc'
        with open(key_file, 'wb') as f:
            f.write(encrypted)
        
        key_file.chmod(0o600)
    
    def load_api_key(self, service: str) -> str:
        """
        加载 API 密钥(解密)
        
        Args:
            service: 服务名称
            
        Returns:
            API 密钥
        """
        key_file = self.config_dir / f'{service}.enc'
        
        if not key_file.exists():
            raise ValueError(f"未找到 {service} 的 API 密钥")
        
        with open(key_file, 'rb') as f:
            encrypted = f.read()
        
        return self.cipher.decrypt(encrypted).decode()

# 使用示例
config = SecureConfig()
config.save_api_key('anthropic', 'sk-ant-...')
api_key = config.load_api_key('anthropic')
输入验证
from typing import Any, Dict
import re

class InputValidator:
    """输入验证器"""
    
    @staticmethod
    def validate_file_path(path: str) -> bool:
        """
        验证文件路径
        
        Args:
            path: 文件路径
            
        Returns:
            是否有效
        """
        # 禁止路径遍历
        if '..' in path or path.startswith('/'):
            return False
        
        # 只允许特定扩展名
        allowed_extensions = ['.csv', '.xlsx', '.json', '.md']
        if not any(path.endswith(ext) for ext in allowed_extensions):
            return False
        
        return True
    
    @staticmethod
    def validate_command(command: str) -> bool:
        """
        验证命令
        
        Args:
            command: 命令字符串
            
        Returns:
            是否安全
        """
        # 禁止危险命令
        dangerous_commands = [
            'rm', 'del', 'format', 'dd',
            'sudo', 'chmod', 'chown'
        ]
        
        command_lower = command.lower()
        for dangerous in dangerous_commands:
            if dangerous in command_lower:
                return False
        
        return True
    
    @staticmethod
    def sanitize_input(data: str) -> str:
        """
        清理输入数据
        
        Args:
            data: 输入数据
            
        Returns:
            清理后的数据
        """
        # 移除潜在的注入代码
        data = re.sub(r'[<>\'\"\\]', '', data)
        
        # 限制长度
        max_length = 10000
        if len(data) > max_length:
            data = data[:max_length]
        
        return data

# 使用示例
validator = InputValidator()

if not validator.validate_file_path(user_input):
    raise ValueError("无效的文件路径")

if not validator.validate_command(command):
    raise ValueError("不安全的命令")

clean_data = validator.sanitize_input(user_input)

9.2 错误处理

统一错误处理
from enum import Enum
from typing import Optional
import traceback

class ErrorCode(Enum):
    """错误代码"""
    VALIDATION_ERROR = 1001
    API_ERROR = 1002
    SKILL_ERROR = 1003
    TIMEOUT_ERROR = 1004
    UNKNOWN_ERROR = 9999

class AgentError(Exception):
    """智能体错误基类"""
    
    def __init__(
        self,
        message: str,
        code: ErrorCode,
        details: Optional[Dict] = None
    ):
        """
        初始化错误
        
        Args:
            message: 错误消息
            code: 错误代码
            details: 错误详情
        """
        self.message = message
        self.code = code
        self.details = details or {}
        super().__init__(self.message)
    
    def to_dict(self) -> Dict:
        """转换为字典"""
        return {
            'error': {
                'message': self.message,
                'code': self.code.value,
                'details': self.details
            }
        }

class ErrorHandler:
    """错误处理器"""
    
    def __init__(self, logger):
        """
        初始化错误处理器
        
        Args:
            logger: 日志器
        """
        self.logger = logger
    
    def handle_error(self, error: Exception) -> Dict:
        """
        处理错误
        
        Args:
            error: 异常对象
            
        Returns:
            错误响应
        """
        # 记录错误
        self.logger.error(
            f"错误: {str(error)}\n"
            f"堆栈: {traceback.format_exc()}"
        )
        
        # 转换为标准错误响应
        if isinstance(error, AgentError):
            return error.to_dict()
        else:
            return {
                'error': {
                    'message': str(error),
                    'code': ErrorCode.UNKNOWN_ERROR.value,
                    'details': {}
                }
            }
    
    async def safe_execute(self, func, *args, **kwargs):
        """
        安全执行函数
        
        Args:
            func: 要执行的函数
            *args: 位置参数
            **kwargs: 关键字参数
            
        Returns:
            执行结果或错误响应
        """
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            return self.handle_error(e)

# 使用示例
error_handler = ErrorHandler(logger)

try:
    result = await agent.research(topic)
except Exception as e:
    error_response = error_handler.handle_error(e)
    return error_response

9.3 性能优化

缓存策略
from functools import lru_cache
import hashlib
import json
import pickle
from pathlib import Path
from typing import Any, Optional

class CacheManager:
    """缓存管理器"""
    
    def __init__(self, cache_dir: Path = None):
        """
        初始化缓存管理器
        
        Args:
            cache_dir: 缓存目录
        """
        self.cache_dir = cache_dir or Path('.cache')
        self.cache_dir.mkdir(exist_ok=True)
    
    def _compute_key(self, data: Any) -> str:
        """
        计算缓存键
        
        Args:
            data: 数据
            
        Returns:
            缓存键
        """
        data_str = json.dumps(data, sort_keys=True)
        return hashlib.md5(data_str.encode()).hexdigest()
    
    def get(self, key: str) -> Optional[Any]:
        """
        获取缓存
        
        Args:
            key: 缓存键
            
        Returns:
            缓存值或 None
        """
        cache_file = self.cache_dir / f"{key}.pkl"
        
        if cache_file.exists():
            try:
                with open(cache_file, 'rb') as f:
                    return pickle.load(f)
            except Exception as e:
                logger.warning(f"读取缓存失败: {str(e)}")
                return None
        
        return None
    
    def set(self, key: str, value: Any, ttl: int = 3600):
        """
        设置缓存
        
        Args:
            key: 缓存键
            value: 缓存值
            ttl: 过期时间(秒)
        """
        cache_file = self.cache_dir / f"{key}.pkl"
        
        try:
            with open(cache_file, 'wb') as f:
                pickle.dump({
                    'value': value,
                    'timestamp': time.time(),
                    'ttl': ttl
                }, f)
        except Exception as e:
            logger.warning(f"写入缓存失败: {str(e)}")
    
    def is_valid(self, key: str) -> bool:
        """
        检查缓存是否有效
        
        Args:
            key: 缓存键
            
        Returns:
            是否有效
        """
        cache_file = self.cache_dir / f"{key}.pkl"
        
        if not cache_file.exists():
            return False
        
        try:
            with open(cache_file, 'rb') as f:
                data = pickle.load(f)
            
            # 检查是否过期
            age = time.time() - data['timestamp']
            return age < data['ttl']
            
        except Exception:
            return False
    
    def clear(self):
        """清空缓存"""
        for cache_file in self.cache_dir.glob('*.pkl'):
            cache_file.unlink()

# 使用示例
cache = CacheManager()

async def cached_research(topic: str):
    """带缓存的研究函数"""
    
    # 计算缓存键
    cache_key = cache._compute_key({'topic': topic})
    
    # 检查缓存
    if cache.is_valid(cache_key):
        cached_result = cache.get(cache_key)
        if cached_result:
            logger.info(f"使用缓存结果: {topic}")
            return cached_result['value']
    
    # 执行研究
    result = await agent.research(topic)
    
    # 保存到缓存
    cache.set(cache_key, result, ttl=3600)
    
    return result
并发控制
import asyncio
from asyncio import Semaphore
from typing import List

class ConcurrencyController:
    """并发控制器"""
    
    def __init__(self, max_concurrent: int = 5):
        """
        初始化并发控制器
        
        Args:
            max_concurrent: 最大并发数
        """
        self.semaphore = Semaphore(max_concurrent)
    
    async def execute_with_limit(self, func, *args, **kwargs):
        """
        限制并发执行
        
        Args:
            func: 要执行的函数
            *args: 位置参数
            **kwargs: 关键字参数
            
        Returns:
            执行结果
        """
        async with self.semaphore:
            return await func(*args, **kwargs)
    
    async def batch_execute(
        self,
        tasks: List[tuple],
        max_concurrent: int = None
    ) -> List:
        """
        批量执行任务
        
        Args:
            tasks: 任务列表 [(func, args, kwargs), ...]
            max_concurrent: 最大并发数
            
        Returns:
            结果列表
        """
        if max_concurrent:
            self.semaphore = Semaphore(max_concurrent)
        
        coroutines = [
            self.execute_with_limit(func, *args, **kwargs)
            for func, args, kwargs in tasks
        ]
        
        return await asyncio.gather(*coroutines)

# 使用示例
controller = ConcurrencyController(max_concurrent=3)

# 批量研究多个主题
tasks = [
    (agent.research, ('React',), {}),
    (agent.research, ('Vue',), {}),
    (agent.research, ('Angular',), {})
]

results = await controller.batch_execute(tasks)

9.4 测试策略

单元测试
import unittest
from unittest.mock import Mock, patch, AsyncMock

class TestResearchAgent(unittest.TestCase):
    """研究智能体测试"""
    
    def setUp(self):
        """测试前准备"""
        self.agent = ResearchAgent()
    
    @patch('agent.Agent.chat')
    async def test_research_success(self, mock_chat):
        """测试:研究成功"""
        # 模拟 API 响应
        mock_chat.return_value = {
            'status': 'success',
            'content': 'Research result'
        }
        
        # 执行研究
        result = await self.agent.research('test-topic')
        
        # 验证结果
        self.assertEqual(result['status'], 'success')
        self.assertIn('content', result)
    
    async def test_research_with_invalid_topic(self):
        """测试:无效主题"""
        with self.assertRaises(ValueError):
            await self.agent.research('')
    
    @patch('agent.Agent.delegate_task')
    async def test_parallel_research(self, mock_delegate):
        """测试:并行研究"""
        # 模拟子智能体响应
        mock_delegate.return_value = {'status': 'success'}
        
        # 执行并行研究
        results = await self.agent._parallel_research('test-topic')
        
        # 验证调用次数
        self.assertEqual(mock_delegate.call_count, 3)

if __name__ == '__main__':
    unittest.main()
集成测试
import pytest

@pytest.mark.asyncio
async def test_end_to_end_research():
    """端到端测试:完整研究流程"""
    
    # 创建智能体
    agent = ResearchAgent()
    
    # 执行研究
    result = await agent.research('MinerU')
    
    # 验证结果
    assert result is not None
    assert 'README.md' in result
    assert 'resources.md' in result
    assert 'examples/' in result

@pytest.mark.asyncio
async def test_notion_integration():
    """集成测试:Notion 集成"""
    
    agent = ResearchAgent()
    
    # 准备测试数据
    content = "# Test Content\n\nThis is a test."
    
    # 交付到 Notion
    result = await agent._deliver_to_notion(content)
    
    # 验证结果
    assert result['status'] == 'success'

9.5 监控与告警

健康检查
from datetime import datetime
from typing import Dict

class HealthChecker:
    """健康检查器"""
    
    def __init__(self, agent):
        """
        初始化健康检查器
        
        Args:
            agent: 智能体实例
        """
        self.agent = agent
    
    async def check_health(self) -> Dict:
        """
        检查健康状态
        
        Returns:
            健康状态报告
        """
        health_status = {
            'timestamp': datetime.now().isoformat(),
            'status': 'healthy',
            'checks': {}
        }
        
        # 检查 API 连接
        try:
            await self.agent.agent.chat("ping")
            health_status['checks']['api'] = 'ok'
        except Exception as e:
            health_status['checks']['api'] = f'error: {str(e)}'
            health_status['status'] = 'unhealthy'
        
        # 检查 Skills 加载
        try:
            skills = self.agent.skill_tool.list_skills()
            health_status['checks']['skills'] = f'loaded: {len(skills)}'
        except Exception as e:
            health_status['checks']['skills'] = f'error: {str(e)}'
            health_status['status'] = 'degraded'
        
        # 检查 MCP 连接
        if self.agent.notion_token:
            try:
                # 测试 Notion 连接
                health_status['checks']['mcp'] = 'ok'
            except Exception as e:
                health_status['checks']['mcp'] = f'error: {str(e)}'
                health_status['status'] = 'degraded'
        
        return health_status

# 使用示例
checker = HealthChecker(agent)
health = await checker.check_health()

if health['status'] != 'healthy':
    logger.warning(f"健康检查失败: {health}")

10. 总结与展望

10.1 课程核心回顾

通过本系列学习笔记,我们完成了从理论到实践的完整旅程:

知识体系构建
Skills 知识体系:
┌─────────────────────────────────────────────────────────┐
│                    理论基础(Part 1-2)                  │
│  - Skills 核心概念                                       │
│  - 技术架构设计                                          │
│  - 与 MCP、Subagents 的协同                             │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│                    实战开发(Part 3)                    │
│  - 自定义 Skills 开发                                    │
│  - 跨平台集成应用                                        │
│  - 生产环境部署                                          │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│                    能力提升                              │
│  - 标准化业务流程                                        │
│  - 构建智能体系统                                        │
│  - 工程化落地能力                                        │
└─────────────────────────────────────────────────────────┘
从指令到资产

Skills 不是一次性的 Prompt,而是标准化的领域认知

  • 通过 YAML 元数据与 Markdown 正文的解耦
  • 将"业务逻辑"封装为可复用、可迁移的数字资产
  • 实现人类隐性知识的显性化
系统架构演进

智能体生态的三驾马车:

  • Skills:定义标准作业程序(SOP),是智能体的"技能树"
  • MCP:连接外部世界的实时脉搏,是智能体的"感知器"
  • Subagents:实现任务的并行与隔离,是智能体的"分身"

10.2 开发范式的跨越

课程带领我们跨越了三个维度:

GUI 时代:Claude Desktop
  • 通过 Pre-Built Skills 快速验证想法
  • 体验"开箱即用"的便利
  • 适合个人使用和快速原型
CLI 时代:Claude Code
  • 将编码与终端操作智能化
  • 在 CLAUDE.md 中沉淀项目规范
  • 实现人机协同的极致效率
SDK 时代:Claude Agent SDK
  • 进入纯代码领域
  • 以编程方式构建复杂的智能体系统
  • 实现真正的工程化落地

10.3 实战案例总结

营销活动分析 Skill
  • 效率提升:从 2-3 小时降至 5-10 分钟(95% ↓)
  • 错误率降低:从 5-10% 降至 <1%(90% ↓)
  • 业务价值:标准化流程、数据驱动决策
时间序列分析 Skill
  • 自动化诊断:趋势、季节性、异常检测
  • 可视化报告:自动生成图表和分析报告
  • 智能建议:基于数据特征提供优化建议
练习题生成 Skill
  • 多题型支持:选择题、填空题、简答题
  • 难度分级:自动调整题目难度
  • 格式输出:支持 Markdown 和 LaTeX

10.4 技术价值与行业意义

为什么 Skills 重要?

如果某项技术能将大模型的通用推理能力,低成本地转化为稳定可靠的业务交付能力,它就一定能成为行业标准。

Skills 填补了关键鸿沟

  • 从"通用智能"到"专业交付"
  • 从"探索性对话"到"标准化流程"
  • 从"一次性任务"到"可复用资产"
AI Agent 技术演进
技术演进路径:
单纯 Prompt 工程
    ↓
Tool Use(工具使用)
    ↓
MCP(模型上下文协议)
    ↓
Skills(标准化技能)
    ↓
Agentic Workflow(智能体工作流)

10.5 最佳实践总结

开发原则
  1. 标准化优先

    • 将重复性任务封装为 Skills
    • 定义清晰的输入输出规范
    • 建立统一的错误处理机制
  2. 安全第一

    • 实施最小权限原则
    • 加密存储敏感信息
    • 验证所有用户输入
  3. 性能优化

    • 使用缓存减少重复计算
    • 实现并发控制提高效率
    • 监控性能指标持续优化
  4. 可维护性

    • 编写清晰的文档
    • 实施版本控制
    • 建立完善的测试体系
工程化思维

学习 Agent Skills 时,不应局限于语法本身,而应聚焦于背后的工程化思维

  • 抽象能力:如何将非结构化的业务经验抽象为结构化的 SOP
  • 编排能力:如何通过编排让多个智能体像流水线一样协作
  • 集成能力:如何利用 MCP 打通数据孤岛
  • 演进能力:如何适应模型能力的持续进化

10.6 未来展望

技术演进方向
  1. 自动化程度提升

    • Agent 通过观察自我习得技能
    • 减少手动编写定义文件的需求
    • 更智能的意图识别与规划
  2. 生态系统完善

    • 更丰富的 Pre-Built Skills 库
    • 标准化的 Skills 市场
    • 跨平台的无缝集成
  3. 能力边界扩展

    • 更复杂的多智能体协作
    • 更强大的推理和规划能力
    • 更广泛的领域应用
持续学习建议
  1. 保持技术敏感度

    • 关注模型原生能力的进化
    • 跟踪 Agent 框架的更新
    • 参与社区讨论和实践
  2. 深化工程能力

    • 掌握复杂系统设计
    • 提升代码质量和可维护性
    • 积累生产环境经验
  3. 拓展应用场景

    • 探索新的业务领域
    • 尝试创新的解决方案
    • 分享实践经验和最佳实践

附录

A. 常用资源

官方文档
社区资源
学习资源

B. 常见问题

Q1: Skills 和 Prompts 有什么区别?

A: Skills 是结构化的、可复用的标准流程,而 Prompts 是一次性的指令。Skills 包含元数据、文档、脚本等完整资源。

Q2: 如何选择合适的部署平台?

A:

  • 个人使用 → Claude Desktop
  • 开发辅助 → Claude Code
  • 生产部署 → Claude Agent SDK
Q3: Skills 可以跨平台使用吗?

A: 是的,Skills 遵循开放标准,可以在 Claude Desktop、Claude Code 和 Claude Agent SDK 之间无缝迁移。

Q4: 如何保证 Agent 的安全性?

A:

  • 实施最小权限原则
  • 加密存储敏感信息
  • 验证所有输入
  • 实现人机回环确认机制
Q5: Skills 的性能如何优化?

A:

  • 使用缓存减少重复计算
  • 实现并发控制
  • 优化 Prompt 长度
  • 使用渐进式披露

C. 术语表

术语 英文 说明
技能 Skills 标准化的、可复用的智能体能力单元
工具 Tools 智能体可以调用的外部功能
模型上下文协议 MCP 连接 AI 模型与外部数据源的标准协议
子智能体 Subagents 执行特定任务的独立智能体实例
编排 Orchestration 协调多个智能体协同工作的过程
渐进式披露 Progressive Disclosure 按需加载信息以节省上下文窗口
人机回环 Human-in-the-loop 在关键决策点引入人工确认的机制

作者:抱逸斯
日期:2026 年 2 月 19 日
版本:1.0.0

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐