当 AI Agent 遇上工作流编排:微软 Agent Framework 的 Workflow 深度解析
摘要: Microsoft Agent Framework中的Workflow功能通过将多个AI Agent(执行器)组合编排,实现复杂任务的协同处理。Workflow由Executor(专注单一功能)、Edge(连接逻辑)和Orchestrator(编排引擎)构成,支持流水线、并行、智能转接和圆桌会议等模式。其核心优势包括: 模块化设计:各Executor独立运行,通过Edge灵活连接,支持状态
"如果说单个 AI Agent 是一位专家,那么 Workflow 就是让这些专家协同作战的指挥官。"
引言:从单打独斗到团队协作
还记得你第一次使用 ChatGPT 的感觉吗?那种"哇,AI 真的能理解我"的惊喜。但很快你就会发现,当任务变得复杂时,单个 AI 对话就像是让一个人同时扮演厨师、服务员和收银员——理论上可行,实际上一团糟。
这就是为什么微软推出了 Agent Framework,而其中的 Workflow 功能,正是解决这个问题的杀手锏。今天,我们就来深入探讨这个让 AI Agent 从"单兵作战"升级为"特种部队"的技术。
一、Workflow 是什么?为什么我们需要它?
1.1 现实世界的痛点
想象一个场景:你需要构建一个智能客服系统,它要能够:
-
理解用户的问题(自然语言处理)
-
查询订单数据库(数据检索)
-
判断是否需要人工介入(决策逻辑)
-
生成友好的回复(内容生成)
如果用传统的单一 AI Agent 来处理,你会遇到什么问题?
-
上下文混乱:一个 Agent 要记住所有状态,容易"精神分裂"
-
职责不清:什么都做,往往什么都做不好
-
难以扩展:新增功能就像在意大利面代码里加调料
-
无法并行:明明可以同时做的事,却要排队等待
1.2 Workflow 的核心理念
Microsoft Agent Framework 的 Workflow 采用了一个优雅的设计哲学:
Workflow = Executors(执行器)+ Edges(边)+ Orchestration(编排)
这就像搭积木:
-
Executor:每个积木块,专注做一件事
-
Edge:积木块之间的连接方式
-
Workflow:最终搭建出的复杂结构
让我们看一个最简单的例子:
// 创建两个执行器
UppercaseExecutor uppercase = new();
ReverseTextExecutor reverse = new();
// 用边连接它们
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse)
.WithOutputFrom(reverse);
var workflow = builder.Build();
// 执行:输入 "Hello" → 转大写 "HELLO" → 反转 "OLLEH"
await using Run run = await InProcessExecution.RunAsync(workflow, "Hello");
看到了吗?每个 Executor 只做一件事,但通过 Edge 连接后,就能完成复杂的任务。这就是组合的力量。
二、Workflow 的核心组件深度剖析
2.1 Executor:工作流的基本单元
Executor 是 Workflow 的"原子",它的设计哲学是:单一职责,明确输入输出。
2.1.1 Executor 的三种形态
形态一:无返回值的 Executor
internal sealed class LoggerExecutor() : Executor<string>("Logger")
{
public override ValueTask HandleAsync(
string message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
Console.WriteLine($"[LOG] {message}");
return default; // 不返回任何值
}
}
形态二:有返回值的 Executor
internal sealed class UppercaseExecutor() : Executor<string, string>("Uppercase")
{
public override ValueTask<string> HandleAsync(
string message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
return ValueTask.FromResult(message.ToUpperInvariant());
}
}
形态三:AI Agent 作为 Executor
这是最有趣的部分!Agent Framework 允许你把任何 AI Agent 包装成 Executor:
var chatClient = new AzureOpenAIClient(endpoint, credential)
.GetChatClient(deploymentName)
.AsIChatClient();
// 创建一个翻译 Agent
AIAgent frenchAgent = new ChatClientAgent(
chatClient,
"You are a translation assistant that translates to French."
);
// Agent 可以直接用在 Workflow 中!
var workflow = new WorkflowBuilder(frenchAgent)
.AddEdge(frenchAgent, spanishAgent)
.AddEdge(spanishAgent, englishAgent)
.Build();
2.1.2 Executor 的生命周期
每个 Executor 都有完整的生命周期管理:
protected internal virtual ValueTask InitializeAsync(
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 初始化资源、加载配置等
}
protected internal virtual ValueTask OnCheckpointingAsync(
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 保存检查点前的准备工作
}
protected internal virtual ValueTask OnCheckpointRestoredAsync(
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 从检查点恢复后的初始化
}
这种设计让 Executor 不仅仅是一个函数,而是一个有状态、可管理的组件。
2.2 Edge:数据流动的艺术
如果说 Executor 是节点,那么 Edge 就是连接这些节点的"血管"。Agent Framework 提供了三种 Edge 类型:
2.2.1 Direct Edge:一对一连接
最简单的边,数据从一个 Executor 流向另一个:
builder.AddEdge(sourceExecutor, targetExecutor);
但这还不够强大,我们可以加上条件判断:
builder.AddEdge<int>(
source,
target,
condition: value => value > 0 // 只有正数才会传递
);
这就像给数据流加上了"阀门",只有满足条件的数据才能通过。
2.2.2 Fan-Out Edge:一对多广播
想象一个场景:你需要同时咨询三个专家的意见。Fan-Out 就是为此而生:
ChatClientAgent physicist = new(client, "Physics expert");
ChatClientAgent chemist = new(client, "Chemistry expert");
ChatClientAgent biologist = new(client, "Biology expert");
builder.AddFanOutEdge(
startExecutor,
targets: [physicist, chemist, biologist]
);
所有专家会并行处理同一个问题,大大提升效率!
你还可以自定义分发策略:
builder.AddFanOutEdge<string>(
source,
partitioner: (message, count) =>
{
// 根据消息内容决定发给哪些 Executor
if (message.Contains("urgent"))
return [0, 1]; // 发给前两个
else
return [2]; // 只发给第三个
},
targets: [executor1, executor2, executor3]
);
2.2.3 Fan-In Edge:多对一汇聚
有了 Fan-Out,自然需要 Fan-In 来收集结果:
var aggregator = new ResultAggregatorExecutor();
builder.AddFanInEdge(
aggregator,
sources: [physicist, chemist, biologist]
);
Aggregator 会等待所有源 Executor 完成后,再统一处理结果。这就是经典的 Map-Reduce 模式!
三、Workflow 的高级模式:从理论到实战
3.1 Sequential Pattern:流水线模式
这是最直观的模式,就像工厂流水线一样,每个环节依次处理:
var workflow = AgentWorkflowBuilder.BuildSequential(
"Translation Pipeline",
GetTranslationAgent("French", client),
GetTranslationAgent("Spanish", client),
GetTranslationAgent("English", client)
);
// 输入:"Hello"
// → 法语:"Bonjour"
// → 西班牙语:"Hola"
// → 英语:"Hello"
适用场景:
-
数据处理管道(清洗 → 转换 → 验证)
-
多步骤审批流程
-
渐进式内容生成
性能特点:
-
延迟:累加(每个步骤的延迟相加)
-
吞吐量:受限于最慢的环节
-
资源占用:低(同一时间只有一个 Executor 在工作)
3.2 Concurrent Pattern:并行处理模式
当多个任务互不依赖时,为什么要让它们排队?并行处理才是王道:
var workflow = AgentWorkflowBuilder.BuildConcurrent(
"Multi-Expert Consultation",
agents: [physicistAgent, chemistAgent, biologistAgent],
aggregator: results =>
{
// 自定义聚合逻辑
return results
.SelectMany(list => list)
.OrderBy(msg => msg.Timestamp)
.ToList();
}
);
实战案例:智能文档分析系统
假设你要分析一份技术文档,需要:
-
提取关键词(NLP Agent)
-
生成摘要(Summarization Agent)
-
检测技术栈(Tech Stack Detector Agent)
这三个任务完全独立,用并行模式可以将处理时间从 30 秒降到 10 秒!
var nlpAgent = new ChatClientAgent(client, "Extract keywords");
var summaryAgent = new ChatClientAgent(client, "Generate summary");
var techAgent = new ChatClientAgent(client, "Detect tech stack");
var workflow = AgentWorkflowBuilder.BuildConcurrent(
"Document Analyzer",
[nlpAgent, summaryAgent, techAgent]
);
await using var run = await InProcessExecution.StreamAsync(
workflow,
new ChatMessage(ChatRole.User, documentContent)
);
性能对比:
| 模式 | 总耗时 | CPU 利用率 | 适用场景 |
|---|---|---|---|
| Sequential | 30s | 33% | 任务有依赖关系 |
| Concurrent | 10s | 90% | 任务相互独立 |
3.3 Handoffs Pattern:智能转接模式
这是最接近真实世界的模式。想象一个客服中心:
-
前台接待(Triage Agent)判断问题类型
-
技术支持(Tech Agent)处理技术问题
-
销售顾问(Sales Agent)处理购买咨询
-
必要时可以转回前台
var triageAgent = new ChatClientAgent(
client,
"Determine which specialist to route to. ALWAYS handoff.",
"triage_agent"
);
var techAgent = new ChatClientAgent(
client,
"Handle technical support questions only.",
"tech_support"
);
var salesAgent = new ChatClientAgent(
client,
"Handle sales and pricing questions only.",
"sales_agent"
);
var workflow = AgentWorkflowBuilder
.CreateHandoffBuilderWith(triageAgent)
.WithHandoffs(triageAgent, [techAgent, salesAgent])
.WithHandoffs([techAgent, salesAgent], triageAgent)
.Build();
Handoff 的魔法:AI Tool Calling
Handoff 是如何实现的?答案是 AI Function Calling!
当你配置 Handoff 时,Framework 会自动为每个目标 Agent 生成一个 Tool:
// 自动生成的 Tool
{
"name": "handoff_to_tech_support",
"description": "Handle technical support questions only.",
"parameters": { ... }
}
当 Triage Agent 判断需要技术支持时,它会"调用"这个 Tool,Framework 拦截这个调用,将对话转交给 Tech Agent。整个过程对用户来说是无缝的!
3.4 Group Chat Pattern:圆桌会议模式
如果说 Handoffs 是"点对点转接",那么 Group Chat 就是"所有人都在一个会议室":
var workflow = AgentWorkflowBuilder
.CreateGroupChatBuilderWith(agents =>
new RoundRobinGroupChatManager(agents)
{
MaximumIterationCount = 5
})
.AddParticipants(
GetTranslationAgent("French", client),
GetTranslationAgent("Spanish", client),
GetTranslationAgent("English", client)
)
.Build();
Group Chat Manager 的职责:
-
决定下一个发言的 Agent
-
控制对话轮次
-
判断何时结束讨论
你可以实现自定义的 Manager:
public class SmartGroupChatManager : GroupChatManager
{
protected override async ValueTask<AIAgent?> SelectNextAgentAsync(
IReadOnlyList<ChatMessage> history,
CancellationToken cancellationToken)
{
// 使用 AI 来决定下一个发言者
var decision = await _decisionAgent.RunAsync(
$"Based on the conversation, who should speak next? {string.Join(", ", _agents.Select(a => a.Name))}"
);
return _agents.FirstOrDefault(a =>
decision.Contains(a.Name, StringComparison.OrdinalIgnoreCase)
);
}
}
四、Workflow 的状态管理:时间旅行不是梦
4.1 Checkpoint:工作流的"存档点"
想象你在玩一个复杂的 RPG 游戏,突然断电了。如果没有存档,你得从头开始。Workflow 的 Checkpoint 就是这样的"存档系统"。
var checkpointManager = CheckpointManager.Default;
var checkpoints = new List<CheckpointInfo>();
await using Checkpointed<StreamingRun> run = await InProcessExecution
.StreamAsync(workflow, input, checkpointManager);
await foreach (WorkflowEvent evt in run.Run.WatchStreamAsync())
{
if (evt is SuperStepCompletedEvent superStepEvt)
{
// 每个 Super Step 完成后自动创建 Checkpoint
CheckpointInfo? checkpoint = superStepEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint {checkpoints.Count} saved!");
}
}
}
Super Step 是什么?
Workflow 的执行不是连续的,而是分成一个个"超级步骤":
Super Step 1: [Executor A] 完成
Super Step 2: [Executor B, Executor C] 并行完成
Super Step 3: [Executor D] 完成
每个 Super Step 结束后,系统会自动保存状态。这样设计的好处:
-
粒度合适:不会太频繁(每条消息一个),也不会太稀疏
-
状态一致:Super Step 内的所有 Executor 要么全部完成,要么全部未完成
-
易于恢复:从任何 Checkpoint 恢复都能保证状态一致
4.2 时间旅行:回到过去
有了 Checkpoint,你可以做一些"魔法操作":
// 恢复到第 5 个 Checkpoint
CheckpointInfo savedCheckpoint = checkpoints[4];
await run.RestoreCheckpointAsync(savedCheckpoint);
// 从这个点继续执行
await foreach (WorkflowEvent evt in run.Run.WatchStreamAsync())
{
// 继续处理...
}
实战场景:A/B 测试
假设你有一个复杂的 Workflow,前面 5 个步骤都一样,但第 6 步有两种不同的策略。你可以:
-
执行到第 5 步,保存 Checkpoint
-
用策略 A 完成剩余步骤,记录结果
-
恢复到第 5 步的 Checkpoint
-
用策略 B 完成剩余步骤,记录结果
-
对比两种策略的效果
这在传统编程中需要复杂的状态管理,但在 Workflow 中只需要几行代码!
4.3 Shared State:Executor 之间的"共享内存"
有时候,多个 Executor 需要共享一些数据。比如:
-
累加器(统计处理了多少条数据)
-
缓存(避免重复计算)
-
配置(所有 Executor 共享的参数)
public class CounterExecutor : StatefulExecutor<string>
{
private int _count = 0;
public CounterExecutor() : base("Counter",
new StatefulExecutorOptions
{
StateScope = StateScope.Workflow // 工作流级别的状态
})
{ }
public override async ValueTask HandleAsync(
string message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
_count++;
Console.WriteLine($"Processed {_count} messages");
// 状态会自动保存到 Checkpoint
await context.SendMessageAsync($"Count: {_count}");
}
}
状态作用域:
| 作用域 | 生命周期 | 适用场景 |
|---|---|---|
Executor |
单个 Executor 实例 | 临时缓存 |
Run |
单次 Workflow 执行 | 会话状态 |
Workflow |
整个 Workflow | 全局配置 |
五、人机协作:Human-in-the-Loop 模式
5.1 为什么需要人类介入?
AI 再强大,也有它的局限性:
-
关键决策:涉及法律、伦理的决定需要人类审核
-
异常处理:AI 遇到边界情况时需要人类指导
-
质量把关:重要内容发布前需要人工审查
Agent Framework 提供了优雅的 Request Port 机制来实现人机协作。
5.2 Request Port:工作流的"暂停按钮"
Request Port 就像是工作流中的一个"等待点",当执行到这里时,会暂停并向外部请求输入:
// 创建一个 Request Port
var humanInputPort = RequestPort.Create<NumberSignal, int>("HumanInput");
// 在 Workflow 中使用
var workflow = new WorkflowBuilder(judgeExecutor)
.AddEdge(judgeExecutor, humanInputPort)
.AddEdge(humanInputPort, judgeExecutor)
.Build();
实战案例:数字猜谜游戏
让我们看一个完整的例子:
public class JudgeExecutor : Executor<int, NumberSignal>
{
private const int TargetNumber = 42;
public override ValueTask<NumberSignal> HandleAsync(
int guess,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
if (guess == TargetNumber)
{
await context.YieldOutputAsync("Correct! You win!");
return ValueTask.FromResult(NumberSignal.Correct);
}
else if (guess > TargetNumber)
{
return ValueTask.FromResult(NumberSignal.Above);
}
else
{
return ValueTask.FromResult(NumberSignal.Below);
}
}
}
// 执行 Workflow
await using StreamingRun run = await InProcessExecution.StreamAsync(
workflow,
NumberSignal.Init
);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is RequestInfoEvent requestEvt)
{
// Workflow 请求人类输入
Console.Write("Enter your guess: ");
int guess = int.Parse(Console.ReadLine()!);
// 发送响应
await run.SendResponseAsync(
requestEvt.Request.CreateResponse(guess)
);
}
else if (evt is WorkflowOutputEvent outputEvt)
{
Console.WriteLine(outputEvt.Data);
break;
}
}
交互流程:
1. Workflow 启动 → JudgeExecutor 发送 Init 信号
2. 到达 Request Port → 触发 RequestInfoEvent
3. 外部程序接收事件 → 提示用户输入
4. 用户输入 50 → 发送 ExternalResponse
5. JudgeExecutor 判断 → 返回 Above 信号
6. 循环直到猜对...
5.3 高级场景:审批流程
在企业应用中,审批流程是典型的人机协作场景:
public class ApprovalWorkflow
{
public static Workflow Build()
{
var documentProcessor = new DocumentProcessorExecutor();
var aiReviewer = new AIReviewerAgent(client);
var humanApprovalPort = RequestPort.Create<ReviewResult, ApprovalDecision>("HumanApproval");
var publisher = new PublisherExecutor();
return new WorkflowBuilder(documentProcessor)
.AddEdge(documentProcessor, aiReviewer)
.AddEdge<ReviewResult>(
aiReviewer,
humanApprovalPort,
condition: result => result.Confidence < 0.9 // 置信度低时需要人工审核
)
.AddEdge<ReviewResult>(
aiReviewer,
publisher,
condition: result => result.Confidence >= 0.9 // 置信度高时直接发布
)
.AddEdge(humanApprovalPort, publisher)
.Build();
}
}
这个 Workflow 实现了"智能分流":
-
AI 有把握的内容(置信度 ≥ 0.9):自动发布
-
AI 不确定的内容(置信度 < 0.9):人工审核
这样既保证了质量,又提高了效率!
六、声明式 Workflow:低代码的未来
6.1 从命令式到声明式
前面我们看到的都是命令式的 Workflow 定义:
var builder = new WorkflowBuilder(start);
builder.AddEdge(start, middle);
builder.AddEdge(middle, end);
var workflow = builder.Build();
但对于非开发人员,这还是太复杂了。Agent Framework 提供了声明式 Workflow,用 JSON 配置就能定义复杂的流程!
6.2 声明式 Workflow 示例
{
"name": "CustomerServiceWorkflow",
"description": "Intelligent customer service automation",
"actions": [
{
"id": "start",
"type": "Question",
"properties": {
"prompt": "How can I help you today?"
}
},
{
"id": "classify",
"type": "InvokeAzureAgent",
"properties": {
"agentId": "classifier-agent",
"input": "{{start.response}}"
}
},
{
"id": "route",
"type": "ConditionGroup",
"conditions": [
{
"condition": "{{classify.category}} == 'technical'",
"next": "tech-support"
},
{
"condition": "{{classify.category}} == 'billing'",
"next": "billing-agent"
}
]
},
{
"id": "tech-support",
"type": "InvokeAzureAgent",
"properties": {
"agentId": "tech-support-agent"
}
},
{
"id": "billing-agent",
"type": "InvokeAzureAgent",
"properties": {
"agentId": "billing-agent"
}
}
]
}
6.3 声明式 Workflow 的优势
1. 可视化设计
配合可视化工具,业务人员可以像画流程图一样设计 Workflow:
[用户提问] → [AI分类] → [条件判断]
├─ 技术问题 → [技术支持Agent]
└─ 账单问题 → [账单Agent]
2. 动态加载
声明式 Workflow 可以在运行时加载和修改:
var workflowJson = await File.ReadAllTextAsync("workflow.json");
var workflow = await DeclarativeWorkflowBuilder.BuildFromJsonAsync(workflowJson);
// 无需重新编译,直接执行新的 Workflow
await InProcessExecution.RunAsync(workflow, input);
3. 版本管理
Workflow 定义存储为 JSON,可以轻松进行版本控制:
git diff workflow-v1.json workflow-v2.json
4. 跨平台共享
同一个 Workflow 定义可以在不同的平台上执行:
-
.NET 应用
-
Azure Functions
-
Power Automate(未来可能支持)
6.4 声明式 Workflow 的内置 Actions
Agent Framework 提供了丰富的内置 Actions:
状态管理类:
-
SetVariable:设置变量 -
SetMultipleVariables:批量设置变量 -
ClearAllVariables:清空所有变量
对话管理类:
-
CreateConversation:创建新对话 -
AddConversationMessage:添加消息 -
RetrieveConversationMessages:检索历史消息
控制流类:
-
ConditionGroup:条件分支 -
Foreach:循环遍历 -
GotoAction:跳转到指定步骤
AI 集成类:
-
InvokeAzureAgent:调用 Azure AI Agent -
Question:请求用户输入
数据处理类:
-
ParseValue:解析数据 -
EditTableV2:编辑表格数据
这些 Actions 可以像乐高积木一样组合,构建出复杂的业务逻辑!
七、性能优化与最佳实践
7.1 并发控制:不是越多越好
虽然并发能提升性能,但也要注意资源限制:
// ❌ 不好的做法:无限制并发
var workflow = new WorkflowBuilder(start);
for (int i = 0; i < 1000; i++)
{
var agent = new ChatClientAgent(client, $"Agent {i}");
builder.AddFanOutEdge(start, agent);
}
// ✅ 好的做法:分批处理
var batchSize = 10;
var batches = agents.Chunk(batchSize);
foreach (var batch in batches)
{
var batchWorkflow = AgentWorkflowBuilder.BuildConcurrent(batch);
// 执行批次...
}
7.2 状态管理:选择合适的作用域
// ❌ 不好的做法:所有状态都用 Workflow 级别
public class MyExecutor : StatefulExecutor<string>
{
public MyExecutor() : base("MyExecutor",
new StatefulExecutorOptions
{
StateScope = StateScope.Workflow // 会一直占用内存
})
{ }
}
// ✅ 好的做法:根据需要选择作用域
public class MyExecutor : StatefulExecutor<string>
{
public MyExecutor() : base("MyExecutor",
new StatefulExecutorOptions
{
StateScope = StateScope.Run // 执行完成后自动清理
})
{ }
}
7.3 Checkpoint 策略:平衡性能与可靠性
// 策略 1:每个 Super Step 都保存(默认)
// 优点:恢复粒度细
// 缺点:I/O 开销大
// 策略 2:只在关键节点保存
var checkpointManager = new SelectiveCheckpointManager(
shouldCheckpoint: (stepInfo) =>
stepInfo.ExecutorIds.Any(id => id.StartsWith("Critical"))
);
// 策略 3:定时保存
var checkpointManager = new TimedCheckpointManager(
interval: TimeSpan.FromMinutes(5)
);
7.4 错误处理:优雅降级
public class ResilientExecutor : Executor<string, string>
{
private readonly int _maxRetries = 3;
public override async ValueTask<string> HandleAsync(
string message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
for (int i = 0; i < _maxRetries; i++)
{
try
{
return await ProcessAsync(message, cancellationToken);
}
catch (Exception ex) when (i < _maxRetries - 1)
{
await context.AddEventAsync(
new WorkflowWarningEvent($"Retry {i + 1}/{_maxRetries}: {ex.Message}"),
cancellationToken
);
await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, i)), cancellationToken);
}
}
// 最后一次失败,返回降级结果
return "Service temporarily unavailable. Please try again later.";
}
}
八、实战案例:构建智能文档处理系统
让我们通过一个完整的案例,把前面学到的知识串起来。
8.1 需求分析
假设我们要构建一个智能文档处理系统,功能包括:
-
文档上传:用户上传 PDF/Word 文档
-
内容提取:提取文本、图片、表格
- 多维度分析:
-
情感分析(正面/负面/中性)
-
关键词提取
-
摘要生成
-
技术栈识别(如果是技术文档)
-
-
质量评估:AI 评估文档质量
-
人工审核:质量低于阈值时需要人工审核
-
结果输出:生成分析报告
8.2 架构设计
[文档上传]
↓
[内容提取Executor]
↓
[Fan-Out: 并行分析]
├─ [情感分析Agent]
├─ [关键词提取Agent]
├─ [摘要生成Agent]
└─ [技术栈识别Agent]
↓
[Fan-In: 结果汇总Executor]
↓
[质量评估Agent]
↓
[条件分支]
├─ 质量高 → [自动发布Executor]
└─ 质量低 → [人工审核Port] → [发布Executor]
8.3 代码实现
Step 1: 定义 Executors
// 内容提取 Executor
public class ContentExtractorExecutor : Executor<DocumentUpload, ExtractedContent>
{
public ContentExtractorExecutor() : base("ContentExtractor") { }
public override async ValueTask<ExtractedContent> HandleAsync(
DocumentUpload upload,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 使用 PDF 库提取内容
var content = await ExtractContentFromDocument(upload.FilePath);
await context.AddEventAsync(
new WorkflowOutputEvent($"Extracted {content.Text.Length} characters"),
cancellationToken
);
return content;
}
}
// 结果汇总 Executor
public class ResultAggregatorExecutor : Executor<AnalysisResult>
{
private readonly List<AnalysisResult> _results = new();
private const int ExpectedResultCount = 4; // 4 个分析 Agent
public ResultAggregatorExecutor() : base("ResultAggregator") { }
public override async ValueTask HandleAsync(
AnalysisResult result,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
_results.Add(result);
if (_results.Count == ExpectedResultCount)
{
var aggregated = new AggregatedAnalysis
{
Sentiment = _results.First(r => r.Type == "Sentiment").Value,
Keywords = _results.First(r => r.Type == "Keywords").Value,
Summary = _results.First(r => r.Type == "Summary").Value,
TechStack = _results.First(r => r.Type == "TechStack").Value
};
await context.SendMessageAsync(aggregated, cancellationToken: cancellationToken);
}
}
}
// 发布 Executor
public class PublisherExecutor : Executor<PublishRequest, PublishResult>
{
public PublisherExecutor() : base("Publisher") { }
public override async ValueTask<PublishResult> HandleAsync(
PublishRequest request,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 发布到数据库或文件系统
var result = await PublishToStorage(request);
await context.YieldOutputAsync(result, cancellationToken);
return result;
}
}
Step 2: 创建 AI Agents
public class DocumentAnalysisAgents
{
private readonly IChatClient _client;
public DocumentAnalysisAgents(IChatClient client)
{
_client = client;
}
public ChatClientAgent CreateSentimentAgent() => new(
_client,
"""
You are a sentiment analysis expert. Analyze the provided text and return:
- Overall sentiment (Positive/Negative/Neutral)
- Confidence score (0-1)
- Key phrases that influenced the sentiment
Return your analysis in JSON format.
""",
"sentiment_analyzer"
);
public ChatClientAgent CreateKeywordAgent() => new(
_client,
"""
You are a keyword extraction expert. Extract the most important keywords and phrases.
Return top 10 keywords ranked by importance in JSON format.
""",
"keyword_extractor"
);
public ChatClientAgent CreateSummaryAgent() => new(
_client,
"""
You are a summarization expert. Create a concise summary (max 200 words)
that captures the main points of the document.
""",
"summarizer"
);
public ChatClientAgent CreateTechStackAgent() => new(
_client,
"""
You are a technology stack detector. Identify programming languages, frameworks,
databases, and tools mentioned in the document. Return as a structured list.
""",
"tech_detector"
);
public ChatClientAgent CreateQualityAgent() => new(
_client,
"""
You are a document quality assessor. Evaluate the document based on:
- Clarity and coherence
- Technical accuracy
- Completeness
- Professional writing quality
Return a quality score (0-100) and detailed feedback in JSON format.
""",
"quality_assessor"
);
}
Step 3: 构建 Workflow
public class DocumentProcessingWorkflow
{
public static Workflow Build(IChatClient client)
{
var agents = new DocumentAnalysisAgents(client);
// 创建 Executors
var extractor = new ContentExtractorExecutor();
var sentimentAgent = agents.CreateSentimentAgent();
var keywordAgent = agents.CreateKeywordAgent();
var summaryAgent = agents.CreateSummaryAgent();
var techAgent = agents.CreateTechStackAgent();
var aggregator = new ResultAggregatorExecutor();
var qualityAgent = agents.CreateQualityAgent();
var humanReviewPort = RequestPort.Create<QualityReport, ReviewDecision>("HumanReview");
var publisher = new PublisherExecutor();
// 构建 Workflow
var builder = new WorkflowBuilder(extractor);
// Fan-Out: 并行分析
builder.AddFanOutEdge(
extractor,
targets: [sentimentAgent, keywordAgent, summaryAgent, techAgent]
);
// 所有分析结果汇总
builder.AddFanInEdge(
aggregator,
sources: [sentimentAgent, keywordAgent, summaryAgent, techAgent]
);
// 质量评估
builder.AddEdge(aggregator, qualityAgent);
// 条件分支:高质量直接发布,低质量需要人工审核
builder.AddEdge<QualityReport>(
qualityAgent,
publisher,
condition: report => report.Score >= 80
);
builder.AddEdge<QualityReport>(
qualityAgent,
humanReviewPort,
condition: report => report.Score < 80
);
builder.AddEdge(humanReviewPort, publisher);
return builder
.WithName("Intelligent Document Processing")
.WithDescription("Automated document analysis with human-in-the-loop quality control")
.WithOutputFrom(publisher)
.Build();
}
}
Step 4: 执行 Workflow
public class DocumentProcessingService
{
private readonly Workflow _workflow;
private readonly CheckpointManager _checkpointManager;
public DocumentProcessingService(IChatClient client)
{
_workflow = DocumentProcessingWorkflow.Build(client);
_checkpointManager = CheckpointManager.Default;
}
public async Task<PublishResult> ProcessDocumentAsync(
string filePath,
CancellationToken cancellationToken = default)
{
var upload = new DocumentUpload { FilePath = filePath };
await using var run = await InProcessExecution.StreamAsync(
_workflow,
upload,
_checkpointManager
);
PublishResult? result = null;
await foreach (WorkflowEvent evt in run.Run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorCompletedEvent completed:
Console.WriteLine($"✓ {completed.ExecutorId} completed");
break;
case SuperStepCompletedEvent superStep:
Console.WriteLine($"⚡ Super Step {superStep.CompletionInfo!.SuperStepId} completed");
if (superStep.CompletionInfo.Checkpoint is not null)
{
Console.WriteLine($"💾 Checkpoint saved");
}
break;
case RequestInfoEvent request:
// 需要人工审核
var decision = await RequestHumanReviewAsync(
request.Request.DataAs<QualityReport>()
);
await run.Run.SendResponseAsync(
request.Request.CreateResponse(decision)
);
break;
case WorkflowOutputEvent output:
result = output.As<PublishResult>();
Console.WriteLine($"✅ Document published: {result!.Url}");
break;
case WorkflowErrorEvent error:
Console.WriteLine($"❌ Error: {error.Message}");
throw new Exception(error.Message);
}
}
return result ?? throw new InvalidOperationException("Workflow did not produce output");
}
private async Task<ReviewDecision> RequestHumanReviewAsync(QualityReport report)
{
Console.WriteLine($"\n⚠️ Human review required!");
Console.WriteLine($"Quality Score: {report.Score}/100");
Console.WriteLine($"Feedback: {report.Feedback}");
Console.Write("Approve? (y/n): ");
var input = Console.ReadLine();
return new ReviewDecision
{
Approved = input?.ToLower() == "y",
ReviewerComments = input?.ToLower() == "y"
? "Approved by reviewer"
: "Rejected - needs improvement"
};
}
}
8.4 性能分析
让我们对比一下不同实现方式的性能:
方案 1:顺序执行
内容提取(2s) → 情感分析(3s) → 关键词提取(3s) → 摘要生成(4s) → 技术栈识别(3s) → 质量评估(2s)
总耗时:17 秒
方案 2:并行分析(我们的实现)
内容提取(2s) → [并行: 情感(3s), 关键词(3s), 摘要(4s), 技术栈(3s)] → 质量评估(2s)
总耗时:8 秒 (节省 53%)
方案 3:完全并行(包括质量评估)
内容提取(2s) → [并行: 所有分析 + 质量评估]
总耗时:6 秒 (节省 65%)
但可能导致质量评估不准确,因为它需要完整的分析结果
8.5 扩展性考虑
这个系统可以轻松扩展:
1. 添加新的分析维度
var plagiarismAgent = new ChatClientAgent(client, "Detect plagiarism");
builder.AddFanOutEdge(extractor, [/* existing agents */, plagiarismAgent]);
builder.AddFanInEdge(aggregator, [/* existing agents */, plagiarismAgent]);
2. 支持多语言
var languageDetector = new LanguageDetectorExecutor();
builder.AddEdge(extractor, languageDetector);
// 根据语言选择不同的分析 Agent
builder.AddEdge<DetectedLanguage>(
languageDetector,
englishAnalysisAgent,
condition: lang => lang.Code == "en"
);
builder.AddEdge<DetectedLanguage>(
languageDetector,
chineseAnalysisAgent,
condition: lang => lang.Code == "zh"
);
3. 集成外部服务
public class ExternalAPIExecutor : Executor<string, APIResult>
{
private readonly HttpClient _httpClient;
public override async ValueTask<APIResult> HandleAsync(
string data,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
var response = await _httpClient.PostAsJsonAsync(
"https://api.example.com/analyze",
data,
cancellationToken
);
return await response.Content.ReadFromJsonAsync<APIResult>(cancellationToken);
}
}
九、可观测性:让 Workflow 透明化
9.1 内置的事件系统
Agent Framework 提供了丰富的事件类型:
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case WorkflowStartedEvent started:
Console.WriteLine($"🚀 Workflow started: {started.WorkflowId}");
break;
case ExecutorInvokedEvent invoked:
Console.WriteLine($"▶️ Invoking {invoked.ExecutorId}");
break;
case ExecutorCompletedEvent completed:
Console.WriteLine($"✅ {completed.ExecutorId} completed in {completed.Duration}ms");
break;
case ExecutorFailedEvent failed:
Console.WriteLine($"❌ {failed.ExecutorId} failed: {failed.Exception.Message}");
break;
case SuperStepStartedEvent stepStarted:
Console.WriteLine($"⚡ Super Step {stepStarted.StartInfo.SuperStepId} started");
break;
case SuperStepCompletedEvent stepCompleted:
Console.WriteLine($"⚡ Super Step completed with {stepCompleted.CompletionInfo!.ExecutorCount} executors");
break;
case AgentRunUpdateEvent agentUpdate:
Console.Write(agentUpdate.Update.Text); // 实时流式输出
break;
case WorkflowOutputEvent output:
Console.WriteLine($"📤 Output: {output.Data}");
break;
}
}
9.2 集成 OpenTelemetry
Agent Framework 原生支持 OpenTelemetry:
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
var tracerProvider = Sdk.CreateTracerProviderBuilder()
.SetResourceBuilder(ResourceBuilder.CreateDefault()
.AddService("DocumentProcessingService"))
.AddSource("Microsoft.Agents.AI.Workflows")
.AddConsoleExporter()
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri("http://localhost:4317");
})
.Build();
// 执行 Workflow,所有操作都会自动记录到 OpenTelemetry
await using var run = await InProcessExecution.RunAsync(workflow, input);
9.3 自定义指标收集
public class MetricsCollectorExecutor : Executor<object>
{
private static readonly Counter<long> s_messageCounter =
Meter.CreateCounter<long>("workflow.messages.processed");
private static readonly Histogram<double> s_processingTime =
Meter.CreateHistogram<double>("workflow.processing.duration");
public override async ValueTask HandleAsync(
object message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
var stopwatch = Stopwatch.StartNew();
try
{
await ProcessMessageAsync(message, cancellationToken);
s_messageCounter.Add(1, new KeyValuePair<string, object?>("status", "success"));
}
catch (Exception ex)
{
s_messageCounter.Add(1, new KeyValuePair<string, object?>("status", "failure"));
throw;
}
finally
{
stopwatch.Stop();
s_processingTime.Record(stopwatch.ElapsedMilliseconds);
}
}
}
十、Workflow 的高级特性与技巧
10.1 子工作流:模块化的艺术
当 Workflow 变得复杂时,我们需要将其拆分成更小的、可复用的单元。这就是子工作流的用武之地。
10.1.1 为什么需要子工作流?
想象你在构建一个电商系统,订单处理流程可能包括:
-
库存检查
-
支付处理
-
物流安排
-
通知发送
每个环节本身就是一个复杂的流程。如果全部写在一个 Workflow 里,会变成"意大利面代码"。
10.1.2 子工作流的实现
// 定义一个支付处理子工作流
public class PaymentWorkflow
{
public static Workflow Build()
{
var validator = new PaymentValidatorExecutor();
var processor = new PaymentProcessorExecutor();
var notifier = new PaymentNotifierExecutor();
return new WorkflowBuilder(validator)
.AddEdge(validator, processor)
.AddEdge(processor, notifier)
.WithOutputFrom(notifier)
.Build();
}
}
// 在主工作流中使用子工作流
public class OrderWorkflow
{
public static Workflow Build()
{
var inventoryCheck = new InventoryCheckExecutor();
// 将子工作流包装成 Executor
var paymentWorkflow = new WorkflowHostExecutor(
PaymentWorkflow.Build(),
"PaymentSubWorkflow"
);
var shipping = new ShippingExecutor();
return new WorkflowBuilder(inventoryCheck)
.AddEdge(inventoryCheck, paymentWorkflow)
.AddEdge(paymentWorkflow, shipping)
.WithOutputFrom(shipping)
.Build();
}
}
子工作流的优势:
-
模块化:每个子工作流可以独立开发和测试
-
复用性:支付流程可以在多个地方使用
-
可维护性:修改支付逻辑不影响其他部分
-
可测试性:可以单独测试子工作流
10.1.3 子工作流的通信
子工作流不是黑盒,它可以与父工作流通信:
public class SubworkflowWithEventsExecutor : Executor<OrderData>
{
private readonly Workflow _subworkflow;
public override async ValueTask HandleAsync(
OrderData order,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
await using var subRun = await InProcessExecution.StreamAsync(
_subworkflow,
order
);
await foreach (WorkflowEvent evt in subRun.WatchStreamAsync())
{
// 将子工作流的事件转发到父工作流
if (evt is SubworkflowWarningEvent warning)
{
await context.AddEventAsync(
new WorkflowWarningEvent($"[SubWorkflow] {warning.Message}"),
cancellationToken
);
}
else if (evt is WorkflowOutputEvent output)
{
await context.SendMessageAsync(output.Data, cancellationToken: cancellationToken);
}
}
}
}
10.2 动态工作流:运行时构建
有时候,Workflow 的结构需要根据运行时的数据来决定。比如:
-
根据用户权限决定审批流程
-
根据订单金额决定是否需要额外验证
-
根据文档类型选择不同的处理流程
public class DynamicWorkflowBuilder
{
public static Workflow BuildForUser(User user, IChatClient client)
{
var start = new StartExecutor();
var builder = new WorkflowBuilder(start);
// 根据用户角色动态添加步骤
if (user.Role == "Admin")
{
var adminAgent = new ChatClientAgent(client, "Admin assistant");
builder.AddEdge(start, adminAgent);
}
else if (user.Role == "Manager")
{
var managerAgent = new ChatClientAgent(client, "Manager assistant");
var approvalPort = RequestPort.Create<ApprovalRequest, bool>("ManagerApproval");
builder.AddEdge(start, managerAgent);
builder.AddEdge(managerAgent, approvalPort);
}
else
{
var basicAgent = new ChatClientAgent(client, "Basic assistant");
builder.AddEdge(start, basicAgent);
}
return builder.Build();
}
}
10.3 循环与迭代
有些场景需要重复执行某些步骤,直到满足条件为止。
10.3.1 简单循环
public class LoopWorkflow
{
public static Workflow Build()
{
var processor = new DataProcessorExecutor();
var validator = new ValidatorExecutor();
var builder = new WorkflowBuilder(processor);
// 如果验证失败,回到处理器重新处理
builder.AddEdge<ValidationResult>(
validator,
processor,
condition: result => !result.IsValid
);
// 如果验证成功,输出结果
builder.AddEdge<ValidationResult>(
validator,
new OutputExecutor(),
condition: result => result.IsValid
);
return builder.Build();
}
}
10.3.2 带计数器的循环
public class RetryExecutor : StatefulExecutor<string, string>
{
private int _attemptCount = 0;
private const int MaxAttempts = 3;
public RetryExecutor() : base("RetryExecutor",
new StatefulExecutorOptions { StateScope = StateScope.Run })
{ }
public override async ValueTask<string> HandleAsync(
string message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
_attemptCount++;
try
{
return await ProcessWithRetry(message, cancellationToken);
}
catch (Exception ex) when (_attemptCount < MaxAttempts)
{
await context.AddEventAsync(
new WorkflowWarningEvent($"Attempt {_attemptCount} failed, retrying..."),
cancellationToken
);
// 发送消息给自己,触发重试
await context.SendMessageAsync(message, cancellationToken: cancellationToken);
return string.Empty; // 临时返回
}
catch (Exception ex)
{
throw new InvalidOperationException(
$"Failed after {MaxAttempts} attempts", ex
);
}
}
}
10.4 条件路由的高级用法
10.4.1 Switch-Case 模式
public class RouterExecutor : Executor<Message, RoutingDecision>
{
public override ValueTask<RoutingDecision> HandleAsync(
Message message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
var decision = message.Type switch
{
"urgent" => new RoutingDecision { Target = "UrgentHandler", Priority = 1 },
"normal" => new RoutingDecision { Target = "NormalHandler", Priority = 2 },
"low" => new RoutingDecision { Target = "LowPriorityHandler", Priority = 3 },
_ => new RoutingDecision { Target = "DefaultHandler", Priority = 4 }
};
return ValueTask.FromResult(decision);
}
}
// 在 WorkflowBuilder 中使用
var router = new RouterExecutor();
var urgentHandler = new UrgentHandlerExecutor();
var normalHandler = new NormalHandlerExecutor();
var lowHandler = new LowPriorityHandlerExecutor();
builder.AddEdge<RoutingDecision>(router, urgentHandler,
condition: d => d.Target == "UrgentHandler");
builder.AddEdge<RoutingDecision>(router, normalHandler,
condition: d => d.Target == "NormalHandler");
builder.AddEdge<RoutingDecision>(router, lowHandler,
condition: d => d.Target == "LowPriorityHandler");
10.4.2 多条件选择
有时候一个消息需要同时发送到多个目标:
// 使用 Fan-Out 的 partitioner 实现多选
builder.AddFanOutEdge<Message>(
source,
partitioner: (message, count) =>
{
var targets = new List<int>();
if (message.RequiresLogging)
targets.Add(0); // Logger
if (message.RequiresNotification)
targets.Add(1); // Notifier
if (message.RequiresArchiving)
targets.Add(2); // Archiver
return targets;
},
targets: [logger, notifier, archiver]
);
十一、生产环境部署考虑
11.1 容错与恢复
11.1.1 Checkpoint 持久化
在生产环境中,内存中的 Checkpoint 是不够的,我们需要持久化存储:
public class DatabaseCheckpointStore : ICheckpointStore
{
private readonly DbContext _dbContext;
public async Task SaveCheckpointAsync(
string checkpointId,
byte[] data,
CancellationToken cancellationToken)
{
var checkpoint = new CheckpointEntity
{
Id = checkpointId,
Data = data,
CreatedAt = DateTime.UtcNow
};
_dbContext.Checkpoints.Add(checkpoint);
await _dbContext.SaveChangesAsync(cancellationToken);
}
public async Task<byte[]?> LoadCheckpointAsync(
string checkpointId,
CancellationToken cancellationToken)
{
var checkpoint = await _dbContext.Checkpoints
.FirstOrDefaultAsync(c => c.Id == checkpointId, cancellationToken);
return checkpoint?.Data;
}
}
// 使用自定义的 Checkpoint Store
var checkpointManager = new CheckpointManager(
new DatabaseCheckpointStore(dbContext)
);
11.1.2 分布式 Checkpoint
对于分布式系统,可以使用 Redis 或 Azure Blob Storage:
public class RedisCheckpointStore : ICheckpointStore
{
private readonly IConnectionMultiplexer _redis;
public async Task SaveCheckpointAsync(
string checkpointId,
byte[] data,
CancellationToken cancellationToken)
{
var db = _redis.GetDatabase();
await db.StringSetAsync(
$"checkpoint:{checkpointId}",
data,
expiry: TimeSpan.FromDays(7) // 7 天后自动过期
);
}
public async Task<byte[]?> LoadCheckpointAsync(
string checkpointId,
CancellationToken cancellationToken)
{
var db = _redis.GetDatabase();
var data = await db.StringGetAsync($"checkpoint:{checkpointId}");
return data.HasValue ? (byte[])data : null;
}
}
11.2 性能监控
11.2.1 集成 Application Insights
using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.DataContracts;
public class MonitoredWorkflowExecutor : Executor<string>
{
private readonly TelemetryClient _telemetry;
public MonitoredWorkflowExecutor(TelemetryClient telemetry)
: base("MonitoredExecutor")
{
_telemetry = telemetry;
}
public override async ValueTask HandleAsync(
string message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
using var operation = _telemetry.StartOperation<RequestTelemetry>("ProcessMessage");
operation.Telemetry.Properties["MessageLength"] = message.Length.ToString();
try
{
await ProcessMessageAsync(message, cancellationToken);
operation.Telemetry.Success = true;
}
catch (Exception ex)
{
operation.Telemetry.Success = false;
_telemetry.TrackException(ex);
throw;
}
}
}
11.2.2 自定义健康检查
public class WorkflowHealthCheck : IHealthCheck
{
private readonly Workflow _workflow;
public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken cancellationToken = default)
{
try
{
// 执行一个简单的测试 Workflow
await using var run = await InProcessExecution.RunAsync(
_workflow,
"health-check",
cancellationToken: cancellationToken
);
var completed = false;
await foreach (var evt in run.NewEvents)
{
if (evt is WorkflowOutputEvent)
{
completed = true;
break;
}
}
return completed
? HealthCheckResult.Healthy("Workflow is operational")
: HealthCheckResult.Degraded("Workflow did not complete");
}
catch (Exception ex)
{
return HealthCheckResult.Unhealthy("Workflow failed", ex);
}
}
}
// 在 Startup.cs 中注册
services.AddHealthChecks()
.AddCheck<WorkflowHealthCheck>("workflow");
11.3 安全性考虑
11.3.1 输入验证
public class SecureExecutor : Executor<UserInput, ProcessedData>
{
public override async ValueTask<ProcessedData> HandleAsync(
UserInput input,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 验证输入
if (string.IsNullOrWhiteSpace(input.Data))
{
throw new ArgumentException("Input data cannot be empty");
}
// 检查输入长度
if (input.Data.Length > 10000)
{
throw new ArgumentException("Input data exceeds maximum length");
}
// 清理潜在的恶意内容
var sanitized = SanitizeInput(input.Data);
return await ProcessSecurely(sanitized, cancellationToken);
}
private string SanitizeInput(string input)
{
// 移除 HTML 标签、SQL 注入尝试等
return System.Net.WebUtility.HtmlEncode(input);
}
}
11.3.2 敏感数据处理
public class SecureDataExecutor : Executor<SensitiveData>
{
private readonly IDataProtector _protector;
public SecureDataExecutor(IDataProtectionProvider provider)
: base("SecureDataExecutor")
{
_protector = provider.CreateProtector("WorkflowDataProtection");
}
protected internal override async ValueTask OnCheckpointingAsync(
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 在保存 Checkpoint 前加密敏感数据
if (_sensitiveData != null)
{
_encryptedData = _protector.Protect(_sensitiveData);
_sensitiveData = null; // 清除明文
}
}
protected internal override async ValueTask OnCheckpointRestoredAsync(
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 从 Checkpoint 恢复后解密数据
if (_encryptedData != null)
{
_sensitiveData = _protector.Unprotect(_encryptedData);
}
}
}
11.4 扩展性设计
11.4.1 水平扩展
对于高负载场景,可以将 Workflow 部署到多个实例:
public class DistributedWorkflowService
{
private readonly IMessageQueue _queue;
private readonly Workflow _workflow;
public async Task EnqueueWorkflowAsync(WorkflowRequest request)
{
// 将请求放入队列
await _queue.PublishAsync("workflow-queue", request);
}
public async Task ProcessWorkflowsAsync(CancellationToken cancellationToken)
{
// 多个实例可以并行处理队列中的请求
await foreach (var request in _queue.SubscribeAsync("workflow-queue", cancellationToken))
{
try
{
await using var run = await InProcessExecution.RunAsync(
_workflow,
request.Input,
cancellationToken: cancellationToken
);
// 处理结果...
}
catch (Exception ex)
{
// 错误处理和重试逻辑
await HandleErrorAsync(request, ex);
}
}
}
}
11.4.2 资源限制
public class ThrottledWorkflowService
{
private readonly SemaphoreSlim _semaphore;
private readonly Workflow _workflow;
public ThrottledWorkflowService(Workflow workflow, int maxConcurrency = 10)
{
_workflow = workflow;
_semaphore = new SemaphoreSlim(maxConcurrency);
}
public async Task<TOutput> ExecuteAsync<TInput, TOutput>(
TInput input,
CancellationToken cancellationToken = default)
{
await _semaphore.WaitAsync(cancellationToken);
try
{
await using var run = await InProcessExecution.RunAsync(
_workflow,
input,
cancellationToken: cancellationToken
);
// 提取输出...
return default!;
}
finally
{
_semaphore.Release();
}
}
}
十二、未来展望与趋势
12.1 AI 驱动的工作流优化
未来的 Workflow 系统可能会使用 AI 来自动优化执行路径:
// 概念性代码 - 未来可能的功能
public class AIOptimizedWorkflow
{
private readonly Workflow _workflow;
private readonly AIOptimizer _optimizer;
public async Task<Workflow> OptimizeAsync()
{
// AI 分析历史执行数据
var executionHistory = await LoadExecutionHistoryAsync();
// 识别瓶颈和优化机会
var insights = await _optimizer.AnalyzeAsync(executionHistory);
// 自动重构 Workflow
if (insights.SuggestParallelization)
{
return RebuildWithParallelism(_workflow, insights.ParallelizableSteps);
}
return _workflow;
}
}
12.2 低代码/无代码平台集成
声明式 Workflow 为低代码平台铺平了道路:
[可视化设计器]
↓ 导出
[JSON Workflow 定义]
↓ 加载
[Agent Framework 执行]
未来可能会有:
-
拖拽式 Workflow 设计器
-
实时预览和调试
-
模板市场(预定义的 Workflow 模板)
-
协作编辑(多人同时设计 Workflow)
12.3 边缘计算与 Workflow
随着边缘计算的发展,Workflow 可能会在边缘设备上运行:
// IoT 设备上的轻量级 Workflow
public class EdgeWorkflow
{
public static Workflow BuildForEdge()
{
var sensor = new SensorDataExecutor();
var filter = new DataFilterExecutor();
var localAnalysis = new LocalAnalysisExecutor();
var cloudSync = new CloudSyncExecutor();
return new WorkflowBuilder(sensor)
.AddEdge(sensor, filter)
.AddEdge<SensorData>(
filter,
localAnalysis,
condition: data => data.IsNormal // 正常数据本地处理
)
.AddEdge<SensorData>(
filter,
cloudSync,
condition: data => !data.IsNormal // 异常数据上传云端
)
.Build();
}
}
12.4 多模态 Workflow
未来的 Workflow 将支持更多模态的数据处理:
public class MultiModalWorkflow
{
public static Workflow Build(IChatClient client)
{
var imageAnalyzer = new ImageAnalysisAgent(client);
var audioTranscriber = new AudioTranscriptionAgent(client);
var textAnalyzer = new TextAnalysisAgent(client);
var videoProcessor = new VideoProcessingAgent(client);
var synthesizer = new MultiModalSynthesisAgent(client);
return new WorkflowBuilder(new InputRouterExecutor())
.AddFanOutEdge(
router,
targets: [imageAnalyzer, audioTranscriber, textAnalyzer, videoProcessor]
)
.AddFanInEdge(synthesizer,
sources: [imageAnalyzer, audioTranscriber, textAnalyzer, videoProcessor])
.WithOutputFrom(synthesizer)
.Build();
}
}
更多推荐





所有评论(0)