1. Grains

定义与核心思想

Grains 是 Orleans 编程模型的基石。它们是:

“...the building blocks of an Orleans application. They are the units of computation and state that make up your application.”

  • 虚拟参与者(Virtual Actors):Grains 是对经典 Actor 模型的扩展,被称为“虚拟参与者”。其关键特性是生命周期由运行时自动管理
  • 自动激活/停用:开发者无需显式创建或销毁 Grain。当对一个 Grain 发起调用时,如果它尚未激活,Orleans 会自动在集群中的某个 Silo 上创建其实例;当一段时间无活动后,运行时会自动将其停用以释放资源。
  • 单线程执行:每个 Grain 实例一次只处理一个请求,这极大地简化了并发编程,开发者无需担心多线程同步问题。
实现方式
  • 接口与类分离:一个 Grain 由一个接口(继承自 IGrain* 系列接口)和一个实现类(继承自 Grain 基类)组成。
    public interface IPlayerGrain : IGrainWithGuidKey { ... }
    public class PlayerGrain : Grain, IPlayerGrain { ... }
  • 异步方法:所有 Grain 方法必须返回 TaskTask<T> 或 ValueTask<T>,以支持非阻塞的异步调用。

2. Grain References

定义与作用

Grain 引用是:

“...a proxy object that implements the same grain interface as the corresponding grain class. It encapsulates the logical identity (type and unique key) of the target grain.”

  • 代理对象:Grain 引用是一个客户端代理,它实现了 Grain 接口,但本身不包含业务逻辑。
  • 逻辑标识的载体:它封装了目标 Grain 的完整逻辑标识(类型 + 唯一键)。
  • 位置透明:Grain 引用独立于 Grain 的物理位置。即使 Grain 被停用、迁移或整个集群重启,引用依然有效。
获取方式

通过 IGrainFactory(在 Grain 内部)或 IClusterClient(在客户端)获取:

// Inside a grain
IPlayerGrain player = GrainFactory.GetGrain<IPlayerGrain>(playerId);

// From an Orleans client
IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
歧义解析

当一个 Grain 接口有多个实现类时,GetGrain 调用会失败。文档提供了多种解决方案:

  • 为每个实现定义唯一的接口。
  • 使用 grainClassNamePrefix 参数。
  • 使用 [DefaultGrainType] 属性指定默认实现。
  • 直接使用 GrainId 对象来明确指定类型和键。

3. Grain Identity

组成

每个 Grain 都有一个唯一的、用户定义的标识符,由两部分组成:

“The grain type name, uniquely identifying the grain class. The grain key, uniquely identifying a logical instance of that grain class.”

  • Grain Type Name:标识 Grain 类。默认规则是移除类名末尾的 "Grain" 后缀并转为小写(如 ShoppingCartGrain -> shoppingcart)。可通过 [GrainType("custom-name")] 自定义。
  • Grain Key:标识该类型下的一个逻辑实例。可以是 Guidlongstring 或复合键(Guid + string / long + string)。

完整的 Grain 标识通常写作 type-name/key(例如 shoppingcart/bob65)。

主键选择
  • Guid:适用于需要全局唯一性和随机分配的场景(如新任务ID)。
  • long:适用于与关系型数据库集成的场景,因为数值索引通常更高效。
  • string:适用于具有自然字符串标识符的实体(如用户名)。
  • 复合键:当单一类型无法满足需求时使用。

4. Grain Placement

定义

Grain 放置(Placement)是指 Orleans 决定将新激活的 Grain 实例放置在集群中哪个 Silo 上的过程。

“...the process by which Orleans selects a server to activate the grain on.”

这是 Orleans 实现负载均衡和优化资源利用的关键机制。

内置放置策略

Orleans 提供了多种开箱即用的放置策略,通过在 Grain 类上添加特性来配置:

  • [RandomPlacement]:从兼容的 Silo 中随机选择一个(默认策略)。
  • [PreferLocalPlacement]:优先选择本地 Silo(发起调用的 Silo),否则随机选择。
  • [HashBasedPlacement]:基于 Grain ID 的哈希值选择 Silo,提供确定性放置(但集群拓扑变化时可能不稳定)。
  • [ActivationCountBasedPlacement]:基于“两个选择的力量”(Power of Two Choices)算法,选择负载(激活数)最低的 Silo。
  • [StatelessWorker]:一种特殊的放置策略,用于无状态 Worker Grain,允许多个激活实例存在。
  • [SiloRoleBasedPlacement]:将 Grain 放置在具有特定角色标签的 Silo 上。
  • [ResourceOptimizedPlacement]:基于 CPU 和内存等资源使用情况,将 Grain 放置在资源最充裕的 Silo 上。
自定义放置

开发者可以通过实现 IPlacementDirector 接口来创建完全自定义的放置逻辑。


5. Grain Extensions

定义与目的

Grain 扩展提供了一种机制,用于:

“...add extra behavior to grains.”

它允许在不修改 Grain 核心实现的情况下,向 Grain 动态地添加新的方法和功能。

工作原理
  1. 定义扩展接口:创建一个继承自 IGrainExtension 的接口。
    public interface IGrainDeactivateExtension : IGrainExtension
    {
        Task Deactivate(string msg);
    }
  2. 实现扩展:创建一个实现该接口的类,并通过构造函数注入 IGrainContext 以访问目标 Grain。
    public sealed class GrainDeactivateExtension : IGrainDeactivateExtension
    {
        private IGrainContext _context;
        public GrainDeactivateExtension(IGrainContext context) => _context = context;
        public Task Deactivate(string msg) { ... }
    }
  3. 注册扩展:在 Silo 配置中注册该扩展。
    siloBuilder.AddGrainExtension<IGrainDeactivateExtension, GrainDeactivateExtension>();
  4. 使用扩展:通过 AsReference<T>() 方法从 Grain 引用获取扩展接口的代理,并调用其方法。
    var extension = grain.AsReference<IGrainDeactivateExtension>();
    await extension.Deactivate("reason");
典型应用场景
  • 动态添加通用功能:如为所有 Grain 添加一个 Deactivate 方法。
  • 状态访问器:为外部代码提供一种安全的方式来读取或修改 Grain 的内部状态(通过 Func/Action 委托)。

6. Timers and Reminders

Orleans 提供了两种机制来实现 Grain 的周期性行为:TimersReminders

Timers
  • 作用:用于创建不跨越多个激活实例的周期性行为。其语义与 System.Threading.Timer 相同。
  • 生命周期:与 Grain 激活实例绑定。当 Grain 停用或 Silo 崩溃时,Timer 会停止。
  • 关键特性
    • 单线程执行:在 Grain 的单线程上下文中执行,不会与其他调用并发。
    • 不阻止停用:执行 Timer 回调不会将激活状态从“空闲”变为“使用中”,因此不能用于推迟停用。
    • 精确周期:下一次回调的触发时间是从上一次回调的 Task 完成后开始计算的,这可以防止回调重叠。
    • 可选保持激活:通过 GrainTimerCreationOptions.KeepAlive = true 可以让短周期的 Timer 阻止 Grain 被停用。
  • 使用场景:适用于不需要在故障或停用后继续执行的、高频率(秒/分钟级)的任务。
Reminders
  • 作用:与 Timer 类似,但具有持久性
  • 生命周期:与 Grain 逻辑身份绑定,而非特定激活实例。即使集群完全重启,只要未显式取消,Reminder 就会继续触发。
  • 关键特性
    • 持久化:Reminder 的定义存储在外部存储(如 Azure Table, SQL)中。
    • 自动激活:如果 Grain 未激活,Reminder 触发时会自动激活它。
    • 低频率:设计用于分钟、小时或天级别的任务,不适用于高频率定时器
    • 必须显式取消:通过 UnregisterReminder 方法取消,不能通过 Dispose
  • 使用场景:适用于需要在故障和停用后依然可靠执行的、低频率的任务。
// Grain Interface
public interface IExampleGrain : IGrainWithGuidKey
{
    Task StartTimer();
    Task StartReminder();
}

// Grain Implementation
public class ExampleGrain : Grain, IExampleGrain, IRemindable
{
    private IGrainTimer? _timer;
    private IGrainReminder? _reminder;

    public async Task StartTimer()
    {
        // 注册一个每10秒触发一次的 Timer
        _timer = RegisterGrainTimer(
            callback: async (state, ct) =>
            {
                Console.WriteLine("Timer ticked!");
                await Task.CompletedTask;
            },
            state: null,
            options: new GrainTimerCreationOptions
            {
                DueTime = TimeSpan.Zero,
                Period = TimeSpan.FromSeconds(10),
                KeepAlive = true // 防止因空闲而被停用
            });
    }

    public async Task StartReminder()
    {
        // 注册一个每5分钟触发一次的 Reminder
        _reminder = await RegisterOrUpdateReminder(
            reminderName: "MyReminder",
            dueTime: TimeSpan.Zero,
            period: TimeSpan.FromMinutes(5));
    }

    // IRemindable 接口实现
    public Task ReceiveReminder(string reminderName, TickStatus status)
    {
        Console.WriteLine($"Reminder '{reminderName}' received at {status.CurrentTickTime}");
        return Task.CompletedTask;
    }

    public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken ct)
    {
        // 清理资源
        _timer?.Dispose();
        if (_reminder != null)
        {
            await UnregisterReminder(_reminder);
        }
        await base.OnDeactivateAsync(reason, ct);
    }
}

注意: 使用 Reminder 需在 Silo 配置中添加存储提供程序,例如 builder.UseInMemoryReminderService(); (仅用于开发)。


7. Observers

Observers 是一种允许 Grain 异步通知客户端或其他 Grain 的机制。

  • 接口要求:Observer 接口必须继承自 IGrainObserver,其方法返回 voidTask 或 ValueTask不推荐返回 void,因为这可能导致 async void 的危险模式。
  • 订阅流程
    1. 客户端创建一个实现 Observer 接口的本地对象。
    2. 使用 IGrainFactory.CreateObjectReference<T>() 将该对象转换为一个 Grain 引用。
    3. 将此引用传递给 Grain 的订阅方法。
  • 可靠性:Observers 本质上是不可靠的。客户端可能失败且无法恢复,因此 Orleans 提供了 ObserverManager<T> 工具类,它会自动移除长时间未活动的订阅者,客户端需要定期重新订阅以保持活跃。
  • 执行模型:Observer 是非重入的,Orleans 不会对并发请求进行交错处理。AlwaysInterleaveAttribute 等属性对其无效。
  • Grain 作为 Observer:Grain 也可以实现 IGrainObserver 接口,并直接传递 this.AsReference<IObserverInterface>() 进行订阅,无需 CreateObjectReference
// Observer Interface
public interface IGameObserver : IGrainObserver
{
    Task OnScoreUpdate(int score);
}

// Client-side Observer Implementation
public class GameClientObserver : IGameObserver
{
    public Task OnScoreUpdate(int score)
    {
        Console.WriteLine($"New score: {score}");
        return Task.CompletedTask;
    }
}

// Grain Interface
public interface IGameGrain : IGrainWithGuidKey
{
    Task Subscribe(IGameObserver observer);
    Task Unsubscribe(IGameObserver observer);
    Task UpdateScore(int newScore);
}

// Grain Implementation
public class GameGrain : Grain, IGameGrain
{
    private readonly ObserverManager<IGameObserver> _observers;

    public GameGrain() 
    {
        // 自动移除5分钟内未活动的观察者
        _observers = new ObserverManager<IGameObserver>(TimeSpan.FromMinutes(5));
    }

    public Task Subscribe(IGameObserver observer)
    {
        _observers.Subscribe(observer, observer);
        return Task.CompletedTask;
    }

    public Task Unsubscribe(IGameObserver observer)
    {
        _observers.Unsubscribe(observer);
        return Task.CompletedTask;
    }

    public Task UpdateScore(int newScore)
    {
        // 通知所有订阅者
        _observers.Notify(o => o.OnScoreUpdate(newScore));
        return Task.CompletedTask;
    }
}

// Client Usage
var gameGrain = client.GetGrain<IGameGrain>(Guid.NewGuid());
var observer = new GameClientObserver();
var observerRef = await client.CreateObjectReference<IGameObserver>(observer);

await gameGrain.Subscribe(observerRef);
await gameGrain.UpdateScore(100); // Client will print "New score: 100"

8. Use Cancellation Tokens in Orleans Grains

Orleans 支持通过标准的 CancellationToken 实现协作式取消

  • 目的:允许提前停止长时间运行的操作,提高应用的响应性和资源利用率。
  • 支持范围:适用于所有 Grain 方法(包括流式 IAsyncEnumerable<T> 方法)以及客户端到 Grain、Grain 到 Grain 的调用。
  • 协作式:取消是协作式的,意味着 Grain 实现必须主动检查 CancellationToken 并做出响应(例如调用 ThrowIfCancellationRequested())。
  • 向后兼容:可以在现有 Grain 接口中添加 CancellationToken 参数(建议设为可选默认值),旧客户端调用时会收到 CancellationToken.None,不会破坏兼容性。
  • 超时集成:通过配置 SiloMessagingOptions.CancelRequestOnTimeout = true(默认为 true),当请求超时时,Orleans 会自动向目标 Grain 发送取消信号。
  • 流式取消:对于 IAsyncEnumerable<T>,可以在方法参数上使用 [EnumeratorCancellation] 属性,并在 await foreach 循环中传递 Token。
// Grain Interface
public interface IProcessingGrain : IGrainWithGuidKey
{
    Task<string> ProcessDataAsync(string data, int chunks, CancellationToken cancellationToken = default);
}

// Grain Implementation
public class ProcessingGrain : Grain, IProcessingGrain
{
    public async Task<string> ProcessDataAsync(string data, int chunks, CancellationToken cancellationToken = default)
    {
        var results = new List<string>();
        for (int i = 0; i < chunks; i++)
        {
            cancellationToken.ThrowIfCancellationRequested(); // 检查取消
            
            // 模拟处理工作
            await Task.Delay(100, cancellationToken); // 使用带 Token 的异步方法
            results.Add($"{data}_part_{i}");
        }
        return string.Join(", ", results);
    }
}

// Client Usage
var grain = client.GetGrain<IProcessingGrain>(Guid.NewGuid());
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); // 2秒超时

try
{
    var result = await grain.ProcessDataAsync("test", 100, cts.Token);
    Console.WriteLine(result);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Operation was canceled.");
}

9. Request Scheduling

Orleans 默认采用单线程、非重入的执行模型,但提供了多种方式来控制请求的调度和交错(Interleaving)。

  • 默认行为(非重入):Grain 一次只处理一个请求,直到完成才处理下一个。这简化了编程,但可能导致死锁(如 A 调 B,B 同时调 A)或性能瓶颈(如等待数据库 I/O 时无法处理其他请求)。
  • 启用重入(Interleaving)
    • [Reentrant] 特性:标记整个 Grain 类,允许其不同请求的执行交错。
    • [AlwaysInterleave] 特性:标记特定方法,使其总是可以与其他任何请求交错。
    • [ReadOnly] 特性:标记不修改 Grain 状态的方法,允许多个此类请求并发执行。
    • MayInterleave 特性:提供基于请求内容的动态交错决策。
  • 调用链重入:使用 RequestContext.AllowCallChainReentrancy() 可以在特定的调用链中临时启用重入,以解决循环依赖导致的死锁问题。
// Non-reentrant (Default) - 可能死锁
public interface IPingGrain : IGrainWithStringKey
{
    Task CallOther(IPingGrain other);
}

public class PingGrain : Grain, IPingGrain
{
    public async Task CallOther(IPingGrain other)
    {
        await other.Ping(); // 如果两个 Grain 互相调用,可能死锁
    }
    
    public Task Ping() => Task.CompletedTask;
}

// Reentrant - 解决死锁
[Reentrant]
public class SafePingGrain : Grain, IPingGrain
{
    public async Task CallOther(IPingGrain other)
    {
        await other.Ping(); // 现在可以安全地交错执行
    }
    
    public Task Ping() => Task.CompletedTask;
}

// AlwaysInterleave - 特定方法总是交错
public interface ISlowpokeGrain : IGrainWithIntegerKey
{
    Task GoSlow();
    [AlwaysInterleave]
    Task GoFast();
}

public class SlowpokeGrain : Grain, ISlowpokeGrain
{
    public async Task GoSlow() => await Task.Delay(TimeSpan.FromSeconds(10));
    public async Task GoFast() => await Task.Delay(TimeSpan.FromSeconds(10));
}

// Client: GoFast calls will interleave and finish in ~10s, not 30s.
var slowpoke = client.GetGrain<ISlowpokeGrain>(0);
await Task.WhenAll(slowpoke.GoFast(), slowpoke.GoFast(), slowpoke.GoFast());

10. Request Context

Request Context 是一个 Orleans 功能,用于在请求调用链中传递应用元数据(如跟踪 ID)。

  • API:通过静态类 RequestContext 的 Set(string key, object value) 和 Get(string key) 方法操作。
  • 工作原理:元数据在每次 Orleans 请求时被序列化并随消息一起发送。接收方 Grain 可以访问这些元数据。
  • 传播范围:元数据会沿着调用链(Grain A -> Grain B -> Grain C)自动传播,并且在 Task.StartNew 或 ContinueWith 创建的延续任务中也会被复制。
  • 限制:元数据不会随响应反向流动。响应处理代码仍在原始请求的上下文中执行。
  • 最佳实践:建议使用简单、可序列化的类型(如字符串、GUID、数字)作为元数据值,以避免序列化开销。
// Client
RequestContext.Set("TraceId", Guid.NewGuid().ToString());
var grain = client.GetGrain<IHelloGrain>("user1");
await grain.SayHello("World");

// Grain
public interface IHelloGrain : IGrainWithStringKey
{
    Task SayHello(string greeting);
}

public class HelloGrain : Grain, IHelloGrain
{
    public Task SayHello(string greeting)
    {
        var traceId = RequestContext.Get("TraceId")?.ToString() ?? "N/A";
        Console.WriteLine($"[{traceId}] Hello, {greeting}!");
        return Task.CompletedTask;
    }
}

11. Orleans Code Generation

代码生成是 Orleans 在构建时自动创建必要源代码的过程,用于处理序列化、方法分发等内部细节。

  • Orleans 7.0+ 的变化:代码生成现在是完全自动的,通常不需要开发者干预。
  • 核心包
    • 客户端项目应引用 Microsoft.Orleans.Client
    • Silo(服务器)项目应引用 Microsoft.Orleans.Server
    • 其他共享库应引用 Microsoft.Orleans.Sdk
  • 生成内容:Orleans 会自动为以下内容生成代码:
    • Grain 接口和实现类
    • Grain 状态类
    • 作为 Grain 方法参数或返回值的类型
  • 手动引导生成
    • 对于无法自动检测的类型,可以使用 [GenerateSerializer] 特性。
    • 对于来自外部程序集的类型,可以使用 [assembly: KnownAssembly("OtherAssembly")]
  • 构建时 vs 运行时:Orleans 7+ 仅在构建时进行代码生成,不再有运行时代码生成阶段,这提高了启动性能和确定性。
// 1. 定义 Grain 接口和类 (无需额外特性)
public interface IUserGrain : IGrainWithGuidKey
{
    Task<UserProfile> GetProfile();
    Task SetProfile(UserProfile profile);
}

public class UserGrain : Grain, IUserGrain
{
    private UserProfile _profile = new();

    public Task<UserProfile> GetProfile() => Task.FromResult(_profile);
    public Task SetProfile(UserProfile profile)
    {
        _profile = profile;
        return Task.CompletedTask;
    }
}

// 2. 定义需要序列化的自定义类型
// Orleans 会自动为其生成序列化器
public record UserProfile(string Name, int Age);

// 3. 项目文件 (无需手动配置,但需引用正确包)
// <PackageReference Include="Microsoft.Orleans.Server" Version="7.2.4" />
// 构建时,Orleans 会自动生成必要的代码。

关键点: 开发者通常只需定义好类型,Orleans 的 MSBuild 任务会在编译时自动处理一切。对于复杂类型或来自外部程序集的类型,才需要使用 [GenerateSerializer][KnownAssembly] 等特性进行引导。


总结

Orleans 的核心模型围绕 Grain 构建,它是一个由运行时自动管理生命周期的虚拟参与者。Grain Reference 作为其代理,提供了位置透明的调用方式。每个 Grain 通过其唯一的 Grain Identity(类型+键)被识别。Grain Placement 策略决定了 Grain 在集群中的物理分布,以实现负载均衡和性能优化。最后,Grain Extensions 提供了一种灵活的机制,可以在不侵入核心逻辑的情况下为 Grain 添加额外行为。这些组件共同构成了 Orleans 强大而简洁的分布式应用开发模型。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐