"再聪明的 AI,也有需要人类拍板的时候。"

引子:一个真实的困境

想象这样一个场景:你的 AI 客服系统正在处理一个客户的退款申请。系统已经自动分析了订单信息、物流状态、用户历史记录,甚至通过情感分析判断出客户情绪有些激动。但就在最后一步——是否批准这笔超出常规金额的退款时,AI 犹豫了。

这不是 AI 的能力问题,而是责任边界的问题。某些决策,天然需要人类的判断、经验和担当。

这就是我们今天要聊的话题:在自动化的 AI 工作流中,如何优雅地让人类接管控制权?

一、为什么工作流需要"人工接入"?

1.1 自动化的边界

AI 工作流的魅力在于自动化,但自动化并非万能。在企业级应用中,我们经常遇到这些场景:

  • 高风险决策:涉及大额资金、法律责任的操作

  • 异常情况处理:超出预设规则的边缘案例

  • 质量把关:内容审核、创意评估等主观判断

  • 合规要求:某些行业明确要求"人在回路"(Human-in-the-Loop)

用一个比喻来说:AI 工作流就像高速公路上的自动驾驶,大部分时候可以放心交给系统。但遇到复杂路况、突发事件,还是需要人类司机接管方向盘。

1.2 两种人工接入模式

Microsoft Agent Framework 提供了两种截然不同的人工接入方式,分别适用于不同的场景:

模式一:RequestPort(请求端口)

  • 适用于:编程式工作流(Programmatic Workflows)

  • 特点:灵活、强类型、完全可控

  • 场景:复杂业务逻辑、需要精确控制流程的场合

模式二:Question Action(问答动作)

  • 适用于:声明式工作流(Declarative Workflows)

  • 特点:配置化、低代码、快速部署

  • 场景:标准化流程、业务人员可配置的场景

接下来,我们深入剖析这两种模式的实现原理和最佳实践。

二、RequestPort:编程式工作流的"暂停按钮"

2.1 核心概念解析

RequestPort 的设计哲学很有意思:它把人工接入抽象成了一个"特殊的执行器"。在工作流的视角里,人类和 AI Agent 没有本质区别——都是接收请求、返回响应的处理单元。

让我们看看核心数据结构:

// RequestPort 定义了一个"请求-响应"契约
public record RequestPort<TRequest, TResponse>(
    string Id,           // 端口唯一标识
    Type Request,        // 请求类型
    Type Response,       // 响应类型
    bool AllowWrapped    // 是否允许类型包装
) : RequestPort(Id, Request, Response);

这个设计的精妙之处在于:通过泛型约束,在编译期就确保了类型安全。你不可能把一个 int 类型的响应发送给期望 string 的请求端口。

2.2 工作流程深度剖析

RequestPort 的执行流程涉及两个"超级步骤"(SuperStep):

第一步:发出请求

// 工作流执行到 RequestPort 时
RequestPort numberRequest = RequestPort.Create<NumberSignal, int>("GuessNumber");

// 系统自动生成 RequestInfoEvent
public sealed class RequestInfoEvent(ExternalRequest request) : WorkflowEvent(request)
{
    public ExternalRequest Request => request;
}

这个事件会被发送到"外部世界"——也就是你的应用程序。此时工作流进入等待状态,就像一个暂停的视频。

第二步:接收响应

// 外部系统处理请求
private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
    if (request.DataIs<NumberSignal>())
    {
        int userGuess = ReadIntegerFromConsole("请输入你的猜测: ");
        return request.CreateResponse(userGuess);
    }
    throw new NotSupportedException($"不支持的请求类型");
}

// 将响应发送回工作流
await handle.SendResponseAsync(response);

工作流收到响应后,从暂停点恢复执行,就像视频继续播放。

2.3 实战案例:数字猜谜游戏

让我们通过一个完整的例子来理解 RequestPort 的威力:

internal static class WorkflowFactory
{
    internal static Workflow BuildWorkflow()
    {
        // 创建请求端口:请求类型是 NumberSignal,响应类型是 int
        RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
        
        // 创建判断执行器
        JudgeExecutor judgeExecutor = new(targetNumber: 42);

        // 构建循环工作流
        return new WorkflowBuilder(numberRequestPort)
            .AddEdge(numberRequestPort, judgeExecutor)  // 用户输入 -> 判断
            .AddEdge(judgeExecutor, numberRequestPort)  // 判断结果 -> 再次请求输入
            .WithOutputFrom(judgeExecutor)
            .Build();
    }
}

这个工作流的巧妙之处在于:它形成了一个人机交互的闭环

// 判断执行器的实现
internal sealed class JudgeExecutor(int targetNumber) : Executor<int>("Judge")
{
    private int _tries;

    public override async ValueTask HandleAsync(
        int message, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        this._tries++;
        
        if (message == targetNumber)
        {
            // 猜对了,输出结果并结束
            await context.YieldOutputAsync(
                $"恭喜!{targetNumber} 在 {_tries} 次尝试后被找到!", 
                cancellationToken);
        }
        else if (message < targetNumber)
        {
            // 太小了,发送信号继续循环
            await context.SendMessageAsync(
                NumberSignal.Below, 
                cancellationToken: cancellationToken);
        }
        else
        {
            // 太大了,发送信号继续循环
            await context.SendMessageAsync(
                NumberSignal.Above, 
                cancellationToken: cancellationToken);
        }
    }
}

2.4 设计亮点分析

亮点一:类型安全的异步通信

传统的人工接入往往依赖字符串、JSON 等弱类型方式。RequestPort 通过泛型实现了强类型约束:

// ExternalRequest 提供了类型安全的访问方法
public TValue? DataAs<TValue>() => this.Data.As<TValue>();

public bool DataIs<TValue>() => this.Data.Is<TValue>();

// 使用时编译器会检查类型匹配
if (request.DataIs<NumberSignal>(out var signal))
{
    // 这里 signal 已经是强类型的 NumberSignal
    switch (signal)
    {
        case NumberSignal.Init:
            // 处理初始化信号
            break;
    }
}

亮点二:请求-响应的强关联

每个请求都有唯一的 RequestId,响应必须匹配对应的请求:

public record ExternalRequest(
    RequestPortInfo PortInfo,
    string RequestId,      // 唯一标识
    PortableValue Data
)
{
    // 创建响应时自动关联 RequestId
    public ExternalResponse CreateResponse<T>(T data) 
        => new ExternalResponse(this.PortInfo, this.RequestId, new PortableValue(data));
}

这种设计避免了"响应错位"的问题——即使有多个并发请求,也能准确匹配。

亮点三:与 Checkpoint 的完美结合

RequestPort 天然支持工作流的检查点(Checkpoint)机制:

// 每个 SuperStep 结束时自动创建检查点
case SuperStepCompletedEvent superStepCompletedEvt:
    CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
    if (checkpoint is not null)
    {
        checkpoints.Add(checkpoint);
        Console.WriteLine($"检查点已创建于步骤 {checkpoints.Count}");
    }
    break;

// 可以从任意检查点恢复
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None);

这意味着:即使系统崩溃,也可以从人工接入的断点恢复,不会丢失用户输入

三、Question Action:声明式工作流的"人机对话"

3.1 从配置到执行

如果说 RequestPort 是给开发者用的"手术刀",那么 Question Action 就是给业务人员用的"瑞士军刀"。它通过声明式配置实现人工接入,无需编写代码。

Question Action 的核心是 QuestionExecutor,它实现了一个完整的问答流程:

internal sealed class QuestionExecutor(
    Question model,                    // 问题配置模型
    WorkflowAgentProvider agentProvider,  // Agent 提供者
    WorkflowFormulaState state         // 公式状态
) : DeclarativeActionExecutor<Question>(model, state)

3.2 三阶段执行模型

Question Action 的执行分为三个精心设计的阶段:

阶段一:Prepare(准备)

protected override async ValueTask<object?> ExecuteAsync(
    IWorkflowContext context, 
    CancellationToken cancellationToken = default)
{
    // 初始化提示计数
    await this._promptCount.WriteAsync(context, 0).ConfigureAwait(false);

    // 检查变量是否已有值
    InitializablePropertyPath variable = Throw.IfNull(this.Model.Variable);
    bool hasValue = context.ReadState(variable.Path) is BlankValue;
    
    // 根据 SkipQuestionMode 决定是否跳过提问
    SkipQuestionMode mode = this.Evaluator.GetValue(this.Model.SkipQuestionMode).Value;
    bool proceed = mode switch
    {
        SkipQuestionMode.SkipOnFirstExecutionIfVariableHasValue 
            => !await this._hasExecuted.ReadAsync(context),
        SkipQuestionMode.AlwaysSkipIfVariableHasValue 
            => hasValue,
        SkipQuestionMode.AlwaysAsk 
            => true,
        _ => true,
    };

    if (proceed)
    {
        await this.PromptAsync(context, cancellationToken);
    }
}

这个设计很聪明:它允许工作流"记住"之前的回答,避免重复提问

阶段二:Input(输入)

public async ValueTask PrepareResponseAsync(
    IWorkflowContext context, 
    ActionExecutorResult message, 
    CancellationToken cancellationToken)
{
    int count = await this._promptCount.ReadAsync(context);
    
    // 格式化提示信息
    AnswerRequest inputRequest = new(this.FormatPrompt(this.Model.Prompt));
    
    // 发送到外部系统
    await context.SendMessageAsync(inputRequest, cancellationToken);
    
    // 更新提示计数
    await this._promptCount.WriteAsync(context, count + 1);
}

阶段三:Capture(捕获)

public async ValueTask CaptureResponseAsync(
    IWorkflowContext context, 
    AnswerResponse message, 
    CancellationToken cancellationToken)
{
    FormulaValue? extractedValue = null;
    
    if (message.Value is null)
    {
        // 无法识别的响应
        string unrecognizedResponse = this.FormatPrompt(this.Model.UnrecognizedPrompt);
        await context.AddEventAsync(
            new MessageActivityEvent(unrecognizedResponse.Trim()), 
            cancellationToken);
    }
    else
    {
        // 实体提取和验证
        EntityExtractionResult entityResult = EntityExtractor.Parse(
            this.Model.Entity, 
            message.Value.Text);
            
        if (entityResult.IsValid)
        {
            extractedValue = entityResult.Value;
        }
        else
        {
            // 无效响应
            string invalidResponse = this.Model.InvalidPrompt is not null 
                ? this.FormatPrompt(this.Model.InvalidPrompt) 
                : "无效响应";
            await context.AddEventAsync(
                new MessageActivityEvent(invalidResponse.Trim()), 
                cancellationToken);
        }
    }

    if (extractedValue is null)
    {
        // 重新提示
        await this.PromptAsync(context, cancellationToken);
    }
    else
    {
        // 保存到变量
        await this.AssignAsync(this.Model.Variable?.Path, extractedValue, context);
        await this._hasExecuted.WriteAsync(context, true);
        await context.SendResultMessageAsync(this.Id, cancellationToken);
    }
}

3.3 智能重试机制

Question Action 内置了优雅的重试逻辑:

private async ValueTask PromptAsync(
    IWorkflowContext context, 
    CancellationToken cancellationToken)
{
    long repeatCount = this.Evaluator.GetValue(this.Model.RepeatCount).Value;
    int actualCount = await this._promptCount.ReadAsync(context);
    
    if (actualCount >= repeatCount)
    {
        // 达到最大重试次数,使用默认值
        ValueExpression defaultValueExpression = Throw.IfNull(this.Model.DefaultValue);
        DataValue defaultValue = this.Evaluator.GetValue(defaultValueExpression).Value;
        
        await this.AssignAsync(
            this.Model.Variable?.Path, 
            defaultValue.ToFormula(), 
            context);
            
        string defaultValueResponse = this.FormatPrompt(this.Model.DefaultValueResponse);
        await context.AddEventAsync(
            new MessageActivityEvent(defaultValueResponse.Trim()), 
            cancellationToken);
            
        await context.SendResultMessageAsync(this.Id, cancellationToken);
    }
    else
    {
        // 继续等待用户输入
        await context.SendResultMessageAsync(this.Id, result: true, cancellationToken);
    }
}

这个设计体现了容错性思维:如果用户多次输入无效,系统不会死锁,而是使用预设的默认值继续执行。

3.4 实体提取:从文本到结构化数据

Question Action 的一个强大功能是实体提取(Entity Extraction):

// 从用户输入中提取结构化信息
EntityExtractionResult entityResult = EntityExtractor.Parse(
    this.Model.Entity,    // 实体定义(如:日期、数字、邮箱)
    message.Value.Text    // 用户输入的文本
);

if (entityResult.IsValid)
{
    // 提取成功,转换为 PowerFx 公式值
    extractedValue = entityResult.Value;
}

这意味着你可以配置工作流识别特定类型的输入:

  • 日期时间:"明天下午3点" → DateTime(2024-11-12 15:00:00)

  • 数字:"一百二十三" → 123

  • 邮箱:"user@example.com" → 验证格式并提取

四、两种模式的对比与选择

4.1 技术特性对比

特性 RequestPort Question Action
适用场景 编程式工作流 声明式工作流
配置方式 代码定义 JSON/配置文件
类型安全 编译期强类型 运行时类型检查
灵活性 极高,完全可控 中等,受配置限制
学习曲线 陡峭,需要编程知识 平缓,业务人员可用
实体提取 需自行实现 内置支持
重试机制 需自行实现 内置支持
状态管理 手动管理 自动管理

4.2 应用场景建议

选择 RequestPort 的场景:

  1. 复杂业务逻辑

    • 需要根据上下文动态决定请求内容

    • 涉及多轮复杂交互

    • 需要精确控制流程分支

  2. 高性能要求

    • 需要最小化运行时开销

    • 对类型安全有严格要求

    • 需要编译期错误检查

  3. 开发者主导

    • 团队具备较强编程能力

    • 需要深度定制化

    • 与现有代码库深度集成

选择 Question Action 的场景:

  1. 标准化流程

    • 问答流程相对固定

    • 需要快速配置和部署

    • 业务规则频繁变化

  2. 低代码需求

    • 业务人员需要自主配置

    • 减少开发依赖

    • 快速原型验证

  3. 内置功能优先

    • 需要实体提取能力

    • 需要自动重试机制

    • 需要多语言支持

五、实战技巧与最佳实践

5.1 RequestPort 的高级用法

技巧一:使用枚举信号控制流程

// 定义清晰的信号枚举
internal enum ApprovalSignal
{
    PendingReview,      // 待审核
    RequestMoreInfo,    // 需要更多信息
    Approved,           // 已批准
    Rejected            // 已拒绝
}

// 使用信号驱动工作流
RequestPort approvalPort = RequestPort.Create<ApprovalSignal, ApprovalDecision>("Approval");

// 在执行器中根据信号做出不同响应
public override async ValueTask HandleAsync(
    ApprovalDecision decision, 
    IWorkflowContext context, 
    CancellationToken cancellationToken)
{
    switch (decision.Signal)
    {
        case ApprovalSignal.Approved:
            await context.SendMessageAsync(ProcessApproval(decision), cancellationToken);
            break;
        case ApprovalSignal.RequestMoreInfo:
            await context.SendMessageAsync(ApprovalSignal.PendingReview, cancellationToken);
            break;
        case ApprovalSignal.Rejected:
            await context.YieldOutputAsync($"申请被拒绝:{decision.Reason}", cancellationToken);
            break;
    }
}

技巧二:携带上下文信息

// 定义富信息的请求类型
internal sealed class ApprovalRequest
{
    public string RequestId { get; init; }
    public decimal Amount { get; init; }
    public string Reason { get; init; }
    public Dictionary<string, object> Context { get; init; }
}

// 在外部处理时可以访问完整上下文
private static ExternalResponse HandleApprovalRequest(ExternalRequest request)
{
    if (request.DataIs<ApprovalRequest>(out var approvalReq))
    {
        Console.WriteLine($"审批请求 {approvalReq.RequestId}");
        Console.WriteLine($"金额: {approvalReq.Amount:C}");
        Console.WriteLine($"原因: {approvalReq.Reason}");
        
        // 展示上下文信息
        foreach (var kvp in approvalReq.Context)
        {
            Console.WriteLine($"  {kvp.Key}: {kvp.Value}");
        }
        
        // 人工决策
        var decision = GetHumanDecision();
        return request.CreateResponse(decision);
    }
    
    throw new NotSupportedException();
}

技巧三:超时处理

// 在外部处理中实现超时逻辑
private static async Task<ExternalResponse> HandleWithTimeout(
    ExternalRequest request,
    TimeSpan timeout)
{
    using var cts = new CancellationTokenSource(timeout);
    
    try
    {
        // 等待人工输入,但设置超时
        var response = await GetHumanInputAsync(request, cts.Token);
        return request.CreateResponse(response);
    }
    catch (OperationCanceledException)
    {
        // 超时后使用默认决策
        Console.WriteLine("人工审批超时,使用默认策略");
        return request.CreateResponse(GetDefaultDecision());
    }
}

5.2 Question Action 的配置技巧

技巧一:多语言提示

{
  "type": "Question",
  "id": "AskUserName",
  "variable": "userName",
  "prompt": {
    "text": "=If(User.Language = \"zh-CN\", \"请输入您的姓名\", \"Please enter your name\")"
  },
  "entity": "Text",
  "invalidPrompt": {
    "text": "=If(User.Language = \"zh-CN\", \"输入无效,请重试\", \"Invalid input, please try again\")"
  }
}

技巧二:条件跳过

{
  "type": "Question",
  "id": "ConfirmEmail",
  "variable": "emailConfirmed",
  "skipQuestionMode": "AlwaysSkipIfVariableHasValue",
  "prompt": {
    "text": "请确认您的邮箱地址"
  }
}

技巧三:智能默认值

{
  "type": "Question",
  "id": "AskDeliveryDate",
  "variable": "deliveryDate",
  "prompt": {
    "text": "请选择配送日期"
  },
  "entity": "Date",
  "repeatCount": 3,
  "defaultValue": "=DateAdd(Today(), 3, Days)",
  "defaultValueResponse": {
    "text": "未收到有效输入,已自动设置为3天后配送"
  }
}

5.3 通用最佳实践

实践一:清晰的状态管理

// 使用 DurableProperty 管理持久化状态
private readonly DurableProperty<int> _promptCount = new(nameof(_promptCount));
private readonly DurableProperty<bool> _hasExecuted = new(nameof(_hasExecuted));

// 在 Checkpoint 时自动保存
protected override ValueTask OnCheckpointingAsync(
    IWorkflowContext context, 
    CancellationToken cancellationToken = default)
{
    return context.QueueStateUpdateAsync(StateKey, this._currentState, cancellationToken);
}

// 恢复时自动加载
protected override async ValueTask OnCheckpointRestoredAsync(
    IWorkflowContext context, 
    CancellationToken cancellationToken = default)
{
    this._currentState = await context.ReadStateAsync<State>(StateKey, cancellationToken);
}

实践二:优雅的错误处理

await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    try
    {
        switch (evt)
        {
            case RequestInfoEvent requestEvt:
                var response = await HandleExternalRequest(requestEvt.Request);
                await handle.SendResponseAsync(response);
                break;
                
            case WorkflowErrorEvent errorEvt:
                // 记录错误但不中断流程
                _logger.LogError(errorEvt.Exception, "工作流执行错误");
                await NotifyAdministrator(errorEvt);
                break;
                
            case WorkflowOutputEvent outputEvt:
                await ProcessOutput(outputEvt.Data);
                break;
        }
    }
    catch (Exception ex)
    {
        _logger.LogCritical(ex, "处理工作流事件时发生严重错误");
        // 决定是否继续或中止
        if (IsCriticalError(ex))
        {
            throw;
        }
    }
}

实践三:可观测性设计

// 使用 Activity 追踪人工接入过程
using var activity = ActivitySource.StartActivity("HumanIntervention");
activity?.SetTag("request.id", request.RequestId);
activity?.SetTag("request.type", request.PortInfo.RequestType);
activity?.SetTag("user.id", currentUserId);

var startTime = DateTime.UtcNow;
var response = await GetHumanInput(request);
var duration = DateTime.UtcNow - startTime;

activity?.SetTag("response.duration_ms", duration.TotalMilliseconds);
activity?.SetTag("response.type", response.GetType().Name);

// 记录指标
_metrics.RecordHumanInterventionDuration(duration);
_metrics.IncrementHumanInterventionCount(request.PortInfo.PortId);

六、架构设计的深层思考

6.1 为什么是"端口"而不是"回调"?

很多人第一次看到 RequestPort 会疑惑:为什么不直接用回调函数?

// 假设的回调方式(不推荐)
workflow.OnNeedHumanInput += async (request) => {
    var response = await GetHumanInput(request);
    return response;
};

RequestPort 的设计更优雅,原因有三:

1. 解耦性 端口是工作流定义的一部分,而不是运行时的副作用。这意味着:

  • 工作流的结构在构建时就确定了

  • 可以静态分析工作流的人工接入点

  • 便于可视化和文档生成

2. 可序列化 RequestPort 的状态可以完整序列化到 Checkpoint:

public sealed record RequestPort<TRequest, TResponse>(
    string Id,
    Type Request,
    Type Response,
    bool AllowWrapped
) : RequestPort(Id, Request, Response);

而回调函数无法序列化,这会破坏 Checkpoint 机制。

3. 类型安全 端口在编译期就确定了请求和响应的类型契约,而回调的类型检查往往推迟到运行时。

6.2 SuperStep:工作流的"心跳"

理解 SuperStep 是掌握人工接入的关键。

什么是 SuperStep?

SuperStep 是工作流执行的原子单位。在一个 SuperStep 内:

  1. 所有就绪的执行器并发执行

  2. 所有消息在内存中传递

  3. 状态更新被缓存

SuperStep 结束时:

  1. 状态更新被提交

  2. 创建 Checkpoint(如果启用)

  3. 事件被发送到外部

为什么人工接入需要两个 SuperStep?

SuperStep 1: 工作流 → RequestInfoEvent → 外部系统
           (工作流暂停,等待响应)

SuperStep 2: 外部系统 → ExternalResponse → 工作流
           (工作流恢复执行)

这种设计保证了:

  • 一致性:每个 SuperStep 都是原子的

  • 可恢复性:可以从任意 SuperStep 恢复

  • 可观测性:每个 SuperStep 都有明确的边界

6.3 PortableValue:跨边界的类型桥梁

PortableValue 是一个精妙的设计,它解决了类型在不同上下文间传递的问题:

public record PortableValue(object? Value)
{
    public T? As<T>() => Value is T typed ? typed : default;
    
    public bool Is<T>() => Value is T;
    
    public object? AsType(Type targetType)
    {
        if (Value is null) return null;
        if (targetType.IsInstanceOfType(Value)) return Value;
        
        // 尝试类型转换
        return Convert.ChangeType(Value, targetType);
    }
}

它的作用是:

  1. 类型擦除:在序列化时隐藏具体类型

  2. 类型恢复:在反序列化时恢复类型

  3. 类型转换:在必要时进行安全的类型转换

这使得工作流可以在不同的进程、甚至不同的机器间传递数据。

七、真实场景应用案例

7.1 案例一:智能客服的人工升级

场景描述: 客服 AI 处理用户咨询,当遇到复杂问题或用户情绪激动时,自动转接人工客服。

实现方案

// 定义升级信号
internal enum EscalationSignal
{
    ComplexQuery,      // 复杂查询
    EmotionalUser,     // 情绪化用户
    HighValueCustomer, // 高价值客户
    PolicyException    // 政策例外
}

// 创建人工接入端口
RequestPort humanAgentPort = RequestPort.Create<EscalationContext, AgentResponse>("HumanAgent");

// 构建工作流
return new WorkflowBuilder(aiChatBot)
    .AddEdge(aiChatBot, complexityAnalyzer)
    .AddEdge(complexityAnalyzer, humanAgentPort, 
        condition: result => result.RequiresHuman)
    .AddEdge(humanAgentPort, responseFormatter)
    .WithOutputFrom(responseFormatter)
    .Build();

关键实现

internal sealed class ComplexityAnalyzer : Executor<ChatMessage, AnalysisResult>("Analyzer")
{
    public override async ValueTask<AnalysisResult> HandleAsync(
        ChatMessage message, 
        IWorkflowContext context, 
        CancellationToken cancellationToken)
    {
        // 分析消息复杂度
        var sentiment = await _sentimentAnalyzer.AnalyzeAsync(message.Content);
        var complexity = await _complexityScorer.ScoreAsync(message.Content);
        var customerValue = await _crmService.GetCustomerValueAsync(message.UserId);
        
        bool requiresHuman = 
            sentiment.IsNegative && sentiment.Intensity > 0.7 ||
            complexity.Score > 0.8 ||
            customerValue == CustomerTier.Premium;
            
        return new AnalysisResult
        {
            RequiresHuman = requiresHuman,
            Reason = DetermineReason(sentiment, complexity, customerValue),
            Context = new EscalationContext
            {
                OriginalMessage = message,
                Sentiment = sentiment,
                Complexity = complexity,
                CustomerTier = customerValue
            }
        };
    }
}

人工客服界面

private static async Task<ExternalResponse> HandleHumanAgent(ExternalRequest request)
{
    if (request.DataIs<EscalationContext>(out var context))
    {
        // 展示给人工客服的界面
        Console.WriteLine("=== 人工接入请求 ===");
        Console.WriteLine($"原因: {context.Reason}");
        Console.WriteLine($"客户等级: {context.CustomerTier}");
        Console.WriteLine($"情绪分析: {context.Sentiment}");
        Console.WriteLine($"原始消息: {context.OriginalMessage.Content}");
        Console.WriteLine("==================");
        
        // 人工客服处理
        var agentResponse = await GetAgentResponseFromUI();
        
        return request.CreateResponse(agentResponse);
    }
    
    throw new NotSupportedException();
}

7.2 案例二:内容审核工作流

场景描述: 用户生成内容(UGC)需要经过 AI 初审和人工复审的两级审核。

实现方案

// 使用 Question Action 配置审核流程
{
  "workflow": {
    "name": "ContentModerationWorkflow",
    "actions": [
      {
        "type": "InvokeAzureAgent",
        "id": "AIModeration",
        "agentId": "content-moderator",
        "input": "=Content.Text"
      },
      {
        "type": "ConditionGroup",
        "id": "CheckAIResult",
        "conditions": [
          {
            "condition": "=AIModeration.Result.Confidence < 0.8",
            "actions": [
              {
                "type": "Question",
                "id": "HumanReview",
                "variable": "humanDecision",
                "prompt": {
                  "text": "=Concatenate(\"AI 审核置信度较低 (\", Text(AIModeration.Result.Confidence), \"),请人工复审:\", Content.Text)"
                },
                "entity": "Choice",
                "choices": ["通过", "拒绝", "需要更多信息"],
                "repeatCount": 1,
                "defaultValue": "拒绝",
                "defaultValueResponse": {
                  "text": "审核超时,内容已被自动拒绝"
                }
              }
            ]
          }
        ]
      },
      {
        "type": "SetVariable",
        "id": "FinalDecision",
        "variable": "finalResult",
        "value": "=If(IsBlank(humanDecision), AIModeration.Result.Decision, humanDecision)"
      }
    ]
  }
}

7.3 案例三:财务审批流程

场景描述: 费用报销需要根据金额大小,自动路由到不同级别的审批人。

实现方案

internal static class ExpenseApprovalWorkflow
{
    public static Workflow Build()
    {
        // 创建多级审批端口
        var managerApproval = RequestPort.Create<ApprovalRequest, ApprovalDecision>("ManagerApproval");
        var directorApproval = RequestPort.Create<ApprovalRequest, ApprovalDecision>("DirectorApproval");
        var cfoApproval = RequestPort.Create<ApprovalRequest, ApprovalDecision>("CFOApproval");
        
        var router = new ApprovalRouter();
        var processor = new ExpenseProcessor();
        
        return new WorkflowBuilder(router)
            // 根据金额路由到不同审批人
            .AddEdge(router, managerApproval, 
                condition: req => req.Amount < 1000)
            .AddEdge(router, directorApproval, 
                condition: req => req.Amount >= 1000 && req.Amount < 10000)
            .AddEdge(router, cfoApproval, 
                condition: req => req.Amount >= 10000)
            
            // 所有审批结果汇总到处理器
            .AddFanInEdge([managerApproval, directorApproval, cfoApproval], processor)
            .WithOutputFrom(processor)
            .Build();
    }
}

internal sealed class ApprovalRouter : Executor<ExpenseRequest, ApprovalRequest>("Router")
{
    public override async ValueTask<ApprovalRequest> HandleAsync(
        ExpenseRequest request, 
        IWorkflowContext context, 
        CancellationToken cancellationToken)
    {
        // 丰富审批请求信息
        var approvalRequest = new ApprovalRequest
        {
            RequestId = request.Id,
            Amount = request.Amount,
            Category = request.Category,
            Submitter = request.SubmitterId,
            Reason = request.Reason,
            Attachments = request.Attachments,
            Context = new Dictionary<string, object>
            {
                ["SubmissionDate"] = DateTime.UtcNow,
                ["Department"] = await GetDepartment(request.SubmitterId),
                ["BudgetRemaining"] = await GetBudgetRemaining(request.Category)
            }
        };
        
        return approvalRequest;
    }
}

多级审批处理

private static async Task RunApprovalWorkflow(Workflow workflow)
{
    await using var handle = await InProcessExecution.StreamAsync(
        workflow, 
        new ExpenseRequest { Amount = 5000, Category = "Travel" });
        
    await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
    {
        switch (evt)
        {
            case RequestInfoEvent requestEvt:
                // 根据端口 ID 路由到不同审批人
                var response = requestEvt.Request.PortInfo.PortId switch
                {
                    "ManagerApproval" => await GetManagerApproval(requestEvt.Request),
                    "DirectorApproval" => await GetDirectorApproval(requestEvt.Request),
                    "CFOApproval" => await GetCFOApproval(requestEvt.Request),
                    _ => throw new NotSupportedException()
                };
                
                await handle.SendResponseAsync(response);
                break;
                
            case WorkflowOutputEvent outputEvt:
                var result = outputEvt.DataAs<ExpenseResult>();
                Console.WriteLine($"审批结果: {result.Status}");
                if (result.Status == ApprovalStatus.Approved)
                {
                    await ProcessPayment(result);
                }
                break;
        }
    }
}

private static async Task<ExternalResponse> GetDirectorApproval(ExternalRequest request)
{
    var approvalReq = request.DataAs<ApprovalRequest>();
    
    // 发送通知给主管
    await _notificationService.NotifyAsync(
        approvalReq.DirectorId, 
        $"待审批: {approvalReq.Amount:C} - {approvalReq.Reason}");
    
    // 等待主管决策(可能来自移动端、Web 端等)
    var decision = await _approvalService.WaitForDecisionAsync(
        approvalReq.RequestId, 
        timeout: TimeSpan.FromHours(24));
    
    return request.CreateResponse(decision);
}

八、性能优化与扩展性

8.1 并发处理

当多个用户同时触发人工接入时,如何保证系统不会崩溃?

策略一:请求队列

internal sealed class HumanRequestQueue
{
    private readonly Channel<PendingRequest> _queue;
    private readonly SemaphoreSlim _concurrencyLimit;
    
    public HumanRequestQueue(int maxConcurrency = 10)
    {
        _queue = Channel.CreateUnbounded<PendingRequest>();
        _concurrencyLimit = new SemaphoreSlim(maxConcurrency);
    }
    
    public async Task<ExternalResponse> EnqueueAsync(
        ExternalRequest request, 
        CancellationToken cancellationToken)
    {
        var tcs = new TaskCompletionSource<ExternalResponse>();
        var pending = new PendingRequest(request, tcs);
        
        await _queue.Writer.WriteAsync(pending, cancellationToken);
        
        return await tcs.Task;
    }
    
    public async Task ProcessQueueAsync(CancellationToken cancellationToken)
    {
        await foreach (var pending in _queue.Reader.ReadAllAsync(cancellationToken))
        {
            await _concurrencyLimit.WaitAsync(cancellationToken);
            
            _ = Task.Run(async () =>
            {
                try
                {
                    var response = await HandleRequest(pending.Request);
                    pending.CompletionSource.SetResult(response);
                }
                finally
                {
                    _concurrencyLimit.Release();
                }
            }, cancellationToken);
        }
    }
}

策略二:优先级调度

internal sealed class PriorityHumanRequestScheduler
{
    private readonly PriorityQueue<PendingRequest, int> _queue = new();
    
    public async Task<ExternalResponse> ScheduleAsync(
        ExternalRequest request,
        int priority,  // 数字越小优先级越高
        CancellationToken cancellationToken)
    {
        var tcs = new TaskCompletionSource<ExternalResponse>();
        var pending = new PendingRequest(request, tcs);
        
        lock (_queue)
        {
            _queue.Enqueue(pending, priority);
        }
        
        return await tcs.Task;
    }
    
    private int CalculatePriority(ExternalRequest request)
    {
        // 根据业务规则计算优先级
        if (request.DataIs<ApprovalRequest>(out var approval))
        {
            return approval.Amount switch
            {
                > 100000 => 1,  // 高优先级
                > 10000 => 2,   // 中优先级
                _ => 3          // 低优先级
            };
        }
        
        return 5; // 默认优先级
    }
}

8.2 分布式部署

在微服务架构中,工作流引擎和人工处理系统可能部署在不同的服务中。

方案一:消息队列集成

internal sealed class MessageQueueRequestHandler : IExternalRequestSink
{
    private readonly IMessageQueue _queue;
    private readonly Dictionary<string, TaskCompletionSource<ExternalResponse>> _pendingRequests = new();
    
    public async Task<ExternalResponse> HandleRequestAsync(
        ExternalRequest request, 
        CancellationToken cancellationToken)
    {
        var tcs = new TaskCompletionSource<ExternalResponse>();
        _pendingRequests[request.RequestId] = tcs;
        
        // 发送到消息队列
        await _queue.PublishAsync(new
        {
            Type = "HumanInterventionRequest",
            RequestId = request.RequestId,
            PortId = request.PortInfo.PortId,
            Data = request.Data
        }, cancellationToken);
        
        return await tcs.Task;
    }
    
    public async Task OnResponseReceivedAsync(string requestId, ExternalResponse response)
    {
        if (_pendingRequests.TryGetValue(requestId, out var tcs))
        {
            tcs.SetResult(response);
            _pendingRequests.Remove(requestId);
        }
    }
}

方案二:SignalR 实时通信

internal sealed class SignalRHumanInterface
{
    private readonly IHubContext<HumanInterventionHub> _hubContext;
    
    public async Task<ExternalResponse> RequestHumanInputAsync(
        ExternalRequest request,
        string userId,
        CancellationToken cancellationToken)
    {
        var tcs = new TaskCompletionSource<ExternalResponse>();
        
        // 注册响应处理器
        HumanInterventionHub.RegisterResponseHandler(request.RequestId, tcs);
        
        // 通过 SignalR 发送到用户的浏览器
        await _hubContext.Clients.User(userId).SendAsync(
            "RequestInput",
            new
            {
                RequestId = request.RequestId,
                PortId = request.PortInfo.PortId,
                Data = request.Data,
                Timestamp = DateTime.UtcNow
            },
            cancellationToken);
        
        // 等待用户响应
        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        timeoutCts.CancelAfter(TimeSpan.FromMinutes(5));
        
        try
        {
            return await tcs.Task.WaitAsync(timeoutCts.Token);
        }
        catch (OperationCanceledException)
        {
            // 超时处理
            return request.CreateResponse(GetDefaultResponse());
        }
    }
}

// SignalR Hub
public class HumanInterventionHub : Hub
{
    private static readonly ConcurrentDictionary<string, TaskCompletionSource<ExternalResponse>> 
        _responseHandlers = new();
    
    public static void RegisterResponseHandler(
        string requestId, 
        TaskCompletionSource<ExternalResponse> tcs)
    {
        _responseHandlers[requestId] = tcs;
    }
    
    public async Task SubmitResponse(string requestId, object responseData)
    {
        if (_responseHandlers.TryRemove(requestId, out var tcs))
        {
            // 构造响应对象
            var response = new ExternalResponse(/* ... */);
            tcs.SetResult(response);
        }
    }
}

8.3 状态持久化

长时间运行的人工接入流程需要可靠的状态持久化。

internal sealed class PersistentHumanRequestManager
{
    private readonly ICheckpointStore _checkpointStore;
    private readonly IDistributedCache _cache;
    
    public async Task<ExternalResponse> HandleWithPersistenceAsync(
        ExternalRequest request,
        Workflow workflow,
        CancellationToken cancellationToken)
    {
        // 保存请求状态
        await _cache.SetAsync(
            $"request:{request.RequestId}",
            JsonSerializer.SerializeToUtf8Bytes(new
            {
                Request = request,
                Timestamp = DateTime.UtcNow,
                Status = "Pending"
            }),
            new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromDays(7)
            },
            cancellationToken);
        
        // 创建工作流检查点
        var checkpointManager = CheckpointManager.Default;
        await using var checkpointedRun = await InProcessExecution.StreamAsync(
            workflow, 
            request, 
            checkpointManager);
        
        // 等待响应或超时
        var response = await WaitForResponseAsync(request.RequestId, cancellationToken);
        
        // 更新状态
        await _cache.SetAsync(
            $"request:{request.RequestId}",
            JsonSerializer.SerializeToUtf8Bytes(new
            {
                Request = request,
                Response = response,
                Timestamp = DateTime.UtcNow,
                Status = "Completed"
            }),
            cancellationToken: cancellationToken);
        
        return response;
    }
    
    public async Task<ExternalResponse> ResumeFromCheckpointAsync(
        string requestId,
        CancellationToken cancellationToken)
    {
        // 从缓存恢复请求信息
        var requestData = await _cache.GetAsync($"request:{requestId}", cancellationToken);
        var requestInfo = JsonSerializer.Deserialize<RequestInfo>(requestData);
        
        // 从检查点恢复工作流
        var checkpoint = await _checkpointStore.LoadAsync(requestInfo.CheckpointId, cancellationToken);
        // ... 恢复执行
    }
}

九、常见陷阱与解决方案

9.1 陷阱一:忘记处理超时

问题:人工接入请求发出后,如果用户长时间不响应,工作流会永久挂起。

解决方案

private static async Task<ExternalResponse> HandleWithTimeout(
    ExternalRequest request,
    TimeSpan timeout)
{
    using var cts = new CancellationTokenSource(timeout);
    
    var responseTask = GetHumanInputAsync(request, cts.Token);
    var timeoutTask = Task.Delay(timeout, cts.Token);
    
    var completedTask = await Task.WhenAny(responseTask, timeoutTask);
    
    if (completedTask == responseTask)
    {
        return await responseTask;
    }
    else
    {
        // 超时后的降级策略
        _logger.LogWarning($"请求 {request.RequestId} 超时,使用默认策略");
        return request.CreateResponse(GetDefaultDecision(request));
    }
}

9.2 陷阱二:状态不一致

问题:在分布式环境中,工作流状态和人工处理系统的状态可能不同步。

解决方案:使用分布式锁和事务

internal sealed class ConsistentHumanRequestHandler
{
    private readonly IDistributedLockProvider _lockProvider;
    private readonly ITransactionManager _transactionManager;
    
    public async Task<ExternalResponse> HandleConsistentlyAsync(
        ExternalRequest request,
        CancellationToken cancellationToken)
    {
        // 获取分布式锁
        await using var lockHandle = await _lockProvider.AcquireLockAsync(
            $"request:{request.RequestId}",
            TimeSpan.FromMinutes(5),
            cancellationToken);
        
        // 在事务中处理
        await using var transaction = await _transactionManager.BeginTransactionAsync(cancellationToken);
        
        try
        {
            // 1. 保存请求状态
            await SaveRequestStateAsync(request, transaction, cancellationToken);
            
            // 2. 发送通知
            await NotifyHumanAsync(request, cancellationToken);
            
            // 3. 等待响应
            var response = await WaitForResponseAsync(request.RequestId, cancellationToken);
            
            // 4. 更新状态
            await UpdateRequestStateAsync(request.RequestId, response, transaction, cancellationToken);
            
            // 5. 提交事务
            await transaction.CommitAsync(cancellationToken);
            
            return response;
        }
        catch
        {
            await transaction.RollbackAsync(cancellationToken);
            throw;
        }
    }
}

9.3 陷阱三:内存泄漏

问题:长时间运行的工作流中,未清理的请求处理器会导致内存泄漏。

解决方案:使用弱引用和定期清理

internal sealed class LeakFreeRequestManager
{
    private readonly ConcurrentDictionary<string, WeakReference<TaskCompletionSource<ExternalResponse>>> 
        _pendingRequests = new();
    private readonly Timer _cleanupTimer;
    
    public LeakFreeRequestManager()
    {
        // 每分钟清理一次过期的请求
        _cleanupTimer = new Timer(CleanupExpiredRequests, null, 
            TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
    }
    
    public async Task<ExternalResponse> HandleRequestAsync(
        ExternalRequest request,
        TimeSpan timeout,
        CancellationToken cancellationToken)
    {
        var tcs = new TaskCompletionSource<ExternalResponse>();
        _pendingRequests[request.RequestId] = new WeakReference<TaskCompletionSource<ExternalResponse>>(tcs);
        
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        cts.CancelAfter(timeout);
        
        try
        {
            return await tcs.Task.WaitAsync(cts.Token);
        }
        finally
        {
            // 清理已完成的请求
            _pendingRequests.TryRemove(request.RequestId, out _);
        }
    }
    
    private void CleanupExpiredRequests(object? state)
    {
        var expiredKeys = new List<string>();
        
        foreach (var kvp in _pendingRequests)
        {
            if (!kvp.Value.TryGetTarget(out var tcs) || tcs.Task.IsCompleted)
            {
                expiredKeys.Add(kvp.Key);
            }
        }
        
        foreach (var key in expiredKeys)
        {
            _pendingRequests.TryRemove(key, out _);
        }
        
        _logger.LogDebug($"清理了 {expiredKeys.Count} 个过期请求");
    }
}

9.4 陷阱四:类型不匹配

问题:响应类型与请求期望的类型不匹配,导致运行时错误。

解决方案:严格的类型验证

public static class SafeResponseCreator
{
    public static ExternalResponse CreateTypeSafeResponse<TExpected>(
        this ExternalRequest request,
        object responseData)
    {
        // 验证响应类型
        if (!request.PortInfo.ResponseType.IsMatchPolymorphic(responseData.GetType()))
        {
            throw new InvalidOperationException(
                $"响应类型 {responseData.GetType().Name} 与期望类型 " +
                $"{request.PortInfo.ResponseType.TypeName} 不匹配");
        }
        
        // 尝试转换
        if (responseData is not TExpected)
        {
            try
            {
                responseData = Convert.ChangeType(responseData, typeof(TExpected));
            }
            catch (Exception ex)
            {
                throw new InvalidCastException(
                    $"无法将 {responseData.GetType().Name} 转换为 {typeof(TExpected).Name}", 
                    ex);
            }
        }
        
        return request.CreateResponse(responseData);
    }
}

// 使用示例
var response = request.CreateTypeSafeResponse<ApprovalDecision>(userInput);

十、未来展望与演进方向

10.1 AI 辅助的人工接入

未来的人工接入不应该是"非黑即白"的切换,而是 AI 和人类的协作:

// AI 提供建议,人类做最终决策
internal sealed class AIAssistedHumanDecision
{
    public async Task<ExternalResponse> GetAssistedDecisionAsync(
        ExternalRequest request,
        CancellationToken cancellationToken)
    {
        if (request.DataIs<ApprovalRequest>(out var approvalReq))
        {
            // AI 分析并提供建议
            var aiAnalysis = await _aiAgent.AnalyzeAsync(approvalReq);
            
            // 展示给人类决策者
            var enrichedRequest = new
            {
                Original = approvalReq,
                AIRecommendation = aiAnalysis.Recommendation,
                Confidence = aiAnalysis.Confidence,
                RiskFactors = aiAnalysis.RiskFactors,
                SimilarCases = await FindSimilarCases(approvalReq),
                PredictedOutcome = aiAnalysis.PredictedOutcome
            };
            
            // 人类基于 AI 建议做决策
            var humanDecision = await GetHumanDecisionWithAIContext(enrichedRequest);
            
            // 记录决策用于 AI 学习
            await RecordDecisionForLearning(approvalReq, aiAnalysis, humanDecision);
            
            return request.CreateResponse(humanDecision);
        }
        
        throw new NotSupportedException();
    }
}

10.2 自适应的人工接入阈值

系统应该能够学习何时需要人工接入:

internal sealed class AdaptiveHumanInterventionThreshold
{
    private readonly IMLModel _thresholdModel;
    
    public async Task<bool> ShouldRequestHumanAsync(
        WorkflowContext context,
        AIDecision aiDecision)
    {
        // 收集特征
        var features = new
        {
            AIConfidence = aiDecision.Confidence,
            DecisionComplexity = CalculateComplexity(context),
            HistoricalAccuracy = await GetHistoricalAccuracy(context.Category),
            BusinessImpact = CalculateBusinessImpact(context),
            UserSatisfactionTrend = await GetSatisfactionTrend(context.Category)
        };
        
        // ML 模型预测是否需要人工
        var prediction = await _thresholdModel.PredictAsync(features);
        
        return prediction.ShouldRequestHuman;
    }
    
    public async Task LearnFromOutcomeAsync(
        WorkflowContext context,
        AIDecision aiDecision,
        bool requestedHuman,
        Outcome actualOutcome)
    {
        // 记录训练数据
        await _thresholdModel.RecordTrainingDataAsync(new
        {
            Context = context,
            AIDecision = aiDecision,
            RequestedHuman = requestedHuman,
            ActualOutcome = actualOutcome,
            WasCorrect = aiDecision.Prediction == actualOutcome
        });
    }
}

10.3 多模态人工接入

未来的人工接入不仅限于文本,还可能包括语音、图像、视频:

internal sealed class MultiModalHumanInterface
{
    public async Task<ExternalResponse> RequestMultiModalInputAsync(
        ExternalRequest request,
        CancellationToken cancellationToken)
    {
        var modalityPreference = await GetUserModalityPreference();
        
        return modalityPreference switch
        {
            InputModality.Voice => await RequestVoiceInputAsync(request, cancellationToken),
            InputModality.Touch => await RequestTouchInputAsync(request, cancellationToken),
            InputModality.Gesture => await RequestGestureInputAsync(request, cancellationToken),
            InputModality.BrainInterface => await RequestBCIInputAsync(request, cancellationToken),
            _ => await RequestTextInputAsync(request, cancellationToken)
        };
    }
    
    private async Task<ExternalResponse> RequestVoiceInputAsync(
        ExternalRequest request,
        CancellationToken cancellationToken)
    {
        // 通过语音接口获取输入
        var audioStream = await _voiceInterface.StartListeningAsync();
        var transcription = await _speechToText.TranscribeAsync(audioStream);
        var intent = await _nluEngine.ParseAsync(transcription);
        
        return request.CreateResponse(intent);
    }
}

10.4 区块链验证的人工决策

对于高风险决策,可以使用区块链记录不可篡改的审计日志:

internal sealed class BlockchainVerifiedHumanDecision
{
    private readonly IBlockchainClient _blockchain;
    
    public async Task<ExternalResponse> GetVerifiedDecisionAsync(
        ExternalRequest request,
        CancellationToken cancellationToken)
    {
        var decision = await GetHumanDecisionAsync(request, cancellationToken);
        
        // 创建决策记录
        var record = new DecisionRecord
        {
            RequestId = request.RequestId,
            Timestamp = DateTime.UtcNow,
            DecisionMaker = GetCurrentUserId(),
            Decision = decision,
            Context = request.Data,
            Signature = await SignDecisionAsync(decision)
        };
        
        // 写入区块链
        var transactionHash = await _blockchain.RecordDecisionAsync(record);
        
        // 在响应中包含区块链证明
        var verifiedResponse = new VerifiedDecision
        {
            Decision = decision,
            BlockchainProof = new
            {
                TransactionHash = transactionHash,
                BlockNumber = await _blockchain.GetCurrentBlockNumberAsync(),
                Timestamp = record.Timestamp
            }
        };
        
        return request.CreateResponse(verifiedResponse);
    }
}

十一、总结与思考

11.1 核心要点回顾

通过深入剖析 Microsoft Agent Framework 的人工接入机制,我们发现了几个关键设计原则:

1. 类型安全优先

  • RequestPort 通过泛型实现编译期类型检查

  • PortableValue 提供跨边界的类型安全传递

  • 强类型契约减少运行时错误

2. 状态可恢复性

  • SuperStep 作为原子执行单位

  • Checkpoint 机制保证任意时刻可恢复

  • DurableProperty 自动管理持久化状态

3. 灵活性与易用性的平衡

  • RequestPort 提供最大灵活性(编程式)

  • Question Action 提供最佳易用性(声明式)

  • 两种模式互补,覆盖不同场景

4. 可观测性内置

  • Activity 追踪提供完整的执行链路

  • 事件流提供实时的状态反馈

  • 指标收集支持性能分析

11.2 设计哲学的启示

Microsoft Agent Framework 的人工接入设计给我们带来了几点启示:

启示一:抽象的力量

把人类抽象成"特殊的执行器",这个看似简单的设计决策,带来了巨大的好处:

  • 统一的编程模型

  • 一致的状态管理

  • 无缝的 Checkpoint 支持

启示二:边界的清晰

工作流引擎和外部世界之间有明确的边界:

  • RequestInfoEvent 是唯一的出口

  • ExternalResponse 是唯一的入口

  • 这种清晰的边界使得系统易于理解和测试

启示三:容错的必要性

人工接入天然是不可靠的:

  • 人可能不在线

  • 人可能输入错误

  • 人可能需要很长时间

系统必须为这些情况做好准备:超时、重试、默认值、降级策略。

启示四:演进的空间

好的设计应该为未来留有空间:

  • RequestPort 的泛型设计支持任意类型

  • PortableValue 的抽象支持跨进程传递

  • 事件驱动的架构支持异步和分布式

11.3 实践建议

基于本文的分析,给出几点实践建议:

对于架构师:

  1. 优先考虑 RequestPort 用于核心业务流程

  2. 为人工接入设计明确的 SLA(响应时间、可用性)

  3. 建立完善的监控和告警机制

  4. 规划好状态持久化和灾难恢复策略

对于开发者:

  1. 充分利用类型系统,避免运行时错误

  2. 为每个人工接入点设计超时和降级策略

  3. 编写充分的单元测试和集成测试

  4. 使用 Checkpoint 机制保证可恢复性

对于业务人员:

  1. Question Action 足以应对大多数标准流程

  2. 合理设置重试次数和默认值

  3. 利用实体提取简化用户输入

  4. 定期审查和优化人工接入的触发条件

11.4 最后的思考

人工接入不是 AI 的失败,而是 AI 的谦逊。

在这个 AI 能力日益强大的时代,我们很容易陷入"AI 万能论"的陷阱。但真正成熟的 AI 系统,应该知道自己的边界,知道何时需要人类的智慧。

Microsoft Agent Framework 的人工接入机制,正是这种谦逊的体现。它不是简单地把人类排除在外,也不是把人类当作 AI 的"备胎",而是把人类视为工作流中平等的参与者——有时是决策者,有时是监督者,有时是创造者。

这种设计哲学,或许才是 AI 与人类协作的正确方向。


参考资源


更多AIGC文章

RAG技术全解:从原理到实战的简明指南

更多VibeCoding文章

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐