第2章.Orleans Grain
摘要:Orleans编程模型以Grains为核心构建块,作为虚拟参与者实现分布式计算。Grains通过接口与类分离定义,支持异步单线程执行,由运行时自动管理生命周期。Grain引用作为代理对象封装逻辑标识,实现位置透明调用。每个Grain拥有唯一的GrainIdentity(类型+键),通过多种放置策略(如随机、本地优先等)在集群中分布。GrainExtensions机制支持动态添加功能而不修改核
1. Grains
定义与核心思想
Grains 是 Orleans 编程模型的基石。它们是:
“...the building blocks of an Orleans application. They are the units of computation and state that make up your application.”
- 虚拟参与者(Virtual Actors):Grains 是对经典 Actor 模型的扩展,被称为“虚拟参与者”。其关键特性是生命周期由运行时自动管理。
- 自动激活/停用:开发者无需显式创建或销毁 Grain。当对一个 Grain 发起调用时,如果它尚未激活,Orleans 会自动在集群中的某个 Silo 上创建其实例;当一段时间无活动后,运行时会自动将其停用以释放资源。
- 单线程执行:每个 Grain 实例一次只处理一个请求,这极大地简化了并发编程,开发者无需担心多线程同步问题。
实现方式
- 接口与类分离:一个 Grain 由一个接口(继承自
IGrain*系列接口)和一个实现类(继承自Grain基类)组成。public interface IPlayerGrain : IGrainWithGuidKey { ... } public class PlayerGrain : Grain, IPlayerGrain { ... } - 异步方法:所有 Grain 方法必须返回
Task、Task<T>或ValueTask<T>,以支持非阻塞的异步调用。
2. Grain References
定义与作用
Grain 引用是:
“...a proxy object that implements the same grain interface as the corresponding grain class. It encapsulates the logical identity (type and unique key) of the target grain.”
- 代理对象:Grain 引用是一个客户端代理,它实现了 Grain 接口,但本身不包含业务逻辑。
- 逻辑标识的载体:它封装了目标 Grain 的完整逻辑标识(类型 + 唯一键)。
- 位置透明:Grain 引用独立于 Grain 的物理位置。即使 Grain 被停用、迁移或整个集群重启,引用依然有效。
获取方式
通过 IGrainFactory(在 Grain 内部)或 IClusterClient(在客户端)获取:
// Inside a grain
IPlayerGrain player = GrainFactory.GetGrain<IPlayerGrain>(playerId);
// From an Orleans client
IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
歧义解析
当一个 Grain 接口有多个实现类时,GetGrain 调用会失败。文档提供了多种解决方案:
- 为每个实现定义唯一的接口。
- 使用
grainClassNamePrefix参数。 - 使用
[DefaultGrainType]属性指定默认实现。 - 直接使用
GrainId对象来明确指定类型和键。
3. Grain Identity
组成
每个 Grain 都有一个唯一的、用户定义的标识符,由两部分组成:
“The grain type name, uniquely identifying the grain class. The grain key, uniquely identifying a logical instance of that grain class.”
- Grain Type Name:标识 Grain 类。默认规则是移除类名末尾的 "Grain" 后缀并转为小写(如
ShoppingCartGrain->shoppingcart)。可通过[GrainType("custom-name")]自定义。 - Grain Key:标识该类型下的一个逻辑实例。可以是
Guid、long、string或复合键(Guid + string/long + string)。
完整的 Grain 标识通常写作 type-name/key(例如 shoppingcart/bob65)。
主键选择
Guid:适用于需要全局唯一性和随机分配的场景(如新任务ID)。long:适用于与关系型数据库集成的场景,因为数值索引通常更高效。string:适用于具有自然字符串标识符的实体(如用户名)。- 复合键:当单一类型无法满足需求时使用。
4. Grain Placement
定义
Grain 放置(Placement)是指 Orleans 决定将新激活的 Grain 实例放置在集群中哪个 Silo 上的过程。
“...the process by which Orleans selects a server to activate the grain on.”
这是 Orleans 实现负载均衡和优化资源利用的关键机制。
内置放置策略
Orleans 提供了多种开箱即用的放置策略,通过在 Grain 类上添加特性来配置:
[RandomPlacement]:从兼容的 Silo 中随机选择一个(默认策略)。[PreferLocalPlacement]:优先选择本地 Silo(发起调用的 Silo),否则随机选择。[HashBasedPlacement]:基于 Grain ID 的哈希值选择 Silo,提供确定性放置(但集群拓扑变化时可能不稳定)。[ActivationCountBasedPlacement]:基于“两个选择的力量”(Power of Two Choices)算法,选择负载(激活数)最低的 Silo。[StatelessWorker]:一种特殊的放置策略,用于无状态 Worker Grain,允许多个激活实例存在。[SiloRoleBasedPlacement]:将 Grain 放置在具有特定角色标签的 Silo 上。[ResourceOptimizedPlacement]:基于 CPU 和内存等资源使用情况,将 Grain 放置在资源最充裕的 Silo 上。
自定义放置
开发者可以通过实现 IPlacementDirector 接口来创建完全自定义的放置逻辑。
5. Grain Extensions
定义与目的
Grain 扩展提供了一种机制,用于:
“...add extra behavior to grains.”
它允许在不修改 Grain 核心实现的情况下,向 Grain 动态地添加新的方法和功能。
工作原理
- 定义扩展接口:创建一个继承自
IGrainExtension的接口。public interface IGrainDeactivateExtension : IGrainExtension { Task Deactivate(string msg); } - 实现扩展:创建一个实现该接口的类,并通过构造函数注入
IGrainContext以访问目标 Grain。public sealed class GrainDeactivateExtension : IGrainDeactivateExtension { private IGrainContext _context; public GrainDeactivateExtension(IGrainContext context) => _context = context; public Task Deactivate(string msg) { ... } } - 注册扩展:在 Silo 配置中注册该扩展。
siloBuilder.AddGrainExtension<IGrainDeactivateExtension, GrainDeactivateExtension>(); - 使用扩展:通过
AsReference<T>()方法从 Grain 引用获取扩展接口的代理,并调用其方法。var extension = grain.AsReference<IGrainDeactivateExtension>(); await extension.Deactivate("reason");
典型应用场景
- 动态添加通用功能:如为所有 Grain 添加一个
Deactivate方法。 - 状态访问器:为外部代码提供一种安全的方式来读取或修改 Grain 的内部状态(通过
Func/Action委托)。
6. Timers and Reminders
Orleans 提供了两种机制来实现 Grain 的周期性行为:Timers 和 Reminders。
Timers
- 作用:用于创建不跨越多个激活实例的周期性行为。其语义与
System.Threading.Timer相同。 - 生命周期:与 Grain 激活实例绑定。当 Grain 停用或 Silo 崩溃时,Timer 会停止。
- 关键特性:
- 单线程执行:在 Grain 的单线程上下文中执行,不会与其他调用并发。
- 不阻止停用:执行 Timer 回调不会将激活状态从“空闲”变为“使用中”,因此不能用于推迟停用。
- 精确周期:下一次回调的触发时间是从上一次回调的
Task完成后开始计算的,这可以防止回调重叠。 - 可选保持激活:通过
GrainTimerCreationOptions.KeepAlive = true可以让短周期的 Timer 阻止 Grain 被停用。
- 使用场景:适用于不需要在故障或停用后继续执行的、高频率(秒/分钟级)的任务。
Reminders
- 作用:与 Timer 类似,但具有持久性。
- 生命周期:与 Grain 逻辑身份绑定,而非特定激活实例。即使集群完全重启,只要未显式取消,Reminder 就会继续触发。
- 关键特性:
- 持久化:Reminder 的定义存储在外部存储(如 Azure Table, SQL)中。
- 自动激活:如果 Grain 未激活,Reminder 触发时会自动激活它。
- 低频率:设计用于分钟、小时或天级别的任务,不适用于高频率定时器。
- 必须显式取消:通过
UnregisterReminder方法取消,不能通过Dispose。
- 使用场景:适用于需要在故障和停用后依然可靠执行的、低频率的任务。
// Grain Interface
public interface IExampleGrain : IGrainWithGuidKey
{
Task StartTimer();
Task StartReminder();
}
// Grain Implementation
public class ExampleGrain : Grain, IExampleGrain, IRemindable
{
private IGrainTimer? _timer;
private IGrainReminder? _reminder;
public async Task StartTimer()
{
// 注册一个每10秒触发一次的 Timer
_timer = RegisterGrainTimer(
callback: async (state, ct) =>
{
Console.WriteLine("Timer ticked!");
await Task.CompletedTask;
},
state: null,
options: new GrainTimerCreationOptions
{
DueTime = TimeSpan.Zero,
Period = TimeSpan.FromSeconds(10),
KeepAlive = true // 防止因空闲而被停用
});
}
public async Task StartReminder()
{
// 注册一个每5分钟触发一次的 Reminder
_reminder = await RegisterOrUpdateReminder(
reminderName: "MyReminder",
dueTime: TimeSpan.Zero,
period: TimeSpan.FromMinutes(5));
}
// IRemindable 接口实现
public Task ReceiveReminder(string reminderName, TickStatus status)
{
Console.WriteLine($"Reminder '{reminderName}' received at {status.CurrentTickTime}");
return Task.CompletedTask;
}
public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken ct)
{
// 清理资源
_timer?.Dispose();
if (_reminder != null)
{
await UnregisterReminder(_reminder);
}
await base.OnDeactivateAsync(reason, ct);
}
}
注意: 使用 Reminder 需在 Silo 配置中添加存储提供程序,例如
builder.UseInMemoryReminderService();(仅用于开发)。
7. Observers
Observers 是一种允许 Grain 异步通知客户端或其他 Grain 的机制。
- 接口要求:Observer 接口必须继承自
IGrainObserver,其方法返回void、Task或ValueTask。不推荐返回void,因为这可能导致async void的危险模式。 - 订阅流程:
- 客户端创建一个实现 Observer 接口的本地对象。
- 使用
IGrainFactory.CreateObjectReference<T>()将该对象转换为一个 Grain 引用。 - 将此引用传递给 Grain 的订阅方法。
- 可靠性:Observers 本质上是不可靠的。客户端可能失败且无法恢复,因此 Orleans 提供了
ObserverManager<T>工具类,它会自动移除长时间未活动的订阅者,客户端需要定期重新订阅以保持活跃。 - 执行模型:Observer 是非重入的,Orleans 不会对并发请求进行交错处理。
AlwaysInterleaveAttribute等属性对其无效。 - Grain 作为 Observer:Grain 也可以实现
IGrainObserver接口,并直接传递this.AsReference<IObserverInterface>()进行订阅,无需CreateObjectReference。
// Observer Interface
public interface IGameObserver : IGrainObserver
{
Task OnScoreUpdate(int score);
}
// Client-side Observer Implementation
public class GameClientObserver : IGameObserver
{
public Task OnScoreUpdate(int score)
{
Console.WriteLine($"New score: {score}");
return Task.CompletedTask;
}
}
// Grain Interface
public interface IGameGrain : IGrainWithGuidKey
{
Task Subscribe(IGameObserver observer);
Task Unsubscribe(IGameObserver observer);
Task UpdateScore(int newScore);
}
// Grain Implementation
public class GameGrain : Grain, IGameGrain
{
private readonly ObserverManager<IGameObserver> _observers;
public GameGrain()
{
// 自动移除5分钟内未活动的观察者
_observers = new ObserverManager<IGameObserver>(TimeSpan.FromMinutes(5));
}
public Task Subscribe(IGameObserver observer)
{
_observers.Subscribe(observer, observer);
return Task.CompletedTask;
}
public Task Unsubscribe(IGameObserver observer)
{
_observers.Unsubscribe(observer);
return Task.CompletedTask;
}
public Task UpdateScore(int newScore)
{
// 通知所有订阅者
_observers.Notify(o => o.OnScoreUpdate(newScore));
return Task.CompletedTask;
}
}
// Client Usage
var gameGrain = client.GetGrain<IGameGrain>(Guid.NewGuid());
var observer = new GameClientObserver();
var observerRef = await client.CreateObjectReference<IGameObserver>(observer);
await gameGrain.Subscribe(observerRef);
await gameGrain.UpdateScore(100); // Client will print "New score: 100"
8. Use Cancellation Tokens in Orleans Grains
Orleans 支持通过标准的 CancellationToken 实现协作式取消。
- 目的:允许提前停止长时间运行的操作,提高应用的响应性和资源利用率。
- 支持范围:适用于所有 Grain 方法(包括流式
IAsyncEnumerable<T>方法)以及客户端到 Grain、Grain 到 Grain 的调用。 - 协作式:取消是协作式的,意味着 Grain 实现必须主动检查
CancellationToken并做出响应(例如调用ThrowIfCancellationRequested())。 - 向后兼容:可以在现有 Grain 接口中添加
CancellationToken参数(建议设为可选默认值),旧客户端调用时会收到CancellationToken.None,不会破坏兼容性。 - 超时集成:通过配置
SiloMessagingOptions.CancelRequestOnTimeout = true(默认为true),当请求超时时,Orleans 会自动向目标 Grain 发送取消信号。 - 流式取消:对于
IAsyncEnumerable<T>,可以在方法参数上使用[EnumeratorCancellation]属性,并在await foreach循环中传递 Token。
// Grain Interface
public interface IProcessingGrain : IGrainWithGuidKey
{
Task<string> ProcessDataAsync(string data, int chunks, CancellationToken cancellationToken = default);
}
// Grain Implementation
public class ProcessingGrain : Grain, IProcessingGrain
{
public async Task<string> ProcessDataAsync(string data, int chunks, CancellationToken cancellationToken = default)
{
var results = new List<string>();
for (int i = 0; i < chunks; i++)
{
cancellationToken.ThrowIfCancellationRequested(); // 检查取消
// 模拟处理工作
await Task.Delay(100, cancellationToken); // 使用带 Token 的异步方法
results.Add($"{data}_part_{i}");
}
return string.Join(", ", results);
}
}
// Client Usage
var grain = client.GetGrain<IProcessingGrain>(Guid.NewGuid());
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); // 2秒超时
try
{
var result = await grain.ProcessDataAsync("test", 100, cts.Token);
Console.WriteLine(result);
}
catch (OperationCanceledException)
{
Console.WriteLine("Operation was canceled.");
}
9. Request Scheduling
Orleans 默认采用单线程、非重入的执行模型,但提供了多种方式来控制请求的调度和交错(Interleaving)。
- 默认行为(非重入):Grain 一次只处理一个请求,直到完成才处理下一个。这简化了编程,但可能导致死锁(如 A 调 B,B 同时调 A)或性能瓶颈(如等待数据库 I/O 时无法处理其他请求)。
- 启用重入(Interleaving):
[Reentrant]特性:标记整个 Grain 类,允许其不同请求的执行交错。[AlwaysInterleave]特性:标记特定方法,使其总是可以与其他任何请求交错。[ReadOnly]特性:标记不修改 Grain 状态的方法,允许多个此类请求并发执行。MayInterleave特性:提供基于请求内容的动态交错决策。
- 调用链重入:使用
RequestContext.AllowCallChainReentrancy()可以在特定的调用链中临时启用重入,以解决循环依赖导致的死锁问题。
// Non-reentrant (Default) - 可能死锁
public interface IPingGrain : IGrainWithStringKey
{
Task CallOther(IPingGrain other);
}
public class PingGrain : Grain, IPingGrain
{
public async Task CallOther(IPingGrain other)
{
await other.Ping(); // 如果两个 Grain 互相调用,可能死锁
}
public Task Ping() => Task.CompletedTask;
}
// Reentrant - 解决死锁
[Reentrant]
public class SafePingGrain : Grain, IPingGrain
{
public async Task CallOther(IPingGrain other)
{
await other.Ping(); // 现在可以安全地交错执行
}
public Task Ping() => Task.CompletedTask;
}
// AlwaysInterleave - 特定方法总是交错
public interface ISlowpokeGrain : IGrainWithIntegerKey
{
Task GoSlow();
[AlwaysInterleave]
Task GoFast();
}
public class SlowpokeGrain : Grain, ISlowpokeGrain
{
public async Task GoSlow() => await Task.Delay(TimeSpan.FromSeconds(10));
public async Task GoFast() => await Task.Delay(TimeSpan.FromSeconds(10));
}
// Client: GoFast calls will interleave and finish in ~10s, not 30s.
var slowpoke = client.GetGrain<ISlowpokeGrain>(0);
await Task.WhenAll(slowpoke.GoFast(), slowpoke.GoFast(), slowpoke.GoFast());
10. Request Context
Request Context 是一个 Orleans 功能,用于在请求调用链中传递应用元数据(如跟踪 ID)。
- API:通过静态类
RequestContext的Set(string key, object value)和Get(string key)方法操作。 - 工作原理:元数据在每次 Orleans 请求时被序列化并随消息一起发送。接收方 Grain 可以访问这些元数据。
- 传播范围:元数据会沿着调用链(Grain A -> Grain B -> Grain C)自动传播,并且在
Task.StartNew或ContinueWith创建的延续任务中也会被复制。 - 限制:元数据不会随响应反向流动。响应处理代码仍在原始请求的上下文中执行。
- 最佳实践:建议使用简单、可序列化的类型(如字符串、GUID、数字)作为元数据值,以避免序列化开销。
// Client
RequestContext.Set("TraceId", Guid.NewGuid().ToString());
var grain = client.GetGrain<IHelloGrain>("user1");
await grain.SayHello("World");
// Grain
public interface IHelloGrain : IGrainWithStringKey
{
Task SayHello(string greeting);
}
public class HelloGrain : Grain, IHelloGrain
{
public Task SayHello(string greeting)
{
var traceId = RequestContext.Get("TraceId")?.ToString() ?? "N/A";
Console.WriteLine($"[{traceId}] Hello, {greeting}!");
return Task.CompletedTask;
}
}
11. Orleans Code Generation
代码生成是 Orleans 在构建时自动创建必要源代码的过程,用于处理序列化、方法分发等内部细节。
- Orleans 7.0+ 的变化:代码生成现在是完全自动的,通常不需要开发者干预。
- 核心包:
- 客户端项目应引用
Microsoft.Orleans.Client。 - Silo(服务器)项目应引用
Microsoft.Orleans.Server。 - 其他共享库应引用
Microsoft.Orleans.Sdk。
- 客户端项目应引用
- 生成内容:Orleans 会自动为以下内容生成代码:
- Grain 接口和实现类
- Grain 状态类
- 作为 Grain 方法参数或返回值的类型
- 手动引导生成:
- 对于无法自动检测的类型,可以使用
[GenerateSerializer]特性。 - 对于来自外部程序集的类型,可以使用
[assembly: KnownAssembly("OtherAssembly")]。
- 对于无法自动检测的类型,可以使用
- 构建时 vs 运行时:Orleans 7+ 仅在构建时进行代码生成,不再有运行时代码生成阶段,这提高了启动性能和确定性。
// 1. 定义 Grain 接口和类 (无需额外特性)
public interface IUserGrain : IGrainWithGuidKey
{
Task<UserProfile> GetProfile();
Task SetProfile(UserProfile profile);
}
public class UserGrain : Grain, IUserGrain
{
private UserProfile _profile = new();
public Task<UserProfile> GetProfile() => Task.FromResult(_profile);
public Task SetProfile(UserProfile profile)
{
_profile = profile;
return Task.CompletedTask;
}
}
// 2. 定义需要序列化的自定义类型
// Orleans 会自动为其生成序列化器
public record UserProfile(string Name, int Age);
// 3. 项目文件 (无需手动配置,但需引用正确包)
// <PackageReference Include="Microsoft.Orleans.Server" Version="7.2.4" />
// 构建时,Orleans 会自动生成必要的代码。
关键点: 开发者通常只需定义好类型,Orleans 的 MSBuild 任务会在编译时自动处理一切。对于复杂类型或来自外部程序集的类型,才需要使用
[GenerateSerializer]或[KnownAssembly]等特性进行引导。
总结
Orleans 的核心模型围绕 Grain 构建,它是一个由运行时自动管理生命周期的虚拟参与者。Grain Reference 作为其代理,提供了位置透明的调用方式。每个 Grain 通过其唯一的 Grain Identity(类型+键)被识别。Grain Placement 策略决定了 Grain 在集群中的物理分布,以实现负载均衡和性能优化。最后,Grain Extensions 提供了一种灵活的机制,可以在不侵入核心逻辑的情况下为 Grain 添加额外行为。这些组件共同构成了 Orleans 强大而简洁的分布式应用开发模型。
更多推荐


所有评论(0)