"如果说单个 AI Agent 是一位专家,那么 Workflow 就是让这些专家协同作战的指挥官。"

引言:从单打独斗到团队协作

还记得你第一次使用 ChatGPT 的感觉吗?那种"哇,AI 真的能理解我"的惊喜。但很快你就会发现,当任务变得复杂时,单个 AI 对话就像是让一个人同时扮演厨师、服务员和收银员——理论上可行,实际上一团糟。

这就是为什么微软推出了 Agent Framework,而其中的 Workflow 功能,正是解决这个问题的杀手锏。今天,我们就来深入探讨这个让 AI Agent 从"单兵作战"升级为"特种部队"的技术。

一、Workflow 是什么?为什么我们需要它?

1.1 现实世界的痛点

想象一个场景:你需要构建一个智能客服系统,它要能够:

  • 理解用户的问题(自然语言处理)

  • 查询订单数据库(数据检索)

  • 判断是否需要人工介入(决策逻辑)

  • 生成友好的回复(内容生成)

如果用传统的单一 AI Agent 来处理,你会遇到什么问题?

  1. 上下文混乱:一个 Agent 要记住所有状态,容易"精神分裂"

  2. 职责不清:什么都做,往往什么都做不好

  3. 难以扩展:新增功能就像在意大利面代码里加调料

  4. 无法并行:明明可以同时做的事,却要排队等待

1.2 Workflow 的核心理念

Microsoft Agent Framework 的 Workflow 采用了一个优雅的设计哲学:

Workflow = Executors(执行器)+ Edges(边)+ Orchestration(编排)

这就像搭积木:

  • Executor:每个积木块,专注做一件事

  • Edge:积木块之间的连接方式

  • Workflow:最终搭建出的复杂结构

让我们看一个最简单的例子:

// 创建两个执行器
UppercaseExecutor uppercase = new();
ReverseTextExecutor reverse = new();

// 用边连接它们
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse)
       .WithOutputFrom(reverse);
       
var workflow = builder.Build();

// 执行:输入 "Hello" → 转大写 "HELLO" → 反转 "OLLEH"
await using Run run = await InProcessExecution.RunAsync(workflow, "Hello");

看到了吗?每个 Executor 只做一件事,但通过 Edge 连接后,就能完成复杂的任务。这就是组合的力量

二、Workflow 的核心组件深度剖析

2.1 Executor:工作流的基本单元

Executor 是 Workflow 的"原子",它的设计哲学是:单一职责,明确输入输出

2.1.1 Executor 的三种形态

形态一:无返回值的 Executor

internal sealed class LoggerExecutor() : Executor<string>("Logger")
{
    public override ValueTask HandleAsync(
        string message, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"[LOG] {message}");
        return default; // 不返回任何值
    }
}

形态二:有返回值的 Executor

internal sealed class UppercaseExecutor() : Executor<string, string>("Uppercase")
{
    public override ValueTask<string> HandleAsync(
        string message, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        return ValueTask.FromResult(message.ToUpperInvariant());
    }
}

形态三:AI Agent 作为 Executor

这是最有趣的部分!Agent Framework 允许你把任何 AI Agent 包装成 Executor:

var chatClient = new AzureOpenAIClient(endpoint, credential)
    .GetChatClient(deploymentName)
    .AsIChatClient();

// 创建一个翻译 Agent
AIAgent frenchAgent = new ChatClientAgent(
    chatClient,
    "You are a translation assistant that translates to French."
);

// Agent 可以直接用在 Workflow 中!
var workflow = new WorkflowBuilder(frenchAgent)
    .AddEdge(frenchAgent, spanishAgent)
    .AddEdge(spanishAgent, englishAgent)
    .Build();
2.1.2 Executor 的生命周期

每个 Executor 都有完整的生命周期管理:

protected internal virtual ValueTask InitializeAsync(
    IWorkflowContext context, 
    CancellationToken cancellationToken = default)
{
    // 初始化资源、加载配置等
}

protected internal virtual ValueTask OnCheckpointingAsync(
    IWorkflowContext context, 
    CancellationToken cancellationToken = default)
{
    // 保存检查点前的准备工作
}

protected internal virtual ValueTask OnCheckpointRestoredAsync(
    IWorkflowContext context, 
    CancellationToken cancellationToken = default)
{
    // 从检查点恢复后的初始化
}

这种设计让 Executor 不仅仅是一个函数,而是一个有状态、可管理的组件

2.2 Edge:数据流动的艺术

如果说 Executor 是节点,那么 Edge 就是连接这些节点的"血管"。Agent Framework 提供了三种 Edge 类型:

2.2.1 Direct Edge:一对一连接

最简单的边,数据从一个 Executor 流向另一个:

builder.AddEdge(sourceExecutor, targetExecutor);

但这还不够强大,我们可以加上条件判断

builder.AddEdge<int>(
    source, 
    target, 
    condition: value => value > 0  // 只有正数才会传递
);

这就像给数据流加上了"阀门",只有满足条件的数据才能通过。

2.2.2 Fan-Out Edge:一对多广播

想象一个场景:你需要同时咨询三个专家的意见。Fan-Out 就是为此而生:

ChatClientAgent physicist = new(client, "Physics expert");
ChatClientAgent chemist = new(client, "Chemistry expert");
ChatClientAgent biologist = new(client, "Biology expert");

builder.AddFanOutEdge(
    startExecutor, 
    targets: [physicist, chemist, biologist]
);

所有专家会并行处理同一个问题,大大提升效率!

你还可以自定义分发策略:

builder.AddFanOutEdge<string>(
    source,
    partitioner: (message, count) => 
    {
        // 根据消息内容决定发给哪些 Executor
        if (message.Contains("urgent"))
            return [0, 1]; // 发给前两个
        else
            return [2];    // 只发给第三个
    },
    targets: [executor1, executor2, executor3]
);
2.2.3 Fan-In Edge:多对一汇聚

有了 Fan-Out,自然需要 Fan-In 来收集结果:

var aggregator = new ResultAggregatorExecutor();

builder.AddFanInEdge(
    aggregator, 
    sources: [physicist, chemist, biologist]
);

Aggregator 会等待所有源 Executor 完成后,再统一处理结果。这就是经典的 Map-Reduce 模式!

三、Workflow 的高级模式:从理论到实战

3.1 Sequential Pattern:流水线模式

这是最直观的模式,就像工厂流水线一样,每个环节依次处理:

var workflow = AgentWorkflowBuilder.BuildSequential(
    "Translation Pipeline",
    GetTranslationAgent("French", client),
    GetTranslationAgent("Spanish", client),
    GetTranslationAgent("English", client)
);

// 输入:"Hello" 
// → 法语:"Bonjour" 
// → 西班牙语:"Hola" 
// → 英语:"Hello"

适用场景

  • 数据处理管道(清洗 → 转换 → 验证)

  • 多步骤审批流程

  • 渐进式内容生成

性能特点

  • 延迟:累加(每个步骤的延迟相加)

  • 吞吐量:受限于最慢的环节

  • 资源占用:低(同一时间只有一个 Executor 在工作)

3.2 Concurrent Pattern:并行处理模式

当多个任务互不依赖时,为什么要让它们排队?并行处理才是王道:

var workflow = AgentWorkflowBuilder.BuildConcurrent(
    "Multi-Expert Consultation",
    agents: [physicistAgent, chemistAgent, biologistAgent],
    aggregator: results => 
    {
        // 自定义聚合逻辑
        return results
            .SelectMany(list => list)
            .OrderBy(msg => msg.Timestamp)
            .ToList();
    }
);

实战案例:智能文档分析系统

假设你要分析一份技术文档,需要:

  1. 提取关键词(NLP Agent)

  2. 生成摘要(Summarization Agent)

  3. 检测技术栈(Tech Stack Detector Agent)

这三个任务完全独立,用并行模式可以将处理时间从 30 秒降到 10 秒!

var nlpAgent = new ChatClientAgent(client, "Extract keywords");
var summaryAgent = new ChatClientAgent(client, "Generate summary");
var techAgent = new ChatClientAgent(client, "Detect tech stack");

var workflow = AgentWorkflowBuilder.BuildConcurrent(
    "Document Analyzer",
    [nlpAgent, summaryAgent, techAgent]
);

await using var run = await InProcessExecution.StreamAsync(
    workflow, 
    new ChatMessage(ChatRole.User, documentContent)
);

性能对比

模式 总耗时 CPU 利用率 适用场景
Sequential 30s 33% 任务有依赖关系
Concurrent 10s 90% 任务相互独立

3.3 Handoffs Pattern:智能转接模式

这是最接近真实世界的模式。想象一个客服中心:

  • 前台接待(Triage Agent)判断问题类型

  • 技术支持(Tech Agent)处理技术问题

  • 销售顾问(Sales Agent)处理购买咨询

  • 必要时可以转回前台

var triageAgent = new ChatClientAgent(
    client,
    "Determine which specialist to route to. ALWAYS handoff.",
    "triage_agent"
);

var techAgent = new ChatClientAgent(
    client,
    "Handle technical support questions only.",
    "tech_support"
);

var salesAgent = new ChatClientAgent(
    client,
    "Handle sales and pricing questions only.",
    "sales_agent"
);

var workflow = AgentWorkflowBuilder
    .CreateHandoffBuilderWith(triageAgent)
    .WithHandoffs(triageAgent, [techAgent, salesAgent])
    .WithHandoffs([techAgent, salesAgent], triageAgent)
    .Build();

Handoff 的魔法:AI Tool Calling

Handoff 是如何实现的?答案是 AI Function Calling

当你配置 Handoff 时,Framework 会自动为每个目标 Agent 生成一个 Tool:

// 自动生成的 Tool
{
    "name": "handoff_to_tech_support",
    "description": "Handle technical support questions only.",
    "parameters": { ... }
}

当 Triage Agent 判断需要技术支持时,它会"调用"这个 Tool,Framework 拦截这个调用,将对话转交给 Tech Agent。整个过程对用户来说是无缝的!

3.4 Group Chat Pattern:圆桌会议模式

如果说 Handoffs 是"点对点转接",那么 Group Chat 就是"所有人都在一个会议室":

var workflow = AgentWorkflowBuilder
    .CreateGroupChatBuilderWith(agents => 
        new RoundRobinGroupChatManager(agents) 
        { 
            MaximumIterationCount = 5 
        })
    .AddParticipants(
        GetTranslationAgent("French", client),
        GetTranslationAgent("Spanish", client),
        GetTranslationAgent("English", client)
    )
    .Build();

Group Chat Manager 的职责

  1. 决定下一个发言的 Agent

  2. 控制对话轮次

  3. 判断何时结束讨论

你可以实现自定义的 Manager:

public class SmartGroupChatManager : GroupChatManager
{
    protected override async ValueTask<AIAgent?> SelectNextAgentAsync(
        IReadOnlyList<ChatMessage> history,
        CancellationToken cancellationToken)
    {
        // 使用 AI 来决定下一个发言者
        var decision = await _decisionAgent.RunAsync(
            $"Based on the conversation, who should speak next? {string.Join(", ", _agents.Select(a => a.Name))}"
        );
        
        return _agents.FirstOrDefault(a => 
            decision.Contains(a.Name, StringComparison.OrdinalIgnoreCase)
        );
    }
}

四、Workflow 的状态管理:时间旅行不是梦

4.1 Checkpoint:工作流的"存档点"

想象你在玩一个复杂的 RPG 游戏,突然断电了。如果没有存档,你得从头开始。Workflow 的 Checkpoint 就是这样的"存档系统"。

var checkpointManager = CheckpointManager.Default;
var checkpoints = new List<CheckpointInfo>();

await using Checkpointed<StreamingRun> run = await InProcessExecution
    .StreamAsync(workflow, input, checkpointManager);

await foreach (WorkflowEvent evt in run.Run.WatchStreamAsync())
{
    if (evt is SuperStepCompletedEvent superStepEvt)
    {
        // 每个 Super Step 完成后自动创建 Checkpoint
        CheckpointInfo? checkpoint = superStepEvt.CompletionInfo!.Checkpoint;
        if (checkpoint is not null)
        {
            checkpoints.Add(checkpoint);
            Console.WriteLine($"Checkpoint {checkpoints.Count} saved!");
        }
    }
}

Super Step 是什么?

Workflow 的执行不是连续的,而是分成一个个"超级步骤":

Super Step 1: [Executor A] 完成
Super Step 2: [Executor B, Executor C] 并行完成
Super Step 3: [Executor D] 完成

每个 Super Step 结束后,系统会自动保存状态。这样设计的好处:

  1. 粒度合适:不会太频繁(每条消息一个),也不会太稀疏

  2. 状态一致:Super Step 内的所有 Executor 要么全部完成,要么全部未完成

  3. 易于恢复:从任何 Checkpoint 恢复都能保证状态一致

4.2 时间旅行:回到过去

有了 Checkpoint,你可以做一些"魔法操作":

// 恢复到第 5 个 Checkpoint
CheckpointInfo savedCheckpoint = checkpoints[4];
await run.RestoreCheckpointAsync(savedCheckpoint);

// 从这个点继续执行
await foreach (WorkflowEvent evt in run.Run.WatchStreamAsync())
{
    // 继续处理...
}

实战场景:A/B 测试

假设你有一个复杂的 Workflow,前面 5 个步骤都一样,但第 6 步有两种不同的策略。你可以:

  1. 执行到第 5 步,保存 Checkpoint

  2. 用策略 A 完成剩余步骤,记录结果

  3. 恢复到第 5 步的 Checkpoint

  4. 用策略 B 完成剩余步骤,记录结果

  5. 对比两种策略的效果

这在传统编程中需要复杂的状态管理,但在 Workflow 中只需要几行代码!

4.3 Shared State:Executor 之间的"共享内存"

有时候,多个 Executor 需要共享一些数据。比如:

  • 累加器(统计处理了多少条数据)

  • 缓存(避免重复计算)

  • 配置(所有 Executor 共享的参数)

public class CounterExecutor : StatefulExecutor<string>
{
    private int _count = 0;

    public CounterExecutor() : base("Counter", 
        new StatefulExecutorOptions 
        { 
            StateScope = StateScope.Workflow  // 工作流级别的状态
        })
    { }

    public override async ValueTask HandleAsync(
        string message, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        _count++;
        Console.WriteLine($"Processed {_count} messages");
        
        // 状态会自动保存到 Checkpoint
        await context.SendMessageAsync($"Count: {_count}");
    }
}

状态作用域

作用域 生命周期 适用场景
Executor 单个 Executor 实例 临时缓存
Run 单次 Workflow 执行 会话状态
Workflow 整个 Workflow 全局配置

五、人机协作:Human-in-the-Loop 模式

5.1 为什么需要人类介入?

AI 再强大,也有它的局限性:

  • 关键决策:涉及法律、伦理的决定需要人类审核

  • 异常处理:AI 遇到边界情况时需要人类指导

  • 质量把关:重要内容发布前需要人工审查

Agent Framework 提供了优雅的 Request Port 机制来实现人机协作。

5.2 Request Port:工作流的"暂停按钮"

Request Port 就像是工作流中的一个"等待点",当执行到这里时,会暂停并向外部请求输入:

// 创建一个 Request Port
var humanInputPort = RequestPort.Create<NumberSignal, int>("HumanInput");

// 在 Workflow 中使用
var workflow = new WorkflowBuilder(judgeExecutor)
    .AddEdge(judgeExecutor, humanInputPort)
    .AddEdge(humanInputPort, judgeExecutor)
    .Build();

实战案例:数字猜谜游戏

让我们看一个完整的例子:

public class JudgeExecutor : Executor<int, NumberSignal>
{
    private const int TargetNumber = 42;

    public override ValueTask<NumberSignal> HandleAsync(
        int guess, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        if (guess == TargetNumber)
        {
            await context.YieldOutputAsync("Correct! You win!");
            return ValueTask.FromResult(NumberSignal.Correct);
        }
        else if (guess > TargetNumber)
        {
            return ValueTask.FromResult(NumberSignal.Above);
        }
        else
        {
            return ValueTask.FromResult(NumberSignal.Below);
        }
    }
}

// 执行 Workflow
await using StreamingRun run = await InProcessExecution.StreamAsync(
    workflow, 
    NumberSignal.Init
);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is RequestInfoEvent requestEvt)
    {
        // Workflow 请求人类输入
        Console.Write("Enter your guess: ");
        int guess = int.Parse(Console.ReadLine()!);
        
        // 发送响应
        await run.SendResponseAsync(
            requestEvt.Request.CreateResponse(guess)
        );
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        Console.WriteLine(outputEvt.Data);
        break;
    }
}

交互流程

1. Workflow 启动 → JudgeExecutor 发送 Init 信号
2. 到达 Request Port → 触发 RequestInfoEvent
3. 外部程序接收事件 → 提示用户输入
4. 用户输入 50 → 发送 ExternalResponse
5. JudgeExecutor 判断 → 返回 Above 信号
6. 循环直到猜对...

5.3 高级场景:审批流程

在企业应用中,审批流程是典型的人机协作场景:

public class ApprovalWorkflow
{
    public static Workflow Build()
    {
        var documentProcessor = new DocumentProcessorExecutor();
        var aiReviewer = new AIReviewerAgent(client);
        var humanApprovalPort = RequestPort.Create<ReviewResult, ApprovalDecision>("HumanApproval");
        var publisher = new PublisherExecutor();

        return new WorkflowBuilder(documentProcessor)
            .AddEdge(documentProcessor, aiReviewer)
            .AddEdge<ReviewResult>(
                aiReviewer, 
                humanApprovalPort,
                condition: result => result.Confidence < 0.9  // 置信度低时需要人工审核
            )
            .AddEdge<ReviewResult>(
                aiReviewer,
                publisher,
                condition: result => result.Confidence >= 0.9  // 置信度高时直接发布
            )
            .AddEdge(humanApprovalPort, publisher)
            .Build();
    }
}

这个 Workflow 实现了"智能分流":

  • AI 有把握的内容(置信度 ≥ 0.9):自动发布

  • AI 不确定的内容(置信度 < 0.9):人工审核

这样既保证了质量,又提高了效率!

六、声明式 Workflow:低代码的未来

6.1 从命令式到声明式

前面我们看到的都是命令式的 Workflow 定义:

var builder = new WorkflowBuilder(start);
builder.AddEdge(start, middle);
builder.AddEdge(middle, end);
var workflow = builder.Build();

但对于非开发人员,这还是太复杂了。Agent Framework 提供了声明式 Workflow,用 JSON 配置就能定义复杂的流程!

6.2 声明式 Workflow 示例

{
  "name": "CustomerServiceWorkflow",
  "description": "Intelligent customer service automation",
  "actions": [
    {
      "id": "start",
      "type": "Question",
      "properties": {
        "prompt": "How can I help you today?"
      }
    },
    {
      "id": "classify",
      "type": "InvokeAzureAgent",
      "properties": {
        "agentId": "classifier-agent",
        "input": "{{start.response}}"
      }
    },
    {
      "id": "route",
      "type": "ConditionGroup",
      "conditions": [
        {
          "condition": "{{classify.category}} == 'technical'",
          "next": "tech-support"
        },
        {
          "condition": "{{classify.category}} == 'billing'",
          "next": "billing-agent"
        }
      ]
    },
    {
      "id": "tech-support",
      "type": "InvokeAzureAgent",
      "properties": {
        "agentId": "tech-support-agent"
      }
    },
    {
      "id": "billing-agent",
      "type": "InvokeAzureAgent",
      "properties": {
        "agentId": "billing-agent"
      }
    }
  ]
}

6.3 声明式 Workflow 的优势

1. 可视化设计

配合可视化工具,业务人员可以像画流程图一样设计 Workflow:

[用户提问] → [AI分类] → [条件判断]
                            ├─ 技术问题 → [技术支持Agent]
                            └─ 账单问题 → [账单Agent]

2. 动态加载

声明式 Workflow 可以在运行时加载和修改:

var workflowJson = await File.ReadAllTextAsync("workflow.json");
var workflow = await DeclarativeWorkflowBuilder.BuildFromJsonAsync(workflowJson);

// 无需重新编译,直接执行新的 Workflow
await InProcessExecution.RunAsync(workflow, input);

3. 版本管理

Workflow 定义存储为 JSON,可以轻松进行版本控制:

git diff workflow-v1.json workflow-v2.json

4. 跨平台共享

同一个 Workflow 定义可以在不同的平台上执行:

  • .NET 应用

  • Azure Functions

  • Power Automate(未来可能支持)

6.4 声明式 Workflow 的内置 Actions

Agent Framework 提供了丰富的内置 Actions:

状态管理类

  • SetVariable:设置变量

  • SetMultipleVariables:批量设置变量

  • ClearAllVariables:清空所有变量

对话管理类

  • CreateConversation:创建新对话

  • AddConversationMessage:添加消息

  • RetrieveConversationMessages:检索历史消息

控制流类

  • ConditionGroup:条件分支

  • Foreach:循环遍历

  • GotoAction:跳转到指定步骤

AI 集成类

  • InvokeAzureAgent:调用 Azure AI Agent

  • Question:请求用户输入

数据处理类

  • ParseValue:解析数据

  • EditTableV2:编辑表格数据

这些 Actions 可以像乐高积木一样组合,构建出复杂的业务逻辑!

七、性能优化与最佳实践

7.1 并发控制:不是越多越好

虽然并发能提升性能,但也要注意资源限制:

// ❌ 不好的做法:无限制并发
var workflow = new WorkflowBuilder(start);
for (int i = 0; i < 1000; i++)
{
    var agent = new ChatClientAgent(client, $"Agent {i}");
    builder.AddFanOutEdge(start, agent);
}

// ✅ 好的做法:分批处理
var batchSize = 10;
var batches = agents.Chunk(batchSize);

foreach (var batch in batches)
{
    var batchWorkflow = AgentWorkflowBuilder.BuildConcurrent(batch);
    // 执行批次...
}

7.2 状态管理:选择合适的作用域

// ❌ 不好的做法:所有状态都用 Workflow 级别
public class MyExecutor : StatefulExecutor<string>
{
    public MyExecutor() : base("MyExecutor", 
        new StatefulExecutorOptions 
        { 
            StateScope = StateScope.Workflow  // 会一直占用内存
        })
    { }
}

// ✅ 好的做法:根据需要选择作用域
public class MyExecutor : StatefulExecutor<string>
{
    public MyExecutor() : base("MyExecutor", 
        new StatefulExecutorOptions 
        { 
            StateScope = StateScope.Run  // 执行完成后自动清理
        })
    { }
}

7.3 Checkpoint 策略:平衡性能与可靠性

// 策略 1:每个 Super Step 都保存(默认)
// 优点:恢复粒度细
// 缺点:I/O 开销大

// 策略 2:只在关键节点保存
var checkpointManager = new SelectiveCheckpointManager(
    shouldCheckpoint: (stepInfo) => 
        stepInfo.ExecutorIds.Any(id => id.StartsWith("Critical"))
);

// 策略 3:定时保存
var checkpointManager = new TimedCheckpointManager(
    interval: TimeSpan.FromMinutes(5)
);

7.4 错误处理:优雅降级

public class ResilientExecutor : Executor<string, string>
{
    private readonly int _maxRetries = 3;

    public override async ValueTask<string> HandleAsync(
        string message, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        for (int i = 0; i < _maxRetries; i++)
        {
            try
            {
                return await ProcessAsync(message, cancellationToken);
            }
            catch (Exception ex) when (i < _maxRetries - 1)
            {
                await context.AddEventAsync(
                    new WorkflowWarningEvent($"Retry {i + 1}/{_maxRetries}: {ex.Message}"),
                    cancellationToken
                );
                await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, i)), cancellationToken);
            }
        }
        
        // 最后一次失败,返回降级结果
        return "Service temporarily unavailable. Please try again later.";
    }
}

八、实战案例:构建智能文档处理系统

让我们通过一个完整的案例,把前面学到的知识串起来。

8.1 需求分析

假设我们要构建一个智能文档处理系统,功能包括:

  1. 文档上传:用户上传 PDF/Word 文档

  2. 内容提取:提取文本、图片、表格

  3. 多维度分析
    • 情感分析(正面/负面/中性)

    • 关键词提取

    • 摘要生成

    • 技术栈识别(如果是技术文档)

  4. 质量评估:AI 评估文档质量

  5. 人工审核:质量低于阈值时需要人工审核

  6. 结果输出:生成分析报告

8.2 架构设计

[文档上传] 
    ↓
[内容提取Executor]
    ↓
[Fan-Out: 并行分析]
    ├─ [情感分析Agent]
    ├─ [关键词提取Agent]
    ├─ [摘要生成Agent]
    └─ [技术栈识别Agent]
    ↓
[Fan-In: 结果汇总Executor]
    ↓
[质量评估Agent]
    ↓
[条件分支]
    ├─ 质量高 → [自动发布Executor]
    └─ 质量低 → [人工审核Port] → [发布Executor]

8.3 代码实现

Step 1: 定义 Executors

// 内容提取 Executor
public class ContentExtractorExecutor : Executor<DocumentUpload, ExtractedContent>
{
    public ContentExtractorExecutor() : base("ContentExtractor") { }

    public override async ValueTask<ExtractedContent> HandleAsync(
        DocumentUpload upload, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 使用 PDF 库提取内容
        var content = await ExtractContentFromDocument(upload.FilePath);
        
        await context.AddEventAsync(
            new WorkflowOutputEvent($"Extracted {content.Text.Length} characters"),
            cancellationToken
        );
        
        return content;
    }
}

// 结果汇总 Executor
public class ResultAggregatorExecutor : Executor<AnalysisResult>
{
    private readonly List<AnalysisResult> _results = new();
    private const int ExpectedResultCount = 4; // 4 个分析 Agent

    public ResultAggregatorExecutor() : base("ResultAggregator") { }

    public override async ValueTask HandleAsync(
        AnalysisResult result, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        _results.Add(result);

        if (_results.Count == ExpectedResultCount)
        {
            var aggregated = new AggregatedAnalysis
            {
                Sentiment = _results.First(r => r.Type == "Sentiment").Value,
                Keywords = _results.First(r => r.Type == "Keywords").Value,
                Summary = _results.First(r => r.Type == "Summary").Value,
                TechStack = _results.First(r => r.Type == "TechStack").Value
            };

            await context.SendMessageAsync(aggregated, cancellationToken: cancellationToken);
        }
    }
}

// 发布 Executor
public class PublisherExecutor : Executor<PublishRequest, PublishResult>
{
    public PublisherExecutor() : base("Publisher") { }

    public override async ValueTask<PublishResult> HandleAsync(
        PublishRequest request, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 发布到数据库或文件系统
        var result = await PublishToStorage(request);
        
        await context.YieldOutputAsync(result, cancellationToken);
        return result;
    }
}

Step 2: 创建 AI Agents

public class DocumentAnalysisAgents
{
    private readonly IChatClient _client;

    public DocumentAnalysisAgents(IChatClient client)
    {
        _client = client;
    }

    public ChatClientAgent CreateSentimentAgent() => new(
        _client,
        """
        You are a sentiment analysis expert. Analyze the provided text and return:
        - Overall sentiment (Positive/Negative/Neutral)
        - Confidence score (0-1)
        - Key phrases that influenced the sentiment
        
        Return your analysis in JSON format.
        """,
        "sentiment_analyzer"
    );

    public ChatClientAgent CreateKeywordAgent() => new(
        _client,
        """
        You are a keyword extraction expert. Extract the most important keywords and phrases.
        Return top 10 keywords ranked by importance in JSON format.
        """,
        "keyword_extractor"
    );

    public ChatClientAgent CreateSummaryAgent() => new(
        _client,
        """
        You are a summarization expert. Create a concise summary (max 200 words) 
        that captures the main points of the document.
        """,
        "summarizer"
    );

    public ChatClientAgent CreateTechStackAgent() => new(
        _client,
        """
        You are a technology stack detector. Identify programming languages, frameworks,
        databases, and tools mentioned in the document. Return as a structured list.
        """,
        "tech_detector"
    );

    public ChatClientAgent CreateQualityAgent() => new(
        _client,
        """
        You are a document quality assessor. Evaluate the document based on:
        - Clarity and coherence
        - Technical accuracy
        - Completeness
        - Professional writing quality
        
        Return a quality score (0-100) and detailed feedback in JSON format.
        """,
        "quality_assessor"
    );
}

Step 3: 构建 Workflow

public class DocumentProcessingWorkflow
{
    public static Workflow Build(IChatClient client)
    {
        var agents = new DocumentAnalysisAgents(client);

        // 创建 Executors
        var extractor = new ContentExtractorExecutor();
        var sentimentAgent = agents.CreateSentimentAgent();
        var keywordAgent = agents.CreateKeywordAgent();
        var summaryAgent = agents.CreateSummaryAgent();
        var techAgent = agents.CreateTechStackAgent();
        var aggregator = new ResultAggregatorExecutor();
        var qualityAgent = agents.CreateQualityAgent();
        var humanReviewPort = RequestPort.Create<QualityReport, ReviewDecision>("HumanReview");
        var publisher = new PublisherExecutor();

        // 构建 Workflow
        var builder = new WorkflowBuilder(extractor);

        // Fan-Out: 并行分析
        builder.AddFanOutEdge(
            extractor,
            targets: [sentimentAgent, keywordAgent, summaryAgent, techAgent]
        );

        // 所有分析结果汇总
        builder.AddFanInEdge(
            aggregator,
            sources: [sentimentAgent, keywordAgent, summaryAgent, techAgent]
        );

        // 质量评估
        builder.AddEdge(aggregator, qualityAgent);

        // 条件分支:高质量直接发布,低质量需要人工审核
        builder.AddEdge<QualityReport>(
            qualityAgent,
            publisher,
            condition: report => report.Score >= 80
        );

        builder.AddEdge<QualityReport>(
            qualityAgent,
            humanReviewPort,
            condition: report => report.Score < 80
        );

        builder.AddEdge(humanReviewPort, publisher);

        return builder
            .WithName("Intelligent Document Processing")
            .WithDescription("Automated document analysis with human-in-the-loop quality control")
            .WithOutputFrom(publisher)
            .Build();
    }
}

Step 4: 执行 Workflow

public class DocumentProcessingService
{
    private readonly Workflow _workflow;
    private readonly CheckpointManager _checkpointManager;

    public DocumentProcessingService(IChatClient client)
    {
        _workflow = DocumentProcessingWorkflow.Build(client);
        _checkpointManager = CheckpointManager.Default;
    }

    public async Task<PublishResult> ProcessDocumentAsync(
        string filePath,
        CancellationToken cancellationToken = default)
    {
        var upload = new DocumentUpload { FilePath = filePath };

        await using var run = await InProcessExecution.StreamAsync(
            _workflow,
            upload,
            _checkpointManager
        );

        PublishResult? result = null;

        await foreach (WorkflowEvent evt in run.Run.WatchStreamAsync())
        {
            switch (evt)
            {
                case ExecutorCompletedEvent completed:
                    Console.WriteLine($"✓ {completed.ExecutorId} completed");
                    break;

                case SuperStepCompletedEvent superStep:
                    Console.WriteLine($"⚡ Super Step {superStep.CompletionInfo!.SuperStepId} completed");
                    if (superStep.CompletionInfo.Checkpoint is not null)
                    {
                        Console.WriteLine($"💾 Checkpoint saved");
                    }
                    break;

                case RequestInfoEvent request:
                    // 需要人工审核
                    var decision = await RequestHumanReviewAsync(
                        request.Request.DataAs<QualityReport>()
                    );
                    await run.Run.SendResponseAsync(
                        request.Request.CreateResponse(decision)
                    );
                    break;

                case WorkflowOutputEvent output:
                    result = output.As<PublishResult>();
                    Console.WriteLine($"✅ Document published: {result!.Url}");
                    break;

                case WorkflowErrorEvent error:
                    Console.WriteLine($"❌ Error: {error.Message}");
                    throw new Exception(error.Message);
            }
        }

        return result ?? throw new InvalidOperationException("Workflow did not produce output");
    }

    private async Task<ReviewDecision> RequestHumanReviewAsync(QualityReport report)
    {
        Console.WriteLine($"\n⚠️  Human review required!");
        Console.WriteLine($"Quality Score: {report.Score}/100");
        Console.WriteLine($"Feedback: {report.Feedback}");
        Console.Write("Approve? (y/n): ");

        var input = Console.ReadLine();
        return new ReviewDecision
        {
            Approved = input?.ToLower() == "y",
            ReviewerComments = input?.ToLower() == "y" 
                ? "Approved by reviewer" 
                : "Rejected - needs improvement"
        };
    }
}

8.4 性能分析

让我们对比一下不同实现方式的性能:

方案 1:顺序执行

内容提取(2s) → 情感分析(3s) → 关键词提取(3s) → 摘要生成(4s) → 技术栈识别(3s) → 质量评估(2s)
总耗时:17 秒

方案 2:并行分析(我们的实现)

内容提取(2s) → [并行: 情感(3s), 关键词(3s), 摘要(4s), 技术栈(3s)] → 质量评估(2s)
总耗时:8 秒 (节省 53%)

方案 3:完全并行(包括质量评估)

内容提取(2s) → [并行: 所有分析 + 质量评估]
总耗时:6 秒 (节省 65%)
但可能导致质量评估不准确,因为它需要完整的分析结果

8.5 扩展性考虑

这个系统可以轻松扩展:

1. 添加新的分析维度

var plagiarismAgent = new ChatClientAgent(client, "Detect plagiarism");
builder.AddFanOutEdge(extractor, [/* existing agents */, plagiarismAgent]);
builder.AddFanInEdge(aggregator, [/* existing agents */, plagiarismAgent]);

2. 支持多语言

var languageDetector = new LanguageDetectorExecutor();
builder.AddEdge(extractor, languageDetector);

// 根据语言选择不同的分析 Agent
builder.AddEdge<DetectedLanguage>(
    languageDetector,
    englishAnalysisAgent,
    condition: lang => lang.Code == "en"
);
builder.AddEdge<DetectedLanguage>(
    languageDetector,
    chineseAnalysisAgent,
    condition: lang => lang.Code == "zh"
);

3. 集成外部服务

public class ExternalAPIExecutor : Executor<string, APIResult>
{
    private readonly HttpClient _httpClient;

    public override async ValueTask<APIResult> HandleAsync(
        string data, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        var response = await _httpClient.PostAsJsonAsync(
            "https://api.example.com/analyze",
            data,
            cancellationToken
        );
        
        return await response.Content.ReadFromJsonAsync<APIResult>(cancellationToken);
    }
}

九、可观测性:让 Workflow 透明化

9.1 内置的事件系统

Agent Framework 提供了丰富的事件类型:

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    switch (evt)
    {
        case WorkflowStartedEvent started:
            Console.WriteLine($"🚀 Workflow started: {started.WorkflowId}");
            break;

        case ExecutorInvokedEvent invoked:
            Console.WriteLine($"▶️  Invoking {invoked.ExecutorId}");
            break;

        case ExecutorCompletedEvent completed:
            Console.WriteLine($"✅ {completed.ExecutorId} completed in {completed.Duration}ms");
            break;

        case ExecutorFailedEvent failed:
            Console.WriteLine($"❌ {failed.ExecutorId} failed: {failed.Exception.Message}");
            break;

        case SuperStepStartedEvent stepStarted:
            Console.WriteLine($"⚡ Super Step {stepStarted.StartInfo.SuperStepId} started");
            break;

        case SuperStepCompletedEvent stepCompleted:
            Console.WriteLine($"⚡ Super Step completed with {stepCompleted.CompletionInfo!.ExecutorCount} executors");
            break;

        case AgentRunUpdateEvent agentUpdate:
            Console.Write(agentUpdate.Update.Text); // 实时流式输出
            break;

        case WorkflowOutputEvent output:
            Console.WriteLine($"📤 Output: {output.Data}");
            break;
    }
}

9.2 集成 OpenTelemetry

Agent Framework 原生支持 OpenTelemetry:

using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .SetResourceBuilder(ResourceBuilder.CreateDefault()
        .AddService("DocumentProcessingService"))
    .AddSource("Microsoft.Agents.AI.Workflows")
    .AddConsoleExporter()
    .AddOtlpExporter(options =>
    {
        options.Endpoint = new Uri("http://localhost:4317");
    })
    .Build();

// 执行 Workflow,所有操作都会自动记录到 OpenTelemetry
await using var run = await InProcessExecution.RunAsync(workflow, input);

9.3 自定义指标收集

public class MetricsCollectorExecutor : Executor<object>
{
    private static readonly Counter<long> s_messageCounter = 
        Meter.CreateCounter<long>("workflow.messages.processed");
    
    private static readonly Histogram<double> s_processingTime = 
        Meter.CreateHistogram<double>("workflow.processing.duration");

    public override async ValueTask HandleAsync(
        object message, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        var stopwatch = Stopwatch.StartNew();
        
        try
        {
            await ProcessMessageAsync(message, cancellationToken);
            s_messageCounter.Add(1, new KeyValuePair<string, object?>("status", "success"));
        }
        catch (Exception ex)
        {
            s_messageCounter.Add(1, new KeyValuePair<string, object?>("status", "failure"));
            throw;
        }
        finally
        {
            stopwatch.Stop();
            s_processingTime.Record(stopwatch.ElapsedMilliseconds);
        }
    }
}

十、Workflow 的高级特性与技巧

10.1 子工作流:模块化的艺术

当 Workflow 变得复杂时,我们需要将其拆分成更小的、可复用的单元。这就是子工作流的用武之地。

10.1.1 为什么需要子工作流?

想象你在构建一个电商系统,订单处理流程可能包括:

  • 库存检查

  • 支付处理

  • 物流安排

  • 通知发送

每个环节本身就是一个复杂的流程。如果全部写在一个 Workflow 里,会变成"意大利面代码"。

10.1.2 子工作流的实现
// 定义一个支付处理子工作流
public class PaymentWorkflow
{
    public static Workflow Build()
    {
        var validator = new PaymentValidatorExecutor();
        var processor = new PaymentProcessorExecutor();
        var notifier = new PaymentNotifierExecutor();

        return new WorkflowBuilder(validator)
            .AddEdge(validator, processor)
            .AddEdge(processor, notifier)
            .WithOutputFrom(notifier)
            .Build();
    }
}

// 在主工作流中使用子工作流
public class OrderWorkflow
{
    public static Workflow Build()
    {
        var inventoryCheck = new InventoryCheckExecutor();
        
        // 将子工作流包装成 Executor
        var paymentWorkflow = new WorkflowHostExecutor(
            PaymentWorkflow.Build(),
            "PaymentSubWorkflow"
        );
        
        var shipping = new ShippingExecutor();

        return new WorkflowBuilder(inventoryCheck)
            .AddEdge(inventoryCheck, paymentWorkflow)
            .AddEdge(paymentWorkflow, shipping)
            .WithOutputFrom(shipping)
            .Build();
    }
}

子工作流的优势

  1. 模块化:每个子工作流可以独立开发和测试

  2. 复用性:支付流程可以在多个地方使用

  3. 可维护性:修改支付逻辑不影响其他部分

  4. 可测试性:可以单独测试子工作流

10.1.3 子工作流的通信

子工作流不是黑盒,它可以与父工作流通信:

public class SubworkflowWithEventsExecutor : Executor<OrderData>
{
    private readonly Workflow _subworkflow;

    public override async ValueTask HandleAsync(
        OrderData order, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        await using var subRun = await InProcessExecution.StreamAsync(
            _subworkflow, 
            order
        );

        await foreach (WorkflowEvent evt in subRun.WatchStreamAsync())
        {
            // 将子工作流的事件转发到父工作流
            if (evt is SubworkflowWarningEvent warning)
            {
                await context.AddEventAsync(
                    new WorkflowWarningEvent($"[SubWorkflow] {warning.Message}"),
                    cancellationToken
                );
            }
            else if (evt is WorkflowOutputEvent output)
            {
                await context.SendMessageAsync(output.Data, cancellationToken: cancellationToken);
            }
        }
    }
}

10.2 动态工作流:运行时构建

有时候,Workflow 的结构需要根据运行时的数据来决定。比如:

  • 根据用户权限决定审批流程

  • 根据订单金额决定是否需要额外验证

  • 根据文档类型选择不同的处理流程

public class DynamicWorkflowBuilder
{
    public static Workflow BuildForUser(User user, IChatClient client)
    {
        var start = new StartExecutor();
        var builder = new WorkflowBuilder(start);

        // 根据用户角色动态添加步骤
        if (user.Role == "Admin")
        {
            var adminAgent = new ChatClientAgent(client, "Admin assistant");
            builder.AddEdge(start, adminAgent);
        }
        else if (user.Role == "Manager")
        {
            var managerAgent = new ChatClientAgent(client, "Manager assistant");
            var approvalPort = RequestPort.Create<ApprovalRequest, bool>("ManagerApproval");
            
            builder.AddEdge(start, managerAgent);
            builder.AddEdge(managerAgent, approvalPort);
        }
        else
        {
            var basicAgent = new ChatClientAgent(client, "Basic assistant");
            builder.AddEdge(start, basicAgent);
        }

        return builder.Build();
    }
}

10.3 循环与迭代

有些场景需要重复执行某些步骤,直到满足条件为止。

10.3.1 简单循环
public class LoopWorkflow
{
    public static Workflow Build()
    {
        var processor = new DataProcessorExecutor();
        var validator = new ValidatorExecutor();

        var builder = new WorkflowBuilder(processor);
        
        // 如果验证失败,回到处理器重新处理
        builder.AddEdge<ValidationResult>(
            validator,
            processor,
            condition: result => !result.IsValid
        );

        // 如果验证成功,输出结果
        builder.AddEdge<ValidationResult>(
            validator,
            new OutputExecutor(),
            condition: result => result.IsValid
        );

        return builder.Build();
    }
}
10.3.2 带计数器的循环
public class RetryExecutor : StatefulExecutor<string, string>
{
    private int _attemptCount = 0;
    private const int MaxAttempts = 3;

    public RetryExecutor() : base("RetryExecutor", 
        new StatefulExecutorOptions { StateScope = StateScope.Run })
    { }

    public override async ValueTask<string> HandleAsync(
        string message, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        _attemptCount++;

        try
        {
            return await ProcessWithRetry(message, cancellationToken);
        }
        catch (Exception ex) when (_attemptCount < MaxAttempts)
        {
            await context.AddEventAsync(
                new WorkflowWarningEvent($"Attempt {_attemptCount} failed, retrying..."),
                cancellationToken
            );
            
            // 发送消息给自己,触发重试
            await context.SendMessageAsync(message, cancellationToken: cancellationToken);
            return string.Empty; // 临时返回
        }
        catch (Exception ex)
        {
            throw new InvalidOperationException(
                $"Failed after {MaxAttempts} attempts", ex
            );
        }
    }
}

10.4 条件路由的高级用法

10.4.1 Switch-Case 模式
public class RouterExecutor : Executor<Message, RoutingDecision>
{
    public override ValueTask<RoutingDecision> HandleAsync(
        Message message, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        var decision = message.Type switch
        {
            "urgent" => new RoutingDecision { Target = "UrgentHandler", Priority = 1 },
            "normal" => new RoutingDecision { Target = "NormalHandler", Priority = 2 },
            "low" => new RoutingDecision { Target = "LowPriorityHandler", Priority = 3 },
            _ => new RoutingDecision { Target = "DefaultHandler", Priority = 4 }
        };

        return ValueTask.FromResult(decision);
    }
}

// 在 WorkflowBuilder 中使用
var router = new RouterExecutor();
var urgentHandler = new UrgentHandlerExecutor();
var normalHandler = new NormalHandlerExecutor();
var lowHandler = new LowPriorityHandlerExecutor();

builder.AddEdge<RoutingDecision>(router, urgentHandler, 
    condition: d => d.Target == "UrgentHandler");
builder.AddEdge<RoutingDecision>(router, normalHandler, 
    condition: d => d.Target == "NormalHandler");
builder.AddEdge<RoutingDecision>(router, lowHandler, 
    condition: d => d.Target == "LowPriorityHandler");
10.4.2 多条件选择

有时候一个消息需要同时发送到多个目标:

// 使用 Fan-Out 的 partitioner 实现多选
builder.AddFanOutEdge<Message>(
    source,
    partitioner: (message, count) =>
    {
        var targets = new List<int>();
        
        if (message.RequiresLogging)
            targets.Add(0); // Logger
        
        if (message.RequiresNotification)
            targets.Add(1); // Notifier
        
        if (message.RequiresArchiving)
            targets.Add(2); // Archiver
        
        return targets;
    },
    targets: [logger, notifier, archiver]
);

十一、生产环境部署考虑

11.1 容错与恢复

11.1.1 Checkpoint 持久化

在生产环境中,内存中的 Checkpoint 是不够的,我们需要持久化存储:

public class DatabaseCheckpointStore : ICheckpointStore
{
    private readonly DbContext _dbContext;

    public async Task SaveCheckpointAsync(
        string checkpointId, 
        byte[] data, 
        CancellationToken cancellationToken)
    {
        var checkpoint = new CheckpointEntity
        {
            Id = checkpointId,
            Data = data,
            CreatedAt = DateTime.UtcNow
        };

        _dbContext.Checkpoints.Add(checkpoint);
        await _dbContext.SaveChangesAsync(cancellationToken);
    }

    public async Task<byte[]?> LoadCheckpointAsync(
        string checkpointId, 
        CancellationToken cancellationToken)
    {
        var checkpoint = await _dbContext.Checkpoints
            .FirstOrDefaultAsync(c => c.Id == checkpointId, cancellationToken);
        
        return checkpoint?.Data;
    }
}

// 使用自定义的 Checkpoint Store
var checkpointManager = new CheckpointManager(
    new DatabaseCheckpointStore(dbContext)
);
11.1.2 分布式 Checkpoint

对于分布式系统,可以使用 Redis 或 Azure Blob Storage:

public class RedisCheckpointStore : ICheckpointStore
{
    private readonly IConnectionMultiplexer _redis;

    public async Task SaveCheckpointAsync(
        string checkpointId, 
        byte[] data, 
        CancellationToken cancellationToken)
    {
        var db = _redis.GetDatabase();
        await db.StringSetAsync(
            $"checkpoint:{checkpointId}", 
            data,
            expiry: TimeSpan.FromDays(7) // 7 天后自动过期
        );
    }

    public async Task<byte[]?> LoadCheckpointAsync(
        string checkpointId, 
        CancellationToken cancellationToken)
    {
        var db = _redis.GetDatabase();
        var data = await db.StringGetAsync($"checkpoint:{checkpointId}");
        return data.HasValue ? (byte[])data : null;
    }
}

11.2 性能监控

11.2.1 集成 Application Insights
using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.DataContracts;

public class MonitoredWorkflowExecutor : Executor<string>
{
    private readonly TelemetryClient _telemetry;

    public MonitoredWorkflowExecutor(TelemetryClient telemetry) 
        : base("MonitoredExecutor")
    {
        _telemetry = telemetry;
    }

    public override async ValueTask HandleAsync(
        string message, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        using var operation = _telemetry.StartOperation<RequestTelemetry>("ProcessMessage");
        operation.Telemetry.Properties["MessageLength"] = message.Length.ToString();

        try
        {
            await ProcessMessageAsync(message, cancellationToken);
            operation.Telemetry.Success = true;
        }
        catch (Exception ex)
        {
            operation.Telemetry.Success = false;
            _telemetry.TrackException(ex);
            throw;
        }
    }
}
11.2.2 自定义健康检查
public class WorkflowHealthCheck : IHealthCheck
{
    private readonly Workflow _workflow;

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context, 
        CancellationToken cancellationToken = default)
    {
        try
        {
            // 执行一个简单的测试 Workflow
            await using var run = await InProcessExecution.RunAsync(
                _workflow, 
                "health-check",
                cancellationToken: cancellationToken
            );

            var completed = false;
            await foreach (var evt in run.NewEvents)
            {
                if (evt is WorkflowOutputEvent)
                {
                    completed = true;
                    break;
                }
            }

            return completed 
                ? HealthCheckResult.Healthy("Workflow is operational")
                : HealthCheckResult.Degraded("Workflow did not complete");
        }
        catch (Exception ex)
        {
            return HealthCheckResult.Unhealthy("Workflow failed", ex);
        }
    }
}

// 在 Startup.cs 中注册
services.AddHealthChecks()
    .AddCheck<WorkflowHealthCheck>("workflow");

11.3 安全性考虑

11.3.1 输入验证
public class SecureExecutor : Executor<UserInput, ProcessedData>
{
    public override async ValueTask<ProcessedData> HandleAsync(
        UserInput input, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 验证输入
        if (string.IsNullOrWhiteSpace(input.Data))
        {
            throw new ArgumentException("Input data cannot be empty");
        }

        // 检查输入长度
        if (input.Data.Length > 10000)
        {
            throw new ArgumentException("Input data exceeds maximum length");
        }

        // 清理潜在的恶意内容
        var sanitized = SanitizeInput(input.Data);

        return await ProcessSecurely(sanitized, cancellationToken);
    }

    private string SanitizeInput(string input)
    {
        // 移除 HTML 标签、SQL 注入尝试等
        return System.Net.WebUtility.HtmlEncode(input);
    }
}
11.3.2 敏感数据处理
public class SecureDataExecutor : Executor<SensitiveData>
{
    private readonly IDataProtector _protector;

    public SecureDataExecutor(IDataProtectionProvider provider) 
        : base("SecureDataExecutor")
    {
        _protector = provider.CreateProtector("WorkflowDataProtection");
    }

    protected internal override async ValueTask OnCheckpointingAsync(
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 在保存 Checkpoint 前加密敏感数据
        if (_sensitiveData != null)
        {
            _encryptedData = _protector.Protect(_sensitiveData);
            _sensitiveData = null; // 清除明文
        }
    }

    protected internal override async ValueTask OnCheckpointRestoredAsync(
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 从 Checkpoint 恢复后解密数据
        if (_encryptedData != null)
        {
            _sensitiveData = _protector.Unprotect(_encryptedData);
        }
    }
}

11.4 扩展性设计

11.4.1 水平扩展

对于高负载场景,可以将 Workflow 部署到多个实例:

public class DistributedWorkflowService
{
    private readonly IMessageQueue _queue;
    private readonly Workflow _workflow;

    public async Task EnqueueWorkflowAsync(WorkflowRequest request)
    {
        // 将请求放入队列
        await _queue.PublishAsync("workflow-queue", request);
    }

    public async Task ProcessWorkflowsAsync(CancellationToken cancellationToken)
    {
        // 多个实例可以并行处理队列中的请求
        await foreach (var request in _queue.SubscribeAsync("workflow-queue", cancellationToken))
        {
            try
            {
                await using var run = await InProcessExecution.RunAsync(
                    _workflow, 
                    request.Input,
                    cancellationToken: cancellationToken
                );

                // 处理结果...
            }
            catch (Exception ex)
            {
                // 错误处理和重试逻辑
                await HandleErrorAsync(request, ex);
            }
        }
    }
}
11.4.2 资源限制
public class ThrottledWorkflowService
{
    private readonly SemaphoreSlim _semaphore;
    private readonly Workflow _workflow;

    public ThrottledWorkflowService(Workflow workflow, int maxConcurrency = 10)
    {
        _workflow = workflow;
        _semaphore = new SemaphoreSlim(maxConcurrency);
    }

    public async Task<TOutput> ExecuteAsync<TInput, TOutput>(
        TInput input, 
        CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            await using var run = await InProcessExecution.RunAsync(
                _workflow, 
                input,
                cancellationToken: cancellationToken
            );

            // 提取输出...
            return default!;
        }
        finally
        {
            _semaphore.Release();
        }
    }
}

十二、未来展望与趋势

12.1 AI 驱动的工作流优化

未来的 Workflow 系统可能会使用 AI 来自动优化执行路径:

// 概念性代码 - 未来可能的功能
public class AIOptimizedWorkflow
{
    private readonly Workflow _workflow;
    private readonly AIOptimizer _optimizer;

    public async Task<Workflow> OptimizeAsync()
    {
        // AI 分析历史执行数据
        var executionHistory = await LoadExecutionHistoryAsync();
        
        // 识别瓶颈和优化机会
        var insights = await _optimizer.AnalyzeAsync(executionHistory);
        
        // 自动重构 Workflow
        if (insights.SuggestParallelization)
        {
            return RebuildWithParallelism(_workflow, insights.ParallelizableSteps);
        }
        
        return _workflow;
    }
}

12.2 低代码/无代码平台集成

声明式 Workflow 为低代码平台铺平了道路:

[可视化设计器]
    ↓ 导出
[JSON Workflow 定义]
    ↓ 加载
[Agent Framework 执行]

未来可能会有:

  • 拖拽式 Workflow 设计器

  • 实时预览和调试

  • 模板市场(预定义的 Workflow 模板)

  • 协作编辑(多人同时设计 Workflow)

12.3 边缘计算与 Workflow

随着边缘计算的发展,Workflow 可能会在边缘设备上运行:

// IoT 设备上的轻量级 Workflow
public class EdgeWorkflow
{
    public static Workflow BuildForEdge()
    {
        var sensor = new SensorDataExecutor();
        var filter = new DataFilterExecutor();
        var localAnalysis = new LocalAnalysisExecutor();
        var cloudSync = new CloudSyncExecutor();

        return new WorkflowBuilder(sensor)
            .AddEdge(sensor, filter)
            .AddEdge<SensorData>(
                filter,
                localAnalysis,
                condition: data => data.IsNormal // 正常数据本地处理
            )
            .AddEdge<SensorData>(
                filter,
                cloudSync,
                condition: data => !data.IsNormal // 异常数据上传云端
            )
            .Build();
    }
}

12.4 多模态 Workflow

未来的 Workflow 将支持更多模态的数据处理:

public class MultiModalWorkflow
{
    public static Workflow Build(IChatClient client)
    {
        var imageAnalyzer = new ImageAnalysisAgent(client);
        var audioTranscriber = new AudioTranscriptionAgent(client);
        var textAnalyzer = new TextAnalysisAgent(client);
        var videoProcessor = new VideoProcessingAgent(client);
        var synthesizer = new MultiModalSynthesisAgent(client);

        return new WorkflowBuilder(new InputRouterExecutor())
            .AddFanOutEdge(
                router,
                targets: [imageAnalyzer, audioTranscriber, textAnalyzer, videoProcessor]
            )
            .AddFanInEdge(synthesizer, 
                sources: [imageAnalyzer, audioTranscriber, textAnalyzer, videoProcessor])
            .WithOutputFrom(synthesizer)
            .Build();
    }
}

更多AIGC文章

RAG技术全解:从原理到实战的简明指南

更多VibeCoding文章

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐