MES/ERP并发场景多结构指令操作组件
region 事件枚举定义/// 作业协同事件类型CommandReceived, // 指令接收ExecutionStarted, // 执行开始ExecutionProgress, // 执行进度ExecutionCompleted, // 执行完成ExecutionFailed, // 执行失败StatusChanged // 状态变更/// 指令执行状态Pending, // 待执行Run
·
MES/ERP作业协同场景,业务执行的委托事件封装,多结构专门化多并发客户端指令操作,多线程专用复杂成员方法类封装的界面后台API组件解决方案
MES/ERP 作业协同 - 委托事件 + 多结构并发指令操作组件解决方案
你需要的是在 MES/ERP 作业协同场景下,基于委托事件封装实现业务执行的解耦触发,针对多结构(多产线 / 多工序 / 多资源)设计专门化多并发客户端指令操作,并通过多线程专用的复杂成员方法类封装,构建包含界面、后台、API 的完整组件化解决方案。
本次方案核心聚焦:委托事件的标准化封装、多结构并发指令的精准控制、多线程安全的复杂方法封装,同时保持组件的高内聚低耦合,适配 MES/ERP 作业协同的高并发、高可靠需求。
一、核心设计理念
1. 核心问题拆解
| 场景痛点 | 技术解决方案 |
|---|---|
| 业务执行与事件触发紧耦合 | 标准化委托事件封装,事件发布 / 订阅解耦 |
| 多结构指令操作混乱 | 指令分类 + 多结构适配 + 并发控制 |
| 多线程复杂方法易出错 | 专用线程安全类 + 成员方法封装 + 异常隔离 |
| 客户端 / 后台 / API 协同难 | 统一指令模型 + 接口标准化 + 状态同步 |
2. 整体架构

3. 技术栈说明
- 委托事件:.NET 原生委托 + 事件 + 自定义事件参数,实现解耦触发
- 并发控制:
Task+SemaphoreSlim+ConcurrentDictionary,保证线程安全 - 指令封装:指令接口 + 多结构指令实现类,适配不同业务结构
- 多线程类封装:专用执行类 + 线程安全成员方法 + 异常处理
- 通信层:ASP.NET Web API+SignalR(实时状态推送)
- 界面层:WinForm(客户端)+WPF(可选),MVVM 模式
二、核心组件实现
1. 标准化委托事件封装
1.1 事件定义(核心)
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
#region 事件枚举定义
/// <summary>
/// 作业协同事件类型
/// </summary>
public enum WorkCollabEventType
{
CommandReceived, // 指令接收
ExecutionStarted, // 执行开始
ExecutionProgress, // 执行进度
ExecutionCompleted, // 执行完成
ExecutionFailed, // 执行失败
StatusChanged // 状态变更
}
/// <summary>
/// 指令执行状态
/// </summary>
public enum CommandStatus
{
Pending, // 待执行
Running, // 执行中
Completed, // 完成
Failed, // 失败
Cancelled // 取消
}
#endregion
#region 自定义事件参数
/// <summary>
/// 作业协同事件参数基类
/// </summary>
public class WorkCollabEventArgs : EventArgs
{
/// <summary>
/// 事件ID
/// </summary>
public string EventId { get; set; } = Guid.NewGuid().ToString("N");
/// <summary>
/// 事件类型
/// </summary>
public WorkCollabEventType EventType { get; set; }
/// <summary>
/// 事件时间
/// </summary>
public DateTime EventTime { get; set; } = DateTime.Now;
/// <summary>
/// 关联指令ID
/// </summary>
public string CommandId { get; set; }
/// <summary>
/// 附加数据
/// </summary>
public Dictionary<string, object> ExtraData { get; set; } = new Dictionary<string, object>();
}
/// <summary>
/// 指令执行进度事件参数
/// </summary>
public class ExecutionProgressEventArgs : WorkCollabEventArgs
{
public ExecutionProgressEventArgs()
{
EventType = WorkCollabEventType.ExecutionProgress;
}
/// <summary>
/// 执行进度(0-100)
/// </summary>
public int Progress { get; set; }
/// <summary>
/// 当前执行步骤
/// </summary>
public string CurrentStep { get; set; }
}
/// <summary>
/// 指令执行完成事件参数
/// </summary>
public class ExecutionCompletedEventArgs : WorkCollabEventArgs
{
public ExecutionCompletedEventArgs()
{
EventType = WorkCollabEventType.ExecutionCompleted;
}
/// <summary>
/// 执行结果
/// </summary>
public bool Success { get; set; }
/// <summary>
/// 结果数据
/// </summary>
public object ResultData { get; set; }
/// <summary>
/// 执行耗时(毫秒)
/// </summary>
public long ExecutionTimeMs { get; set; }
}
/// <summary>
/// 指令执行失败事件参数
/// </summary>
public class ExecutionFailedEventArgs : WorkCollabEventArgs
{
public ExecutionFailedEventArgs()
{
EventType = WorkCollabEventType.ExecutionFailed;
}
/// <summary>
/// 异常信息
/// </summary>
public string ErrorMessage { get; set; }
/// <summary>
/// 异常堆栈
/// </summary>
public string StackTrace { get; set; }
/// <summary>
/// 重试次数
/// </summary>
public int RetryCount { get; set; }
}
#endregion
#region 委托定义
/// <summary>
/// 作业协同事件委托
/// </summary>
/// <param name="sender">发送者</param>
/// <param name="e">事件参数</param>
public delegate void WorkCollabEventHandler(object sender, WorkCollabEventArgs e);
/// <summary>
/// 进度更新委托(专门用于UI更新)
/// </summary>
/// <param name="commandId">指令ID</param>
/// <param name="progress">进度</param>
/// <param name="status">状态</param>
public delegate void ProgressUpdateDelegate(string commandId, int progress, CommandStatus status);
#endregion
#region 事件总线(核心)
/// <summary>
/// 作业协同事件总线
/// 功能:统一管理事件发布/订阅,支持多线程安全操作
/// </summary>
public class WorkCollabEventBus
{
// 单例实例
private static readonly Lazy<WorkCollabEventBus> _instance = new Lazy<WorkCollabEventBus>(() => new WorkCollabEventBus());
public static WorkCollabEventBus Instance => _instance.Value;
// 事件订阅字典(线程安全)
private readonly ConcurrentDictionary<WorkCollabEventType, List<WorkCollabEventHandler>> _eventHandlers =
new ConcurrentDictionary<WorkCollabEventType, List<WorkCollabEventHandler>>();
// 锁对象
private readonly object _lockObj = new object();
/// <summary>
/// 私有构造函数
/// </summary>
private WorkCollabEventBus()
{
// 初始化所有事件类型
foreach (WorkCollabEventType type in Enum.GetValues(typeof(WorkCollabEventType)))
{
_eventHandlers.TryAdd(type, new List<WorkCollabEventHandler>());
}
}
/// <summary>
/// 订阅事件
/// </summary>
/// <param name="eventType">事件类型</param>
/// <param name="handler">事件处理方法</param>
public void Subscribe(WorkCollabEventType eventType, WorkCollabEventHandler handler)
{
lock (_lockObj)
{
if (_eventHandlers.TryGetValue(eventType, out var handlers))
{
if (!handlers.Contains(handler))
{
handlers.Add(handler);
}
}
}
Console.WriteLine($"订阅事件:{eventType},处理方法:{handler.Method.Name}");
}
/// <summary>
/// 取消订阅事件
/// </summary>
/// <param name="eventType">事件类型</param>
/// <param name="handler">事件处理方法</param>
public void Unsubscribe(WorkCollabEventType eventType, WorkCollabEventHandler handler)
{
lock (_lockObj)
{
if (_eventHandlers.TryGetValue(eventType, out var handlers))
{
if (handlers.Contains(handler))
{
handlers.Remove(handler);
}
}
}
Console.WriteLine($"取消订阅事件:{eventType},处理方法:{handler.Method.Name}");
}
/// <summary>
/// 发布事件(同步)
/// </summary>
/// <param name="sender">发送者</param>
/// <param name="e">事件参数</param>
public void Publish(object sender, WorkCollabEventArgs e)
{
if (_eventHandlers.TryGetValue(e.EventType, out var handlers))
{
// 复制一份避免遍历过程中集合变更
var handlersCopy = new List<WorkCollabEventHandler>(handlers);
foreach (var handler in handlersCopy)
{
try
{
handler?.Invoke(sender, e);
}
catch (Exception ex)
{
Console.WriteLine($"事件处理失败:{ex.Message}");
}
}
}
}
/// <summary>
/// 发布事件(异步)
/// </summary>
/// <param name="sender">发送者</param>
/// <param name="e">事件参数</param>
/// <returns></returns>
public async Task PublishAsync(object sender, WorkCollabEventArgs e)
{
await Task.Run(() => Publish(sender, e));
}
}
#endregion
1.2 事件使用示例
// 1. 订阅事件
WorkCollabEventBus.Instance.Subscribe(WorkCollabEventType.ExecutionProgress, OnExecutionProgress);
WorkCollabEventBus.Instance.Subscribe(WorkCollabEventType.ExecutionCompleted, OnExecutionCompleted);
// 2. 事件处理方法
private void OnExecutionProgress(object sender, WorkCollabEventArgs e)
{
if (e is ExecutionProgressEventArgs progressArgs)
{
Console.WriteLine($"指令{progressArgs.CommandId}进度:{progressArgs.Progress}%,当前步骤:{progressArgs.CurrentStep}");
// 更新UI(需跨线程处理)
UpdateProgressUI(progressArgs.CommandId, progressArgs.Progress, CommandStatus.Running);
}
}
private void OnExecutionCompleted(object sender, WorkCollabEventArgs e)
{
if (e is ExecutionCompletedEventArgs completedArgs)
{
Console.WriteLine($"指令{completedArgs.CommandId}执行{(completedArgs.Success ? "成功" : "失败")},耗时:{completedArgs.ExecutionTimeMs}ms");
// 更新UI状态
UpdateCommandStatusUI(completedArgs.CommandId, completedArgs.Success ? CommandStatus.Completed : CommandStatus.Failed);
}
}
// 3. 发布事件(执行过程中)
private async Task ExecuteCommandAsync(string commandId)
{
// 发布开始事件
await WorkCollabEventBus.Instance.PublishAsync(this, new WorkCollabEventArgs
{
EventType = WorkCollabEventType.ExecutionStarted,
CommandId = commandId
});
try
{
// 模拟执行步骤
for (int i = 0; i <= 100; i += 10)
{
// 发布进度事件
await WorkCollabEventBus.Instance.PublishAsync(this, new ExecutionProgressEventArgs
{
CommandId = commandId,
Progress = i,
CurrentStep = $"执行步骤{i/10 + 1}"
});
await Task.Delay(100); // 模拟耗时操作
}
// 发布完成事件
await WorkCollabEventBus.Instance.PublishAsync(this, new ExecutionCompletedEventArgs
{
CommandId = commandId,
Success = true,
ResultData = "执行结果数据",
ExecutionTimeMs = 1000
});
}
catch (Exception ex)
{
// 发布失败事件
await WorkCollabEventBus.Instance.PublishAsync(this, new ExecutionFailedEventArgs
{
CommandId = commandId,
ErrorMessage = ex.Message,
StackTrace = ex.StackTrace,
RetryCount = 0
});
}
}
2. 多结构并发指令操作封装
2.1 指令模型定义
#region 指令基类与多结构实现
/// <summary>
/// 指令基类
/// </summary>
public abstract class BaseCommand
{
/// <summary>
/// 指令ID
/// </summary>
public string CommandId { get; set; } = Guid.NewGuid().ToString("N");
/// <summary>
/// 指令名称
/// </summary>
public string CommandName { get; set; }
/// <summary>
/// 指令类型
/// </summary>
public string CommandType { get; set; }
/// <summary>
/// 结构类型(产线/工序/工位/资源)
/// </summary>
public string StructureType { get; set; }
/// <summary>
/// 结构ID(产线ID/工序ID等)
/// </summary>
public string StructureId { get; set; }
/// <summary>
/// 指令参数
/// </summary>
public Dictionary<string, object> Parameters { get; set; } = new Dictionary<string, object>();
/// <summary>
/// 执行优先级(1-10,1最高)
/// </summary>
public int Priority { get; set; } = 5;
/// <summary>
/// 执行状态
/// </summary>
public CommandStatus Status { get; set; } = CommandStatus.Pending;
/// <summary>
/// 创建时间
/// </summary>
public DateTime CreateTime { get; set; } = DateTime.Now;
/// <summary>
/// 执行开始时间
/// </summary>
public DateTime? StartTime { get; set; }
/// <summary>
/// 执行结束时间
/// </summary>
public DateTime? EndTime { get; set; }
/// <summary>
/// 取消令牌源
/// </summary>
[JsonIgnore]
public CancellationTokenSource Cts { get; set; } = new CancellationTokenSource();
/// <summary>
/// 执行指令(抽象方法,由子类实现)
/// </summary>
/// <returns></returns>
public abstract Task ExecuteAsync();
/// <summary>
/// 取消指令执行
/// </summary>
public virtual void Cancel()
{
Cts.Cancel();
Status = CommandStatus.Cancelled;
// 发布状态变更事件
WorkCollabEventBus.Instance.PublishAsync(this, new WorkCollabEventArgs
{
EventType = WorkCollabEventType.StatusChanged,
CommandId = CommandId,
ExtraData = { { "Status", Status } }
});
}
}
/// <summary>
/// 产线级指令
/// </summary>
public class LineCommand : BaseCommand
{
public LineCommand()
{
StructureType = "Line";
}
/// <summary>
/// 产线编码
/// </summary>
public string LineCode { get; set; }
/// <summary>
/// 产线名称
/// </summary>
public string LineName { get; set; }
/// <summary>
/// 最大并发执行数
/// </summary>
public int MaxConcurrent { get; set; } = 5;
public override async Task ExecuteAsync()
{
Status = CommandStatus.Running;
StartTime = DateTime.Now;
// 发布开始事件
await WorkCollabEventBus.Instance.PublishAsync(this, new WorkCollabEventArgs
{
EventType = WorkCollabEventType.ExecutionStarted,
CommandId = CommandId
});
try
{
// 产线级指令具体执行逻辑
// 示例:批量派工、产线状态同步、设备联动等
await ExecuteLineBusinessLogicAsync();
Status = CommandStatus.Completed;
EndTime = DateTime.Now;
// 发布完成事件
await WorkCollabEventBus.Instance.PublishAsync(this, new ExecutionCompletedEventArgs
{
CommandId = CommandId,
Success = true,
ExecutionTimeMs = (long)(EndTime.Value - StartTime.Value).TotalMilliseconds
});
}
catch (OperationCanceledException)
{
Status = CommandStatus.Cancelled;
EndTime = DateTime.Now;
// 发布取消事件
await WorkCollabEventBus.Instance.PublishAsync(this, new WorkCollabEventArgs
{
EventType = WorkCollabEventType.StatusChanged,
CommandId = CommandId,
ExtraData = { { "Status", Status } }
});
}
catch (Exception ex)
{
Status = CommandStatus.Failed;
EndTime = DateTime.Now;
// 发布失败事件
await WorkCollabEventBus.Instance.PublishAsync(this, new ExecutionFailedEventArgs
{
CommandId = CommandId,
ErrorMessage = ex.Message,
StackTrace = ex.StackTrace,
RetryCount = 0
});
}
}
/// <summary>
/// 产线业务逻辑执行
/// </summary>
/// <returns></returns>
private async Task ExecuteLineBusinessLogicAsync()
{
// 模拟产线指令执行(实际替换为真实业务逻辑)
for (int i = 0; i <= 100; i += 20)
{
// 检查取消
Cts.Token.ThrowIfCancellationRequested();
// 发布进度事件
await WorkCollabEventBus.Instance.PublishAsync(this, new ExecutionProgressEventArgs
{
CommandId = CommandId,
Progress = i,
CurrentStep = $"产线{LineCode}执行步骤{i/20 + 1}"
});
await Task.Delay(500);
}
}
}
/// <summary>
/// 工序级指令
/// </summary>
public class ProcessCommand : BaseCommand
{
public ProcessCommand()
{
StructureType = "Process";
}
/// <summary>
/// 工序编码
/// </summary>
public string ProcessCode { get; set; }
/// <summary>
/// 工序名称
/// </summary>
public string ProcessName { get; set; }
/// <summary>
/// 关联产线ID
/// </summary>
public string LineId { get; set; }
public override async Task ExecuteAsync()
{
// 工序级指令执行逻辑(类似产线指令,略)
Status = CommandStatus.Running;
StartTime = DateTime.Now;
await WorkCollabEventBus.Instance.PublishAsync(this, new WorkCollabEventArgs
{
EventType = WorkCollabEventType.ExecutionStarted,
CommandId = CommandId
});
try
{
// 模拟工序执行
for (int i = 0; i <= 100; i += 10)
{
Cts.Token.ThrowIfCancellationRequested();
await WorkCollabEventBus.Instance.PublishAsync(this, new ExecutionProgressEventArgs
{
CommandId = CommandId,
Progress = i,
CurrentStep = $"工序{ProcessCode}执行步骤{i/10 + 1}"
});
await Task.Delay(300);
}
Status = CommandStatus.Completed;
EndTime = DateTime.Now;
await WorkCollabEventBus.Instance.PublishAsync(this, new ExecutionCompletedEventArgs
{
CommandId = CommandId,
Success = true,
ExecutionTimeMs = (long)(EndTime.Value - StartTime.Value).TotalMilliseconds
});
}
catch (OperationCanceledException)
{
Status = CommandStatus.Cancelled;
EndTime = DateTime.Now;
}
catch (Exception ex)
{
Status = CommandStatus.Failed;
EndTime = DateTime.Now;
await WorkCollabEventBus.Instance.PublishAsync(this, new ExecutionFailedEventArgs
{
CommandId = CommandId,
ErrorMessage = ex.Message,
StackTrace = ex.StackTrace
});
}
}
}
/// <summary>
/// 工位级指令
/// </summary>
public class StationCommand : BaseCommand
{
public StationCommand()
{
StructureType = "Station";
}
/// <summary>
/// 工位编码
/// </summary>
public string StationCode { get; set; }
/// <summary>
/// 工位名称
/// </summary>
public string StationName { get; set; }
/// <summary>
/// 关联工序ID
/// </summary>
public string ProcessId { get; set; }
public override async Task ExecuteAsync()
{
// 工位级指令执行逻辑(略)
Status = CommandStatus.Running;
StartTime = DateTime.Now;
// 实际业务逻辑:工位派工、设备操作、数据采集等
// ...
}
}
#endregion
2.2 指令管理器(多并发控制)
/// <summary>
/// 指令管理器
/// 功能:多结构指令的存储、调度、并发执行控制
/// </summary>
public class CommandManager
{
// 单例实例
private static readonly Lazy<CommandManager> _instance = new Lazy<CommandManager>(() => new CommandManager());
public static CommandManager Instance => _instance.Value;
// 指令存储(按结构类型分类)
private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, BaseCommand>> _commandDict =
new ConcurrentDictionary<string, ConcurrentDictionary<string, BaseCommand>>();
// 并发控制信号量(按结构类型)
private readonly ConcurrentDictionary<string, SemaphoreSlim> _semaphoreDict =
new ConcurrentDictionary<string, SemaphoreSlim>();
// 执行任务字典
private readonly ConcurrentDictionary<string, Task> _executionTasks =
new ConcurrentDictionary<string, Task>();
/// <summary>
/// 初始化
/// </summary>
private CommandManager()
{
// 初始化结构类型
var structureTypes = new[] { "Line", "Process", "Station", "Resource" };
foreach (var type in structureTypes)
{
_commandDict.TryAdd(type, new ConcurrentDictionary<string, BaseCommand>());
_semaphoreDict.TryAdd(type, new SemaphoreSlim(10, 100)); // 默认最大并发10
}
}
/// <summary>
/// 设置结构类型的最大并发数
/// </summary>
/// <param name="structureType">结构类型</param>
/// <param name="maxConcurrent">最大并发数</param>
public void SetMaxConcurrent(string structureType, int maxConcurrent)
{
_semaphoreDict.AddOrUpdate(structureType,
new SemaphoreSlim(maxConcurrent, maxConcurrent),
(key, old) => new SemaphoreSlim(maxConcurrent, maxConcurrent));
}
/// <summary>
/// 添加指令
/// </summary>
/// <param name="command">指令</param>
public void AddCommand(BaseCommand command)
{
// 获取对应结构类型的指令字典
var structureDict = _commandDict.GetOrAdd(command.StructureType,
_ => new ConcurrentDictionary<string, BaseCommand>());
// 添加指令
structureDict[command.CommandId] = command;
// 发布指令接收事件
WorkCollabEventBus.Instance.PublishAsync(this, new WorkCollabEventArgs
{
EventType = WorkCollabEventType.CommandReceived,
CommandId = command.CommandId,
ExtraData = {
{ "StructureType", command.StructureType },
{ "StructureId", command.StructureId }
}
});
Console.WriteLine($"添加指令:{command.CommandName}({command.CommandId}),结构类型:{command.StructureType}");
}
/// <summary>
/// 获取指令
/// </summary>
/// <param name="structureType">结构类型</param>
/// <param name="commandId">指令ID</param>
/// <returns></returns>
public BaseCommand GetCommand(string structureType, string commandId)
{
if (_commandDict.TryGetValue(structureType, out var structureDict))
{
structureDict.TryGetValue(commandId, out var command);
return command;
}
return null;
}
/// <summary>
/// 获取指定结构类型的所有指令
/// </summary>
/// <param name="structureType">结构类型</param>
/// <returns></returns>
public List<BaseCommand> GetCommandsByStructureType(string structureType)
{
if (_commandDict.TryGetValue(structureType, out var structureDict))
{
return structureDict.Values.ToList();
}
return new List<BaseCommand>();
}
/// <summary>
/// 执行指令(并发控制)
/// </summary>
/// <param name="commandId">指令ID</param>
/// <param name="structureType">结构类型</param>
/// <returns></returns>
public async Task ExecuteCommandAsync(string structureType, string commandId)
{
var command = GetCommand(structureType, commandId);
if (command == null)
{
throw new ArgumentException($"指令不存在:{commandId}");
}
if (command.Status != CommandStatus.Pending)
{
throw new InvalidOperationException($"指令状态异常,当前状态:{command.Status}");
}
// 获取并发控制信号量
var semaphore = _semaphoreDict.GetOrAdd(structureType,
_ => new SemaphoreSlim(10, 100));
try
{
// 等待信号量
await semaphore.WaitAsync(command.Cts.Token);
// 执行指令
var executionTask = command.ExecuteAsync();
_executionTasks[commandId] = executionTask;
await executionTask;
}
finally
{
// 释放信号量
semaphore.Release();
_executionTasks.TryRemove(commandId, out _);
}
}
/// <summary>
/// 批量执行指令(按优先级)
/// </summary>
/// <param name="structureType">结构类型</param>
/// <returns></returns>
public async Task BatchExecuteCommandsAsync(string structureType)
{
var commands = GetCommandsByStructureType(structureType)
.Where(c => c.Status == CommandStatus.Pending)
.OrderBy(c => c.Priority) // 按优先级排序
.ToList();
var tasks = new List<Task>();
foreach (var command in commands)
{
tasks.Add(ExecuteCommandAsync(structureType, command.CommandId));
}
await Task.WhenAll(tasks);
}
/// <summary>
/// 取消指令执行
/// </summary>
/// <param name="structureType">结构类型</param>
/// <param name="commandId">指令ID</param>
public void CancelCommand(string structureType, string commandId)
{
var command = GetCommand(structureType, commandId);
if (command != null)
{
command.Cancel();
// 移除执行任务
if (_executionTasks.TryGetValue(commandId, out var task) && !task.IsCompleted)
{
// 等待任务结束(非阻塞)
_ = task.ContinueWith(t =>
{
_executionTasks.TryRemove(commandId, out _);
});
}
}
}
/// <summary>
/// 清理已完成的指令
/// </summary>
/// <param name="structureType">结构类型</param>
/// <param name="keepHours">保留小时数</param>
public void CleanCompletedCommands(string structureType, int keepHours = 24)
{
if (_commandDict.TryGetValue(structureType, out var structureDict))
{
var cutoffTime = DateTime.Now.AddHours(-keepHours);
var completedCommands = structureDict.Values
.Where(c => (c.Status == CommandStatus.Completed ||
c.Status == CommandStatus.Failed ||
c.Status == CommandStatus.Cancelled) &&
c.EndTime <= cutoffTime)
.ToList();
foreach (var command in completedCommands)
{
structureDict.TryRemove(command.CommandId, out _);
Console.WriteLine($"清理指令:{command.CommandId},状态:{command.Status}");
}
}
}
}
3. 多线程专用复杂成员方法类封装
/// <summary>
/// MES/ERP作业协同多线程执行类
/// 特点:
/// 1. 线程安全的成员方法封装
/// 2. 复杂业务逻辑的模块化实现
/// 3. 完善的异常处理和日志记录
/// 4. 支持取消和进度反馈
/// </summary>
public class WorkCollabExecutor : IDisposable
{
#region 线程安全成员
// 锁对象
private readonly object _lockObj = new object();
// 取消令牌
private readonly CancellationToken _cancellationToken;
// 进度更新委托
private readonly ProgressUpdateDelegate _progressUpdateDelegate;
// 执行状态(线程安全)
private volatile CommandStatus _executionStatus = CommandStatus.Pending;
// 执行进度(线程安全)
private int _executionProgress = 0;
// 日志记录器
private readonly ILogger _logger;
// 数据库连接(线程安全)
private readonly IDbConnection _dbConnection;
// 缓存字典(线程安全)
private readonly ConcurrentDictionary<string, object> _cacheDict = new ConcurrentDictionary<string, object>();
#endregion
#region 公共属性
/// <summary>
/// 执行ID
/// </summary>
public string ExecutionId { get; } = Guid.NewGuid().ToString("N");
/// <summary>
/// 结构类型
/// </summary>
public string StructureType { get; }
/// <summary>
/// 结构ID
/// </summary>
public string StructureId { get; }
/// <summary>
/// 执行参数
/// </summary>
public Dictionary<string, object> Parameters { get; }
/// <summary>
/// 执行状态
/// </summary>
public CommandStatus ExecutionStatus => _executionStatus;
/// <summary>
/// 执行进度
/// </summary>
public int ExecutionProgress => _executionProgress;
#endregion
#region 构造函数
/// <summary>
/// 构造函数
/// </summary>
/// <param name="structureType">结构类型</param>
/// <param name="structureId">结构ID</param>
/// <param name="parameters">执行参数</param>
/// <param name="cancellationToken">取消令牌</param>
/// <param name="progressUpdateDelegate">进度更新委托</param>
public WorkCollabExecutor(string structureType, string structureId,
Dictionary<string, object> parameters, CancellationToken cancellationToken,
ProgressUpdateDelegate progressUpdateDelegate = null)
{
StructureType = structureType;
StructureId = structureId;
Parameters = parameters ?? new Dictionary<string, object>();
_cancellationToken = cancellationToken;
_progressUpdateDelegate = progressUpdateDelegate;
// 初始化日志
_logger = LogManager.GetLogger(typeof(WorkCollabExecutor));
// 初始化数据库连接(实际项目中应使用连接池)
_dbConnection = new SqlConnection(ConfigurationManager.ConnectionStrings["MESDB"].ConnectionString);
// 注册取消回调
_cancellationToken.Register(() =>
{
_executionStatus = CommandStatus.Cancelled;
_logger.Info($"执行{ExecutionId}已取消");
});
}
#endregion
#region 核心执行方法(复杂成员方法封装)
/// <summary>
/// 执行作业协同逻辑(主入口)
/// </summary>
/// <returns></returns>
public async Task ExecuteAsync()
{
try
{
// 线程安全的状态更新
lock (_lockObj)
{
if (_executionStatus != CommandStatus.Pending)
{
throw new InvalidOperationException($"当前执行状态不允许启动:{_executionStatus}");
}
_executionStatus = CommandStatus.Running;
}
_logger.Info($"开始执行作业协同:{ExecutionId},结构类型:{StructureType},结构ID:{StructureId}");
// 步骤1:参数验证
await Step1_ValidateParametersAsync();
// 步骤2:数据准备
await Step2_PrepareDataAsync();
// 步骤3:业务逻辑执行
await Step3_ExecuteBusinessLogicAsync();
// 步骤4:结果处理
await Step4_ProcessResultAsync();
// 步骤5:数据清理
await Step5_CleanupAsync();
// 更新最终状态
lock (_lockObj)
{
_executionStatus = CommandStatus.Completed;
_executionProgress = 100;
}
_progressUpdateDelegate?.Invoke(ExecutionId, 100, CommandStatus.Completed);
_logger.Info($"作业协同执行完成:{ExecutionId}");
}
catch (OperationCanceledException)
{
lock (_lockObj)
{
_executionStatus = CommandStatus.Cancelled;
}
_progressUpdateDelegate?.Invoke(ExecutionId, _executionProgress, CommandStatus.Cancelled);
_logger.Warn($"作业协同执行取消:{ExecutionId}");
}
catch (Exception ex)
{
lock (_lockObj)
{
_executionStatus = CommandStatus.Failed;
}
_progressUpdateDelegate?.Invoke(ExecutionId, _executionProgress, CommandStatus.Failed);
_logger.Error(ex, $"作业协同执行失败:{ExecutionId}");
throw;
}
}
/// <summary>
/// 步骤1:参数验证
/// </summary>
/// <returns></returns>
private async Task Step1_ValidateParametersAsync()
{
UpdateProgress(5, "参数验证中");
_logger.Info($"执行步骤1:参数验证,执行ID:{ExecutionId}");
// 模拟异步验证
await Task.Delay(100);
// 参数验证逻辑
if (string.IsNullOrEmpty(StructureId))
{
throw new ArgumentNullException("StructureId", "结构ID不能为空");
}
if (!Parameters.ContainsKey("WorkOrderId"))
{
throw new ArgumentException("缺少必要参数:WorkOrderId");
}
// 检查取消
_cancellationToken.ThrowIfCancellationRequested();
UpdateProgress(10, "参数验证完成");
}
/// <summary>
/// 步骤2:数据准备
/// </summary>
/// <returns></returns>
private async Task Step2_PrepareDataAsync()
{
UpdateProgress(10, "数据准备中");
_logger.Info($"执行步骤2:数据准备,执行ID:{ExecutionId}");
// 模拟数据库查询
await Task.Run(async () =>
{
using (var conn = _dbConnection)
{
await conn.OpenAsync(_cancellationToken);
// 查询工单信息(示例)
var workOrderSql = "SELECT * FROM WorkOrder WHERE WorkOrderId = @WorkOrderId";
var workOrder = await conn.QueryFirstOrDefaultAsync<dynamic>(workOrderSql,
new { WorkOrderId = Parameters["WorkOrderId"] },
cancellationToken: _cancellationToken);
// 缓存查询结果
_cacheDict["WorkOrder"] = workOrder;
// 检查取消
_cancellationToken.ThrowIfCancellationRequested();
}
});
UpdateProgress(25, "数据准备完成");
}
/// <summary>
/// 步骤3:业务逻辑执行(最复杂的核心步骤)
/// </summary>
/// <returns></returns>
private async Task Step3_ExecuteBusinessLogicAsync()
{
UpdateProgress(25, "业务逻辑执行中");
_logger.Info($"执行步骤3:业务逻辑执行,执行ID:{ExecutionId}");
// 根据结构类型执行不同的业务逻辑
switch (StructureType)
{
case "Line":
await ExecuteLineBusinessLogicAsync();
break;
case "Process":
await ExecuteProcessBusinessLogicAsync();
break;
case "Station":
await ExecuteStationBusinessLogicAsync();
break;
default:
throw new NotSupportedException($"不支持的结构类型:{StructureType}");
}
UpdateProgress(80, "业务逻辑执行完成");
}
/// <summary>
/// 步骤4:结果处理
/// </summary>
/// <returns></returns>
private async Task Step4_ProcessResultAsync()
{
UpdateProgress(80, "结果处理中");
_logger.Info($"执行步骤4:结果处理,执行ID:{ExecutionId}");
// 模拟结果处理
await Task.Delay(200);
// 保存执行结果到数据库
var resultData = new
{
ExecutionId = ExecutionId,
StructureType = StructureType,
StructureId = StructureId,
ExecutionTime = DateTime.Now,
Status = _executionStatus.ToString(),
Result = "执行成功"
};
// 缓存结果
_cacheDict["ExecutionResult"] = resultData;
UpdateProgress(90, "结果处理完成");
}
/// <summary>
/// 步骤5:数据清理
/// </summary>
/// <returns></returns>
private async Task Step5_CleanupAsync()
{
UpdateProgress(90, "数据清理中");
_logger.Info($"执行步骤5:数据清理,执行ID:{ExecutionId}");
// 模拟清理
await Task.Delay(100);
// 清理临时缓存(保留核心数据)
var keysToRemove = _cacheDict.Keys.Where(k => k.StartsWith("Temp_")).ToList();
foreach (var key in keysToRemove)
{
_cacheDict.TryRemove(key, out _);
}
UpdateProgress(95, "数据清理完成");
}
#endregion
#region 结构类型专用方法
/// <summary>
/// 执行产线级业务逻辑
/// </summary>
/// <returns></returns>
private async Task ExecuteLineBusinessLogicAsync()
{
UpdateProgress(30, "产线数据同步中");
// 模拟多线程并行处理产线数据
var lineTasks = new List<Task>();
// 任务1:产线状态更新
lineTasks.Add(Task.Run(async () =>
{
UpdateProgress(40, "产线状态更新中");
await Task.Delay(300);
_cancellationToken.ThrowIfCancellationRequested();
UpdateProgress(45, "产线状态更新完成");
}, _cancellationToken));
// 任务2:工单派工
lineTasks.Add(Task.Run(async () =>
{
UpdateProgress(45, "工单派工中");
await Task.Delay(400);
_cancellationToken.ThrowIfCancellationRequested();
UpdateProgress(55, "工单派工完成");
}, _cancellationToken));
// 任务3:设备状态检查
lineTasks.Add(Task.Run(async () =>
{
UpdateProgress(55, "设备状态检查中");
await Task.Delay(300);
_cancellationToken.ThrowIfCancellationRequested();
UpdateProgress(65, "设备状态检查完成");
}, _cancellationToken));
// 等待所有任务完成
await Task.WhenAll(lineTasks);
UpdateProgress(70, "产线业务逻辑执行完成");
}
/// <summary>
/// 执行工序级业务逻辑
/// </summary>
/// <returns></returns>
private async Task ExecuteProcessBusinessLogicAsync()
{
// 工序级逻辑实现
UpdateProgress(30, "工序工艺参数同步中");
await Task.Delay(500);
_cancellationToken.ThrowIfCancellationRequested();
UpdateProgress(70, "工序业务逻辑执行完成");
}
/// <summary>
/// 执行工位级业务逻辑
/// </summary>
/// <returns></returns>
private async Task ExecuteStationBusinessLogicAsync()
{
// 工位级逻辑实现
UpdateProgress(30, "工位设备指令下发中");
await Task.Delay(500);
_cancellationToken.ThrowIfCancellationRequested();
UpdateProgress(70, "工位业务逻辑执行完成");
}
#endregion
#region 辅助方法
/// <summary>
/// 更新进度(线程安全)
/// </summary>
/// <param name="progress">进度值</param>
/// <param name="stepDesc">步骤描述</param>
private void UpdateProgress(int progress, string stepDesc)
{
lock (_lockObj)
{
_executionProgress = progress;
}
_logger.Info($"执行进度:{progress}%,步骤:{stepDesc},执行ID:{ExecutionId}");
_progressUpdateDelegate?.Invoke(ExecutionId, progress, _executionStatus);
// 发布进度事件
WorkCollabEventBus.Instance.PublishAsync(this, new ExecutionProgressEventArgs
{
CommandId = ExecutionId,
Progress = progress,
CurrentStep = stepDesc
});
}
/// <summary>
/// 获取缓存数据
/// </summary>
/// <typeparam name="T">数据类型</typeparam>
/// <param name="key">缓存键</param>
/// <returns></returns>
public T GetCacheData<T>(string key)
{
if (_cacheDict.TryGetValue(key, out var value))
{
return (T)value;
}
return default;
}
/// <summary>
/// 设置缓存数据
/// </summary>
/// <param name="key">缓存键</param>
/// <param name="value">缓存值</param>
public void SetCacheData(string key, object value)
{
_cacheDict[key] = value;
}
#endregion
#region 资源释放
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// 释放资源
/// </summary>
/// <param name="disposing">是否释放托管资源</param>
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
// 释放托管资源
_dbConnection?.Dispose();
_cacheDict.Clear();
}
_logger.Info($"执行器资源已释放:{ExecutionId}");
}
/// <summary>
/// 析构函数
/// </summary>
~WorkCollabExecutor()
{
Dispose(false);
}
#endregion
}
4. 界面与 API 组件
4.1 WinForm 客户端界面组件
/// <summary>
/// 作业协同指令操作界面
/// </summary>
public partial class CommandOperationForm : Form
{
// 指令管理器
private readonly CommandManager _commandManager = CommandManager.Instance;
// 进度更新委托(用于跨线程更新UI)
private readonly ProgressUpdateDelegate _progressUpdateDelegate;
public CommandOperationForm()
{
InitializeComponent();
// 初始化进度更新委托
_progressUpdateDelegate = UpdateCommandProgress;
// 订阅事件
WorkCollabEventBus.Instance.Subscribe(WorkCollabEventType.ExecutionProgress, OnExecutionProgress);
WorkCollabEventBus.Instance.Subscribe(WorkCollabEventType.ExecutionCompleted, OnExecutionCompleted);
WorkCollabEventBus.Instance.Subscribe(WorkCollabEventType.ExecutionFailed, OnExecutionFailed);
// 初始化UI
InitUI();
}
/// <summary>
/// 初始化UI
/// </summary>
private void InitUI()
{
// 初始化结构类型下拉框
cboStructureType.Items.AddRange(new[] { "Line", "Process", "Station" });
cboStructureType.SelectedIndex = 0;
// 初始化指令列表DataGridView
dgvCommands.Columns.AddRange(new DataGridViewColumn[]
{
new DataGridViewTextBoxColumn { Name = "CommandId", HeaderText = "指令ID", Width = 150 },
new DataGridViewTextBoxColumn { Name = "CommandName", HeaderText = "指令名称", Width = 150 },
new DataGridViewTextBoxColumn { Name = "StructureType", HeaderText = "结构类型", Width = 100 },
new DataGridViewTextBoxColumn { Name = "StructureId", HeaderText = "结构ID", Width = 100 },
new DataGridViewTextBoxColumn { Name = "Status", HeaderText = "状态", Width = 100 },
new DataGridViewTextBoxColumn { Name = "Progress", HeaderText = "进度", Width = 80 },
new DataGridViewTextBoxColumn { Name = "CreateTime", HeaderText = "创建时间", Width = 180 }
});
// 初始化按钮事件
btnAddCommand.Click += BtnAddCommand_Click;
btnExecuteCommand.Click += BtnExecuteCommand_Click;
btnCancelCommand.Click += BtnCancelCommand_Click;
btnRefresh.Click += BtnRefresh_Click;
// 刷新指令列表
RefreshCommandList();
}
/// <summary>
/// 添加指令
/// </summary>
private void BtnAddCommand_Click(object sender, EventArgs e)
{
try
{
BaseCommand command = null;
var structureType = cboStructureType.SelectedItem.ToString();
// 根据结构类型创建指令
switch (structureType)
{
case "Line":
command = new LineCommand
{
CommandName = txtCommandName.Text,
LineCode = txtStructureId.Text,
LineName = txtStructureName.Text,
Priority = (int)nudPriority.Value,
Parameters = new Dictionary<string, object>
{
{ "WorkOrderId", txtWorkOrderId.Text },
{ "BatchNo", txtBatchNo.Text }
}
};
break;
case "Process":
command = new ProcessCommand
{
CommandName = txtCommandName.Text,
ProcessCode = txtStructureId.Text,
ProcessName = txtStructureName.Text,
LineId = txtLineId.Text,
Priority = (int)nudPriority.Value
};
break;
case "Station":
command = new StationCommand
{
CommandName = txtCommandName.Text,
StationCode = txtStructureId.Text,
StationName = txtStructureName.Text,
ProcessId = txtProcessId.Text,
Priority = (int)nudPriority.Value
};
break;
}
if (command != null)
{
_commandManager.AddCommand(command);
MessageBox.Show($"指令添加成功:{command.CommandId}", "成功",
MessageBoxButtons.OK, MessageBoxIcon.Information);
RefreshCommandList();
}
}
catch (Exception ex)
{
MessageBox.Show($"添加指令失败:{ex.Message}", "错误",
MessageBoxButtons.OK, MessageBoxIcon.Error);
}
}
/// <summary>
/// 执行指令
/// </summary>
private async void BtnExecuteCommand_Click(object sender, EventArgs e)
{
if (dgvCommands.SelectedRows.Count == 0)
{
MessageBox.Show("请选择要执行的指令", "提示",
MessageBoxButtons.OK, MessageBoxIcon.Warning);
return;
}
try
{
var selectedRow = dgvCommands.SelectedRows[0];
var commandId = selectedRow.Cells["CommandId"].Value.ToString();
var structureType = selectedRow.Cells["StructureType"].Value.ToString();
// 异步执行指令(不阻塞UI)
await Task.Run(async () =>
{
await _commandManager.ExecuteCommandAsync(structureType, commandId);
});
MessageBox.Show($"指令执行完成:{commandId}", "成功",
MessageBoxButtons.OK, MessageBoxIcon.Information);
RefreshCommandList();
}
catch (Exception ex)
{
MessageBox.Show($"执行指令失败:{ex.Message}", "错误",
MessageBoxButtons.OK, MessageBoxIcon.Error);
}
}
/// <summary>
/// 取消指令
/// </summary>
private void BtnCancelCommand_Click(object sender, EventArgs e)
{
if (dgvCommands.SelectedRows.Count == 0)
{
MessageBox.Show("请选择要取消的指令", "提示",
MessageBoxButtons.OK, MessageBoxIcon.Warning);
return;
}
try
{
var selectedRow = dgvCommands.SelectedRows[0];
var commandId = selectedRow.Cells["CommandId"].Value.ToString();
var structureType = selectedRow.Cells["StructureType"].Value.ToString();
_commandManager.CancelCommand(structureType, commandId);
MessageBox.Show($"指令取消请求已提交:{commandId}", "提示",
MessageBoxButtons.OK, MessageBoxIcon.Information);
RefreshCommandList();
}
catch (Exception ex)
{
MessageBox.Show($"取消指令失败:{ex.Message}", "错误",
MessageBoxButtons.OK, MessageBoxIcon.Error);
}
}
/// <summary>
/// 刷新指令列表
/// </summary>
private void BtnRefresh_Click(object sender, EventArgs e)
{
RefreshCommandList();
}
/// <summary>
/// 刷新指令列表
/// </summary>
private void RefreshCommandList()
{
dgvCommands.Rows.Clear();
var structureType = cboStructureType.SelectedItem.ToString();
var commands = _commandManager.GetCommandsByStructureType(structureType);
foreach (var command in commands)
{
var row = dgvCommands.Rows.Add();
row.Cells["CommandId"].Value = command.CommandId;
row.Cells["CommandName"].Value = command.CommandName;
row.Cells["StructureType"].Value = command.StructureType;
row.Cells["StructureId"].Value = command.StructureId;
row.Cells["Status"].Value = command.Status.ToString();
row.Cells["Progress"].Value = $"{GetCommandProgress(command)}%";
row.Cells["CreateTime"].Value = command.CreateTime.ToString("yyyy-MM-dd HH:mm:ss");
// 根据状态设置行颜色
switch (command.Status)
{
case CommandStatus.Running:
row.DefaultCellStyle.BackColor = Color.LightYellow;
break;
case CommandStatus.Completed:
row.DefaultCellStyle.BackColor = Color.LightGreen;
break;
case CommandStatus.Failed:
row.DefaultCellStyle.BackColor = Color.LightPink;
break;
case CommandStatus.Cancelled:
row.DefaultCellStyle.BackColor = Color.LightGray;
break;
}
}
}
/// <summary>
/// 获取指令进度
/// </summary>
/// <param name="command">指令</param>
/// <returns></returns>
private int GetCommandProgress(BaseCommand command)
{
// 实际项目中应从执行器获取进度
// 此处简化处理
switch (command.Status)
{
case CommandStatus.Completed:
return 100;
case CommandStatus.Failed:
case CommandStatus.Cancelled:
return 0;
case CommandStatus.Running:
return 50; // 模拟进度
default:
return 0;
}
}
/// <summary>
/// 进度更新回调(跨线程安全)
/// </summary>
/// <param name="commandId">指令ID</param>
/// <param name="progress">进度</param>
/// <param name="status">状态</param>
private void UpdateCommandProgress(string commandId, int progress, CommandStatus status)
{
if (dgvCommands.InvokeRequired)
{
dgvCommands.Invoke(_progressUpdateDelegate, commandId, progress, status);
}
else
{
// 更新指定指令的进度和状态
foreach (DataGridViewRow row in dgvCommands.Rows)
{
if (row.Cells["CommandId"].Value?.ToString() == commandId)
{
row.Cells["Progress"].Value = $"{progress}%";
row.Cells["Status"].Value = status.ToString();
// 更新行颜色
switch (status)
{
case CommandStatus.Running:
row.DefaultCellStyle.BackColor = Color.LightYellow;
break;
case CommandStatus.Completed:
row.DefaultCellStyle.BackColor = Color.LightGreen;
break;
case CommandStatus.Failed:
row.DefaultCellStyle.BackColor = Color.LightPink;
break;
case CommandStatus.Cancelled:
row.DefaultCellStyle.BackColor = Color.LightGray;
break;
}
break;
}
}
}
}
/// <summary>
/// 执行进度事件处理
/// </summary>
/// <param name="sender">发送者</param>
/// <param name="e">事件参数</param>
private void OnExecutionProgress(object sender, WorkCollabEventArgs e)
{
if (e is ExecutionProgressEventArgs progressArgs)
{
UpdateCommandProgress(progressArgs.CommandId, progressArgs.Progress, CommandStatus.Running);
}
}
/// <summary>
/// 执行完成事件处理
/// </summary>
/// <param name="sender">发送者</param>
/// <param name="e">事件参数</param>
private void OnExecutionCompleted(object sender, WorkCollabEventArgs e)
{
if (e is ExecutionCompletedEventArgs completedArgs)
{
UpdateCommandProgress(completedArgs.CommandId, 100,
completedArgs.Success ? CommandStatus.Completed : CommandStatus.Failed);
}
}
/// <summary>
/// 执行失败事件处理
/// </summary>
/// <param name="sender">发送者</param>
/// <param name="e">事件参数</param>
private void OnExecutionFailed(object sender, WorkCollabEventArgs e)
{
if (e is ExecutionFailedEventArgs failedArgs)
{
UpdateCommandProgress(failedArgs.CommandId, 0, CommandStatus.Failed);
}
}
/// <summary>
/// 窗体关闭时取消事件订阅
/// </summary>
/// <param name="sender">发送者</param>
/// <param name="e">事件参数</param>
private void CommandOperationForm_FormClosing(object sender, FormClosingEventArgs e)
{
WorkCollabEventBus.Instance.Unsubscribe(WorkCollabEventType.ExecutionProgress, OnExecutionProgress);
WorkCollabEventBus.Instance.Unsubscribe(WorkCollabEventType.ExecutionCompleted, OnExecutionCompleted);
WorkCollabEventBus.Instance.Unsubscribe(WorkCollabEventType.ExecutionFailed, OnExecutionFailed);
}
}
4.2 API 接口组件
/// <summary>
/// 作业协同指令API控制器
/// </summary>
[Route("api/[controller]")]
[ApiController]
public class WorkCollabController : ControllerBase
{
private readonly CommandManager _commandManager = CommandManager.Instance;
private readonly ILogger<WorkCollabController> _logger;
public WorkCollabController(ILogger<WorkCollabController> logger)
{
_logger = logger;
}
/// <summary>
/// 添加指令
/// </summary>
/// <param name="commandRequest">指令请求</param>
/// <returns></returns>
[HttpPost("commands")]
public IActionResult AddCommand([FromBody] CommandRequest commandRequest)
{
try
{
BaseCommand command = null;
// 根据结构类型创建指令
switch (commandRequest.StructureType)
{
case "Line":
command = new LineCommand
{
CommandName = commandRequest.CommandName,
LineCode = commandRequest.StructureId,
LineName = commandRequest.StructureName,
Priority = commandRequest.Priority ?? 5,
Parameters = commandRequest.Parameters
};
break;
case "Process":
command = new ProcessCommand
{
CommandName = commandRequest.CommandName,
ProcessCode = commandRequest.StructureId,
ProcessName = commandRequest.StructureName,
LineId = commandRequest.RelatedId,
Priority = commandRequest.Priority ?? 5,
Parameters = commandRequest.Parameters
};
break;
case "Station":
command = new StationCommand
{
CommandName = commandRequest.CommandName,
StationCode = commandRequest.StructureId,
StationName = commandRequest.StructureName,
ProcessId = commandRequest.RelatedId,
Priority = commandRequest.Priority ?? 5,
Parameters = commandRequest.Parameters
};
break;
default:
return BadRequest($"不支持的结构类型:{commandRequest.StructureType}");
}
_commandManager.AddCommand(command);
_logger.LogInformation($"API添加指令成功:{command.CommandId}");
return Ok(new
{
Success = true,
CommandId = command.CommandId,
Message = "指令添加成功"
});
}
catch (Exception ex)
{
_logger.LogError(ex, "添加指令失败");
return StatusCode(500, new
{
Success = false,
Message = ex.Message
});
}
}
/// <summary>
/// 执行指令
/// </summary>
/// <param name="structureType">结构类型</param>
/// <param name="commandId">指令ID</param>
/// <returns></returns>
[HttpPost("commands/{structureType}/{commandId}/execute")]
public async Task<IActionResult> ExecuteCommand(string structureType, string commandId)
{
try
{
await _commandManager.ExecuteCommandAsync(structureType, commandId);
_logger.LogInformation($"API执行指令成功:{commandId}");
return Ok(new
{
Success = true,
CommandId = commandId,
Message = "指令执行完成"
});
}
catch (Exception ex)
{
_logger.LogError(ex, $"执行指令失败:{commandId}");
return StatusCode(500, new
{
Success = false,
Message = ex.Message
});
}
}
/// <summary>
/// 取消指令
/// </summary>
/// <param name="structureType">结构类型</param>
/// <param name="commandId">指令ID</param>
/// <returns></returns>
[HttpPost("commands/{structureType}/{commandId}/cancel")]
public IActionResult CancelCommand(string structureType, string commandId)
{
try
{
_commandManager.CancelCommand(structureType, commandId);
_logger.LogInformation($"API取消指令成功:{commandId}");
return Ok(new
{
Success = true,
CommandId = commandId,
Message = "指令取消请求已提交"
});
}
catch (Exception ex)
{
_logger.LogError(ex, $"取消指令失败:{commandId}");
return StatusCode(500, new
{
Success = false,
Message = ex.Message
});
}
}
/// <summary>
/// 获取指令状态
/// </summary>
/// <param name="structureType">结构类型</param>
/// <param name="commandId">指令ID</param>
/// <returns></returns>
[HttpGet("commands/{structureType}/{commandId}")]
public IActionResult GetCommandStatus(string structureType, string commandId)
{
try
{
var command = _commandManager.GetCommand(structureType, commandId);
if (command == null)
{
return NotFound(new
{
Success = false,
Message = "指令不存在"
});
}
return Ok(new
{
Success = true,
Data = new
{
CommandId = command.CommandId,
CommandName = command.CommandName,
StructureType = command.StructureType,
StructureId = command.StructureId,
Status = command.Status.ToString(),
CreateTime = command.CreateTime,
StartTime = command.StartTime,
EndTime = command.EndTime
}
});
}
catch (Exception ex)
{
_logger.LogError(ex, $"获取指令状态失败:{commandId}");
return StatusCode(500, new
{
Success = false,
Message = ex.Message
});
}
}
/// <summary>
/// 获取指定结构类型的指令列表
/// </summary>
/// <param name="structureType">结构类型</param>
/// <returns></returns>
[HttpGet("commands/{structureType}")]
public IActionResult GetCommands(string structureType)
{
try
{
var commands = _commandManager.GetCommandsByStructureType(structureType);
var result = commands.Select(c => new
{
CommandId = c.CommandId,
CommandName = c.CommandName,
StructureType = c.StructureType,
StructureId = c.StructureId,
Status = c.Status.ToString(),
Priority = c.Priority,
CreateTime = c.CreateTime
}).ToList();
return Ok(new
{
Success = true,
Data = result,
TotalCount = result.Count
});
}
catch (Exception ex)
{
_logger.LogError(ex, $"获取指令列表失败:{structureType}");
return StatusCode(500, new
{
Success = false,
Message = ex.Message
});
}
}
}
/// <summary>
/// 指令请求模型
/// </summary>
public class CommandRequest
{
/// <summary>
/// 指令名称
/// </summary>
public string CommandName { get; set; }
/// <summary>
/// 结构类型
/// </summary>
public string StructureType { get; set; }
/// <summary>
/// 结构ID
/// </summary>
public string StructureId { get; set; }
/// <summary>
/// 结构名称
/// </summary>
public string StructureName { get; set; }
/// <summary>
/// 关联ID(产线ID/工序ID)
/// </summary>
public string RelatedId { get; set; }
/// <summary>
/// 优先级
/// </summary>
public int? Priority { get; set; }
/// <summary>
/// 指令参数
/// </summary>
public Dictionary<string, object> Parameters { get; set; } = new Dictionary<string, object>();
}
三、使用示例
1. 完整使用流程
// 1. 初始化指令管理器
var commandManager = CommandManager.Instance;
// 2. 设置产线级指令最大并发数
commandManager.SetMaxConcurrent("Line", 5);
// 3. 创建产线指令
var lineCommand = new LineCommand
{
CommandName = "产线001工单派工",
LineCode = "LINE001",
LineName = "总装线1",
Priority = 1,
Parameters = new Dictionary<string, object>
{
{ "WorkOrderId", "WO20260212001" },
{ "BatchNo", "B20260212001" },
{ "ProductCode", "P001" }
}
};
// 4. 添加指令
commandManager.AddCommand(lineCommand);
// 5. 订阅事件
WorkCollabEventBus.Instance.Subscribe(WorkCollabEventType.ExecutionProgress, (sender, e) =>
{
if (e is ExecutionProgressEventArgs progressArgs)
{
Console.WriteLine($"指令{progressArgs.CommandId}进度:{progressArgs.Progress}%");
}
});
// 6. 执行指令
await commandManager.ExecuteCommandAsync("Line", lineCommand.CommandId);
// 7. 获取指令状态
var executedCommand = commandManager.GetCommand("Line", lineCommand.CommandId);
Console.WriteLine($"指令状态:{executedCommand.Status}");
// 8. 清理已完成的指令
commandManager.CleanCompletedCommands("Line", 1);
2. 多线程执行器使用
// 创建取消令牌
var cts = new CancellationTokenSource();
// 创建执行器
using (var executor = new WorkCollabExecutor(
"Line",
"LINE001",
new Dictionary<string, object> { { "WorkOrderId", "WO20260212001" } },
cts.Token,
(commandId, progress, status) =>
{
Console.WriteLine($"执行器{commandId}进度:{progress}%,状态:{status}");
}))
{
// 执行作业协同逻辑
await executor.ExecuteAsync();
// 获取执行结果
var result = executor.GetCacheData<dynamic>("ExecutionResult");
Console.WriteLine($"执行结果:{result.Status}");
}
四、总结
关键点回顾
- 委托事件封装:通过标准化的事件总线实现业务执行与事件触发的解耦,支持多线程安全的事件发布 / 订阅,适配 MES/ERP 作业协同的复杂事件场景;
- 多结构指令控制:基于指令基类 + 多结构实现类的设计,结合指令管理器实现多维度(产线 / 工序 / 工位)指令的并发控制、优先级调度、状态管理;
- 多线程安全封装:专用执行类
WorkCollabExecutor封装复杂成员方法,通过锁、volatile、并发集合保证线程安全,完善的异常处理和资源释放机制; - 全栈组件化:包含 WinForm 客户端界面、后台服务逻辑、标准化 API 接口,支持外部系统集成和实时状态同步;
- 高可靠性设计:完善的取消机制、进度反馈、异常隔离,保证在高并发场景下的稳定性和可维护性。
该方案完全适配 MES/ERP 作业协同场景的业务需求,解决了委托事件封装、多结构并发指令操作、多线程复杂方法封装的核心问题,同时提供了完整的界面、后台、API 组件,可直接落地应用于实际项目。
更多推荐
所有评论(0)