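A Brief Look at Domain-Driven Design (DDD): Putting It into Practice in an AIOps Project
The complexity of an AIOps system usually lies less in its technology stack than in its business semantics: How are alerts aggregated into an incident? How does an incident enter remediation? How are remediation actions approved, kept idempotent, executed asynchronously, and tracked?
The value of DDD (Domain-Driven Design) is that it maps this business knowledge onto the code structure, so that the code becomes a direct expression of the business rules.
This article uses a minimal runnable AIOps demo (alert ingestion → incident aggregation → scale-out planning → approval → asynchronous execution → queryable status) to walk through the core strategic and tactical patterns of DDD, and gives a practical layering and directory layout (the full project source is attached at the end).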
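1. Project Background: What Problem Are We Solving?
In day-to-day operations, we want the system to:
- Ingest alerts from multiple sources and normalize them
- Aggregate alerts with the same symptom into one Incident (reducing alert storms)
- Plan remediation actions (such as scaling out) for an Incident, with business-level idempotency
- Execute remediation actions asynchronously after approval (approval returns quickly; the slow execution runs in the background)
- Give both Actions and Incidents a trackable status and timeline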
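2. DDD Strategic Design
2.1 Bounded Context
A bounded context defines the boundary of a domain model: inside the boundary, concepts, rules, and language must be consistent; interaction across the boundary must be explicit.
This system can be divided into three contexts:
- Alert Ingestion context: receiving alerts from multiple sources, normalization, deduplication, routing
- Incident Management context: incident lifecycle, alert aggregation, timeline
- Remediation Execution context: remediation action planning, approval, asynchronous execution, idempotency
Figure: the three bounded contexts.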
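How does the code express the context boundaries?
- Core aggregate root of the Incident Management context: Incident
- Core aggregate root of the Remediation Execution context: RemediationAction
- A cross-context reference never holds the other object directly; it uses an ID or a value object instead
Example: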
// domain/incident/Incident.java
public class Incident {
    private final String id;
    private final Severity severity;
    private IncidentStatus status;
    private final List<AlertRef> alerts;          // cross-context: referenced through a value object
    private final List<TimelineEntry> timeline;
}
// domain/remediation/RemediationAction.java
public class RemediationAction {
    private final String id;
    private final String incidentId;              // link to the Incident: only the ID is kept
    private final ActionType type;
    private ActionStatus status;
}
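2.2 Ubiquitous Language
The ubiquitous language requires business terms, code names, and API names to agree, so that discussing the business is the same as discussing the code.
Key terms and their counterparts in the code (excerpt):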
| Business term | In code | Location |
|---|---|---|
| Incident | Incident | domain/incident/Incident.java |
| Alert reference | AlertRef | domain/incident/AlertRef.java |
| Severity | Severity | domain/incident/Severity.java |
| Remediation action | RemediationAction | domain/remediation/RemediationAction.java |
| Incident status | IncidentStatus | domain/incident/IncidentStatus.java |
| Ingest alert | ingestAlert() | application/usecase/IncidentAppService.java |
| Start mitigation | startMitigation() | domain/incident/Incident.java |
| Plan scale-out | planScaleOut() | application/usecase/RemediationAppService.java |
| Approve action | approve() | domain/remediation/RemediationAction.java |
The API expresses the ubiquitous language as well (the REST semantics are the business semantics):
// POST /alerts/ingest - ingest an alert
@PostMapping("/alerts/ingest")
public ResponseEntity<?> ingest(@Valid @RequestBody IngestAlertRequest req)
// POST /incidents/{incidentId}/plan/scale-out - plan a scale-out
@PostMapping("/incidents/{incidentId}/plan/scale-out")
public ResponseEntity<?> planScaleOut(...)
// POST /actions/{actionId}/approve - approve an action
@PostMapping("/actions/{actionId}/approve")
public ResponseEntity<?> approve(...)
3. DDD Tactical Design
3.1 Entity
The defining traits of an entity are a unique identity, a lifecycle, and mutable state.
Incident (aggregate root entity)
public class Incident {
    private final String id;                      // unique identity
    private final String fingerprint;             // immutable: used for aggregation
    private final Severity severity;              // immutable
    private IncidentStatus status;                // mutable state
    private final List<AlertRef> alerts;
    private final List<TimelineEntry> timeline;
    private final List<Object> domainEvents;
    private Incident(String id, String fingerprint, Severity severity) { ... }
    public static Incident open(String fingerprint, Severity severity, AlertRef firstAlert) { ... }
    public void addAlert(AlertRef alert) { ... }
    public void startMitigation(String reason) { ... }
    private void ensureNotTerminal() { ... }
    public List<Object> pullDomainEvents() { ... }
}
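Entity characteristics in summary:
- Unique identity: id never changes
- Mutability: status transitions over time
- Lifecycle: open → startMitigation → close (the demo implements open and startMitigation only)
- Behavior encapsulation: business methods such as addAlert() and startMitigation()
- Invariant protection: a terminal state cannot be changed (for example, ensureNotTerminal())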
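The lifecycle and invariant rules above can be made explicit as a small transition table. The sketch below is only an illustration under stated assumptions, not the demo's implementation: the Status enum mirrors IncidentStatus, but the ALLOWED table and the transition helper are hypothetical, and every transition other than OPEN/TRIAGING → MITIGATING is an assumption.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

final class IncidentLifecycleSketch {

    // Mirrors the demo's IncidentStatus values.
    enum Status { OPEN, TRIAGING, MITIGATING, RESOLVED, CLOSED, CANCELED }

    // Hypothetical transition table; only OPEN/TRIAGING -> MITIGATING comes from the demo.
    static final Map<Status, Set<Status>> ALLOWED = new EnumMap<>(Status.class);
    static {
        ALLOWED.put(Status.OPEN, EnumSet.of(Status.TRIAGING, Status.MITIGATING, Status.CANCELED));
        ALLOWED.put(Status.TRIAGING, EnumSet.of(Status.MITIGATING, Status.CANCELED));
        ALLOWED.put(Status.MITIGATING, EnumSet.of(Status.RESOLVED));
        ALLOWED.put(Status.RESOLVED, EnumSet.of(Status.CLOSED));
        ALLOWED.put(Status.CLOSED, EnumSet.noneOf(Status.class));   // terminal
        ALLOWED.put(Status.CANCELED, EnumSet.noneOf(Status.class)); // terminal
    }

    /** Returns the new status, or throws if the move is not in the table. */
    static Status transition(Status from, Status to) {
        if (!ALLOWED.get(from).contains(to)) {
            throw new IllegalStateException("cannot move from " + from + " to " + to);
        }
        return to;
    }

    public static void main(String[] args) {
        System.out.println(transition(Status.OPEN, Status.MITIGATING)); // MITIGATING
        try {
            transition(Status.CLOSED, Status.MITIGATING);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Keeping the table inside the domain layer means the rule "a terminal state cannot be changed" is data, so it can be unit-tested without any framework.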
RemediationAction (aggregate root entity)
public class RemediationAction {
    private final String id;
    private final String incidentId;
    private final String dedupKey;                // key field for idempotency / deduplication
    private final ActionType type;
    private final String target;
    private final Map<String, String> params;
    private ActionStatus status;
    private RemediationAction(String id, String incidentId, String dedupKey,
                              ActionType type, String target, Map<String, String> params) { ... }
    public static RemediationAction plan(String incidentId, ActionType type, String target, Map<String, String> params) { ... }
    public void approve(String approver) { ... }
    public void start() { ... }
    public void succeed() { ... }
    public void fail(String reason) { ... }
    public List<Object> pullDomainEvents() { ... }
}
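3.2 Value Object
A value object has no unique identifier: two value objects are the same when all of their values are equal, and they are usually immutable.
AlertRef (alert reference)
public record AlertRef(String alertId, String summary) {}
TimelineEntry (timeline entry)
public record TimelineEntry(Instant at, String kind, String message) {
    public static TimelineEntry now(String kind, String message) {
        return new TimelineEntry(Instant.now(), kind, message);
    }
}
Why value objects?
Because the alerts and timeline entries inside the Incident aggregate are component parts: they need no independent lifecycle or ID, and they are naturally immutable.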
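To make the difference between value equality and identity concrete, here is a minimal sketch. AlertRef has the same shape as in the demo; the Ticket class is a hypothetical entity that exists only for contrast and is not part of the project.

```java
import java.util.HashSet;
import java.util.Set;

final class ValueObjectSketch {

    // Same shape as the demo's AlertRef: a record gets value-based equals/hashCode.
    record AlertRef(String alertId, String summary) {}

    // Hypothetical entity for contrast: equality is decided by id, not by field values.
    static final class Ticket {
        final String id;
        String title;

        Ticket(String id, String title) {
            this.id = id;
            this.title = title;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Ticket t && t.id.equals(id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    public static void main(String[] args) {
        AlertRef a = new AlertRef("a-001", "p95 > 2s for 5m");
        AlertRef b = new AlertRef("a-001", "p95 > 2s for 5m");
        System.out.println(a.equals(b));   // true: same values, same value object

        Set<AlertRef> refs = new HashSet<>();
        refs.add(a);
        refs.add(b);
        System.out.println(refs.size());   // 1

        Ticket t1 = new Ticket("t-1", "old title");
        Ticket t2 = new Ticket("t-1", "new title");
        System.out.println(t1.equals(t2)); // true: same id, even though the title differs
    }
}
```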
Figure: relationships inside the aggregate.
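3.3 Aggregate
An aggregate is a cluster of domain objects whose consistency boundary is maintained through a single entry point, the aggregate root.
Rules of the Incident aggregate
- Modify only through the aggregate root: outside code cannot change alerts or timeline directly
- Transaction boundary: one transaction should modify one aggregate where possible
- Cross-aggregate references use IDs: for example, RemediationAction stores incidentId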
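The first rule can be enforced mechanically by handing out immutable snapshots. The following is a trimmed-down sketch rather than the demo's Incident: the closed flag and close() method are hypothetical simplifications of the terminal statuses.

```java
import java.util.ArrayList;
import java.util.List;

final class AggregateBoundarySketch {

    record AlertRef(String alertId, String summary) {}

    // Trimmed-down stand-in for the Incident aggregate root.
    static final class Incident {
        private final List<AlertRef> alerts = new ArrayList<>();
        private boolean closed; // hypothetical simplification of the terminal statuses

        // The only way to change the alert list: the root checks the invariant first.
        void addAlert(AlertRef alert) {
            if (closed) {
                throw new IllegalStateException("incident is closed");
            }
            alerts.add(alert);
        }

        void close() {
            closed = true;
        }

        // Callers receive an immutable snapshot, never the internal list.
        List<AlertRef> alerts() {
            return List.copyOf(alerts);
        }
    }

    public static void main(String[] args) {
        Incident inc = new Incident();
        inc.addAlert(new AlertRef("a-001", "p95 > 2s for 5m"));

        List<AlertRef> view = inc.alerts();
        try {
            view.add(new AlertRef("a-002", "bypassing the root"));
        } catch (UnsupportedOperationException e) {
            System.out.println("external mutation rejected");
        }
        System.out.println(inc.alerts().size()); // 1
    }
}
```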
Figure: structure of the Incident aggregate.
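3.4 Domain Event
A domain event records that something significant happened in the domain; it is the semantic foundation of an event-driven architecture.
Event definitions (domain layer):
public record IncidentOpened(String incidentId, String fingerprint, String severity) {}
public record IncidentMitigationStarted(String incidentId, String severity) {}
public record ActionPlanned(String actionId, String incidentId, String actionType, String target) {}
public record ActionApproved(String actionId, String approver) {}
public record ActionSucceeded(String actionId) {}
public record ActionFailed(String actionId, String reason) {}
Event lifecycle: from creation to listener
Code path example:
- Step 1: a domain operation creates the event (Incident.open)
- Step 2: the application service saves the aggregate
- Step 3: the application service pulls the events (which clears the aggregate's event list) and publishes them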
// domain/incident/Incident.java
public static Incident open(String fingerprint, Severity severity, AlertRef firstAlert) {
    Incident inc = new Incident(UUID.randomUUID().toString(), fingerprint, severity);
    inc.alerts.add(firstAlert);
    inc.timeline.add(TimelineEntry.now("OPEN", "incident opened"));
    inc.domainEvents.add(new IncidentOpened(inc.id, fingerprint, severity.name()));
    return inc;
}
// application/usecase/IncidentAppService.java
public String ingestAlert(...) {
    Incident inc = Incident.open(fingerprint, severity, new AlertRef(alertId, summary));
    incidentRepo.save(inc);
    publisher.publishAll(inc.pullDomainEvents()); // pullDomainEvents() returns the pending events and clears the list
    return inc.id();
}
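Because events are pulled rather than pushed, each one is published at most once per pull, and the publisher is easy to replace in tests. The sketch below assumes a hypothetical RecordingPublisher test double; the DomainEventPublisher interface has the same shape as the demo's SPI, and the Incident here is reduced to its event buffer.

```java
import java.util.ArrayList;
import java.util.List;

final class EventPullSketch {

    record IncidentOpened(String incidentId) {}

    // Same contract as the demo's SPI.
    interface DomainEventPublisher {
        void publishAll(List<Object> events);
    }

    // Hypothetical test double: remembers everything that was published.
    static final class RecordingPublisher implements DomainEventPublisher {
        final List<Object> published = new ArrayList<>();

        @Override
        public void publishAll(List<Object> events) {
            published.addAll(events);
        }
    }

    // Aggregate reduced to its pull-and-clear event buffer.
    static final class Incident {
        private final List<Object> domainEvents = new ArrayList<>();

        Incident(String id) {
            domainEvents.add(new IncidentOpened(id));
        }

        List<Object> pullDomainEvents() {
            List<Object> copy = List.copyOf(domainEvents);
            domainEvents.clear();
            return copy;
        }
    }

    public static void main(String[] args) {
        RecordingPublisher publisher = new RecordingPublisher();
        Incident inc = new Incident("inc-1");

        publisher.publishAll(inc.pullDomainEvents());
        publisher.publishAll(inc.pullDomainEvents()); // second pull is empty

        System.out.println(publisher.published.size()); // 1
    }
}
```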
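4. Modules and Layered Architecture
4.1 Directory Layout
The project follows DDD layering: interfaces → application → domain. The infrastructure layer depends inward on the SPI interfaces defined in the application layer (dependency inversion).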
src/main/java/per/mjn/aiops/
├── AIOpsApplication.java
│
├── interfaces/
│   ├── IncidentController.java
│   └── dto/
│       ├── IngestAlertRequest.java
│       ├── PlanScaleOutRequest.java
│       └── ApproveActionRequest.java
│
├── application/
│   ├── usecase/
│   │   ├── IncidentAppService.java
│   │   └── RemediationAppService.java
│   ├── event/
│   │   └── ActionApprovedEvent.java
│   ├── listener/
│   │   └── ActionExecutionListener.java
│   └── spi/
│       ├── IncidentRepository.java
│       ├── ActionRepository.java
│       ├── ActionExecutor.java
│       └── DomainEventPublisher.java
│
├── domain/
│   ├── incident/
│   │   ├── Incident.java
│   │   ├── IncidentStatus.java
│   │   ├── Severity.java
│   │   ├── AlertRef.java
│   │   └── TimelineEntry.java
│   ├── remediation/
│   │   ├── RemediationAction.java
│   │   ├── ActionStatus.java
│   │   └── ActionType.java
│   └── events/
│       ├── IncidentOpened.java
│       ├── IncidentMitigationStarted.java
│       ├── ActionPlanned.java
│       ├── ActionApproved.java
│       ├── ActionSucceeded.java
│       └── ActionFailed.java
│
└── infrastructure/
    ├── repos/
    │   ├── InMemoryIncidentRepository.java
    │   └── InMemoryActionRepository.java
    ├── executor/
    │   ├── AsyncConfig.java
    │   └── SimulatedK8sExecutor.java
    └── publisher/
        └── SimpleLogEventPublisher.java
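4.2 Layer Diagram (Dependency Direction)
5. Repository: Why Does the Interface Live in application/spi?
A repository is essentially not a database technology but the application layer's abstract dependency on an external persistence capability. The interface therefore lives in the application layer (as an SPI), and the implementation lives in infrastructure.
Repository interface: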
public interface IncidentRepository {
    Optional<Incident> findById(String incidentId);
    Optional<Incident> findActiveByFingerprint(String fingerprint);
    void save(Incident incident);
}
Repository implementation:
@Repository
public class InMemoryIncidentRepository implements IncidentRepository {
    private final Map<String, Incident> byId = new ConcurrentHashMap<>();
    private final Map<String, String> fingerprintToId = new ConcurrentHashMap<>();
    ...
}
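6. Use-Case Orchestration (Application Service) and Idempotency
In RemediationAppService.planScaleOut() we coordinate:
- Loading the Incident
- Creating the Action (factory method)
- Business idempotency: if the dedupKey already exists, the existing Action is reused
- Moving the Incident into the mitigating state (domain rule)
- Saving and publishing domain events
The typical code structure looks like this (excerpt):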
public String planScaleOut(String incidentId, String target, int replicas) {
    Incident inc = incidentRepo.findById(incidentId)
            .orElseThrow(() -> new IllegalArgumentException("incident not found"));
    RemediationAction action = RemediationAction.plan(
            incidentId, ActionType.SCALE_DEPLOYMENT, target,
            Map.of("replicas", String.valueOf(replicas))
    );
    // Idempotency check comes first: startMitigation() rejects an incident that is
    // already MITIGATING, so a repeated request must return before reaching it.
    var existed = actionRepo.findByDedupKey(action.dedupKey());
    if (existed.isPresent()) return existed.get().id();
    inc.startMitigation("scale out inference service");
    incidentRepo.save(inc);
    publisher.publishAll(inc.pullDomainEvents());
    actionRepo.save(action);
    publisher.publishAll(action.pullDomainEvents());
    return action.id();
}
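Cross-aggregate orchestration, transaction boundaries, and idempotency policy of this kind are typical application-layer work; the domain layer stays pure.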
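One caveat with a check-then-save idempotency policy is that two concurrent requests can both pass the lookup before either saves. A possible way to close that gap in an in-memory store is to reserve the dedupKey atomically. The sketch below is an assumption-laden illustration: saveIfAbsent is a hypothetical method that does not exist on the demo's ActionRepository, and Action is a stripped-down stand-in for RemediationAction.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class IdempotentSaveSketch {

    // Stand-in for RemediationAction, reduced to the two fields that matter here.
    record Action(String id, String dedupKey) {}

    static final class InMemoryActionStore {
        private final Map<String, Action> byId = new ConcurrentHashMap<>();
        private final Map<String, String> idByDedupKey = new ConcurrentHashMap<>();

        /** Hypothetical method: returns the id that owns the dedupKey after the call. */
        String saveIfAbsent(Action candidate) {
            // Store the action first so the dedup index never points at a missing entry.
            byId.put(candidate.id(), candidate);
            String existingId = idByDedupKey.putIfAbsent(candidate.dedupKey(), candidate.id());
            if (existingId != null) {
                byId.remove(candidate.id()); // lost the race: discard the duplicate
                return existingId;
            }
            return candidate.id();
        }

        int storedCount() {
            return byId.size();
        }
    }

    public static void main(String[] args) {
        InMemoryActionStore store = new InMemoryActionStore();
        String key = "inc-1:SCALE_DEPLOYMENT:ns=llm,deploy=infer-qwen:replicas=6";

        String first = store.saveIfAbsent(new Action("act-1", key));
        String second = store.saveIfAbsent(new Action("act-2", key));

        System.out.println(first + " " + second); // act-1 act-1
        System.out.println(store.storedCount());  // 1
    }
}
```

With a real database the same effect would usually come from a unique constraint on the dedup key rather than from application code.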
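7. Asynchronous Execution: Why Put It in a Listener?
The approval endpoint should return quickly, while scaling out is a slow task, so execution has to be asynchronous:
- AppService: handles only the "approve" use case (clear business meaning)
- Listener: handles the reaction after approval (asynchronous execution, extensible, multiple subscribers)
The advantages are:
- A clear use-case boundary (approve just approves)
- Asynchronous work can be retried or compensated
- New subscribers for notification, auditing, or metrics need no change to the AppService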
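The last point, adding subscribers without touching the publisher, can be illustrated without Spring. The sketch below uses a hypothetical SimpleBus that delivers synchronously; in the demo, Spring's event mechanism and @Async play this role, so treat this only as a picture of the fan-out idea.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class EventFanOutSketch {

    record ActionApproved(String actionId, String approver) {}

    // Hypothetical in-process bus; delivery is synchronous to keep the sketch small.
    static final class SimpleBus {
        private final List<Consumer<ActionApproved>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<ActionApproved> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(ActionApproved event) {
            for (Consumer<ActionApproved> subscriber : subscribers) {
                try {
                    subscriber.accept(event);
                } catch (RuntimeException ex) {
                    // One failing subscriber must not block the others.
                    System.err.println("subscriber failed: " + ex.getMessage());
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SimpleBus bus = new SimpleBus();

        // The publisher stays the same no matter how many reactions are registered.
        bus.subscribe(e -> log.add("execute " + e.actionId()));
        bus.subscribe(e -> log.add("audit " + e.actionId() + " by " + e.approver()));

        bus.publish(new ActionApproved("act-1", "oncall-1"));
        System.out.println(log); // [execute act-1, audit act-1 by oncall-1]
    }
}
```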
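8. Summary of Core DDD Patterns
| Pattern | In this project | Location |
|---|---|---|
| Bounded context | Incident Management / Remediation Execution / Alert Ingestion | module layout |
| Ubiquitous language | ingestAlert / planScaleOut / approve | API + method names |
| Aggregate / aggregate root | Incident / RemediationAction | domain |
| Entity | Incident / RemediationAction | domain |
| Value object | AlertRef / TimelineEntry | domain |
| Domain event | IncidentOpened / ActionApproved… | domain/events |
| Repository | interface in application/spi, implementation in infrastructure | application + infrastructure |
| Factory | Incident.open() / RemediationAction.plan() | domain |
| Application service | use-case orchestration, idempotency policy, cross-aggregate coordination | application/usecase |
| Event listener | asynchronous execution after approval | application/listener |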
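9. What DDD Brings
- Business focus: the code directly expresses operations semantics (Incident, Mitigation, Action)
- Maintainability: state transitions and rules are concentrated in the domain model
- Testability: domain logic has no framework dependency and can be unit-tested in plain Java
- Extensibility: a new remediation type only requires extending ActionType and the executor
- Team collaboration: operations and development communicate in the same language
- Adapting to change: replacing storage, messaging, or the executor does not affect the core business model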
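As a sketch of the extensibility point, one option is to dispatch by ActionType through a registry, so that a new remediation type means one new enum constant and one new handler. Everything below is hypothetical except SCALE_DEPLOYMENT: the demo's SimulatedK8sExecutor uses a plain if statement, and RESTART_POD, ActionHandler, and ExecutorRegistry are names invented for illustration.

```java
import java.util.EnumMap;
import java.util.Map;

final class ExecutorRegistrySketch {

    // SCALE_DEPLOYMENT exists in the demo; RESTART_POD is a hypothetical addition.
    enum ActionType { SCALE_DEPLOYMENT, RESTART_POD }

    // Stand-in for RemediationAction, reduced to what an executor needs.
    record Action(ActionType type, String target, Map<String, String> params) {}

    // Hypothetical handler contract; returns a description of what it did.
    interface ActionHandler {
        String handle(Action action);
    }

    static final class ExecutorRegistry {
        private final Map<ActionType, ActionHandler> handlers = new EnumMap<>(ActionType.class);

        ExecutorRegistry register(ActionType type, ActionHandler handler) {
            handlers.put(type, handler);
            return this;
        }

        String execute(Action action) {
            ActionHandler handler = handlers.get(action.type());
            if (handler == null) {
                throw new IllegalStateException("unsupported action type=" + action.type());
            }
            return handler.handle(action);
        }
    }

    public static void main(String[] args) {
        ExecutorRegistry registry = new ExecutorRegistry()
                .register(ActionType.SCALE_DEPLOYMENT,
                        a -> "scale " + a.target() + " to " + a.params().get("replicas"))
                .register(ActionType.RESTART_POD, a -> "restart " + a.target());

        Action scale = new Action(ActionType.SCALE_DEPLOYMENT,
                "ns=llm,deploy=infer-qwen", Map.of("replicas", "6"));
        System.out.println(registry.execute(scale)); // scale ns=llm,deploy=infer-qwen to 6
    }
}
```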
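10. Summary
The essence of DDD is not renaming packages to domain/application/infrastructure. It is mapping the knowledge of a complex business domain onto the code structure, so that a clear domain model makes the code a direct expression of the business rules.
In a system like AIOps, where business semantics are heavy, state transitions are complex, and asynchronous orchestration is common, DDD tends to pay off well.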
Appendix: Project Source Code (the directory layout is shown in the main text)
package per.mjn.aiops.application.event;
public record ActionApprovedEvent(String actionId) {}
package per.mjn.aiops.application.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionalEventListener;
import per.mjn.aiops.application.event.ActionApprovedEvent;
import per.mjn.aiops.domain.remediation.RemediationAction;
import per.mjn.aiops.application.spi.ActionExecutor;
import per.mjn.aiops.application.spi.ActionRepository;
@Component
public class ActionExecutionListener {
private static final Logger log = LoggerFactory.getLogger(ActionExecutionListener.class);
private final ActionRepository actionRepo;
private final ActionExecutor executor;
public ActionExecutionListener(ActionRepository actionRepo, ActionExecutor executor) {
this.actionRepo = actionRepo;
this.executor = executor;
}
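/**
* Key points:
* - TransactionalEventListener: fires only after the transaction around approve() commits.
*   This guarantee applies only when approve() runs inside a Spring-managed transaction;
*   the demo's approve() declares no @Transactional and its repositories are in-memory.
* - Async: runs on a separate thread, so the HTTP request is not blocked
*/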
@Async
@TransactionalEventListener
public void on(ActionApprovedEvent event) {
String actionId = event.actionId();
log.info("[ASYNC] receive ActionApprovedEvent: actionId={}", actionId);
RemediationAction act = actionRepo.findById(actionId)
.orElseThrow(() -> new IllegalStateException("action not found: " + actionId));
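// Idempotency / duplicate-delivery guard: only an APPROVED action may start.
// start() is guarded on its own so that a duplicate event cannot mark an
// action that is already RUNNING as FAILED.
try {
act.start();
} catch (IllegalStateException ex) {
log.warn("[ASYNC] skip duplicate or out-of-order event: actionId={}, reason={}", actionId, ex.getMessage());
return;
}
actionRepo.save(act);
try {
executor.execute(act);
act.succeed();
actionRepo.save(act);
log.info("[ASYNC] action succeeded: actionId={}", actionId);
} catch (Exception ex) {
log.warn("[ASYNC] execution error: actionId={}, reason={}", actionId, ex.getMessage());
// The action is normally RUNNING here, so record the failure.
try {
act.fail(ex.getMessage());
actionRepo.save(act);
} catch (Exception ignore) {
// ignore: the action already reached a terminal status
}
}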
}
}
package per.mjn.aiops.application.spi;
import per.mjn.aiops.domain.remediation.RemediationAction;
public interface ActionExecutor {
void execute(RemediationAction action);
}
package per.mjn.aiops.application.spi;
import per.mjn.aiops.domain.remediation.RemediationAction;
import java.util.Optional;
public interface ActionRepository {
Optional<RemediationAction> findById(String actionId);
Optional<RemediationAction> findByDedupKey(String dedupKey);
void save(RemediationAction action);
}
package per.mjn.aiops.application.spi;
import java.util.List;
public interface DomainEventPublisher {
void publishAll(List<Object> events);
}
package per.mjn.aiops.application.spi;
import per.mjn.aiops.domain.incident.Incident;
import java.util.Optional;
public interface IncidentRepository {
Optional<Incident> findById(String incidentId);
Optional<Incident> findActiveByFingerprint(String fingerprint);
void save(Incident incident);
}
package per.mjn.aiops.application.usecase;
import org.springframework.stereotype.Service;
import per.mjn.aiops.application.spi.DomainEventPublisher;
import per.mjn.aiops.domain.incident.AlertRef;
import per.mjn.aiops.domain.incident.Incident;
import per.mjn.aiops.domain.incident.Severity;
import per.mjn.aiops.application.spi.IncidentRepository;
@Service
public class IncidentAppService {
private final IncidentRepository incidentRepo;
private final DomainEventPublisher publisher;
public IncidentAppService(IncidentRepository incidentRepo, DomainEventPublisher publisher) {
this.incidentRepo = incidentRepo;
this.publisher = publisher;
}
public String ingestAlert(String alertId, String cluster, String service, String symptom, String summary, Severity sev) {
String fingerprint = cluster + "|" + service + "|" + symptom;
Incident inc = incidentRepo.findActiveByFingerprint(fingerprint)
.map(existing -> {
existing.addAlert(new AlertRef(alertId, summary));
return existing;
})
.orElseGet(() -> Incident.open(fingerprint, sev, new AlertRef(alertId, summary)));
incidentRepo.save(inc);
publisher.publishAll(inc.pullDomainEvents());
return inc.id();
}
}
package per.mjn.aiops.application.usecase;
import org.springframework.stereotype.Service;
import per.mjn.aiops.application.spi.DomainEventPublisher;
import per.mjn.aiops.domain.incident.Incident;
import per.mjn.aiops.domain.remediation.ActionType;
import per.mjn.aiops.domain.remediation.RemediationAction;
import per.mjn.aiops.application.spi.ActionRepository;
import per.mjn.aiops.application.spi.IncidentRepository;
import java.util.Map;
@Service
public class RemediationAppService {
private final IncidentRepository incidentRepo;
private final ActionRepository actionRepo;
private final DomainEventPublisher publisher;
public RemediationAppService(IncidentRepository incidentRepo,
ActionRepository actionRepo,
DomainEventPublisher publisher) {
this.incidentRepo = incidentRepo;
this.actionRepo = actionRepo;
this.publisher = publisher;
}
public String planScaleOut(String incidentId, String target, int replicas) {
Incident inc = incidentRepo.findById(incidentId)
.orElseThrow(() -> new IllegalArgumentException("incident not found"));
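RemediationAction action = RemediationAction.plan(
incidentId,
ActionType.SCALE_DEPLOYMENT,
target,
Map.of("replicas", String.valueOf(replicas))
);
// Idempotency: if an action with the same dedupKey exists, reuse it.
// The check runs before startMitigation(), which rejects an incident that is
// already MITIGATING; otherwise a repeated request would fail instead of
// returning the existing action.
var existed = actionRepo.findByDedupKey(action.dedupKey());
if (existed.isPresent()) {
return existed.get().id();
}
// Domain rule: move the incident into the mitigating state
inc.startMitigation("scale out inference service");
// Save the incident and publish its events first
incidentRepo.save(inc);
publisher.publishAll(inc.pullDomainEvents());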
actionRepo.save(action);
publisher.publishAll(action.pullDomainEvents());
return action.id();
}
/**
* Approves the action and publishes its events only (asynchronous execution is handled by the listener)
*/
public RemediationAction approve(String actionId, String approver) {
RemediationAction act = actionRepo.findById(actionId)
.orElseThrow(() -> new IllegalArgumentException("action not found"));
act.approve(approver);
actionRepo.save(act);
// Publishes the ActionApproved domain event, which the publisher maps to ActionApprovedEvent to trigger asynchronous execution
publisher.publishAll(act.pullDomainEvents());
return act;
}
}
package per.mjn.aiops.domain.events;
public record ActionApproved(String actionId, String approver) {}
package per.mjn.aiops.domain.events;
public record ActionFailed(String actionId, String reason) {}
package per.mjn.aiops.domain.events;
public record ActionPlanned(String actionId, String incidentId, String actionType, String target) {}
package per.mjn.aiops.domain.events;
public record ActionSucceeded(String actionId) {}
package per.mjn.aiops.domain.events;
public record IncidentMitigationStarted(String incidentId, String severity) {}
package per.mjn.aiops.domain.events;
public record IncidentOpened(String incidentId, String fingerprint, String severity) {}
package per.mjn.aiops.domain.incident;
public record AlertRef(String alertId, String summary) {}
package per.mjn.aiops.domain.incident;
import per.mjn.aiops.domain.events.IncidentMitigationStarted;
import per.mjn.aiops.domain.events.IncidentOpened;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class Incident {
private final String id;
private final String fingerprint;
private final Severity severity;
private IncidentStatus status;
private final List<AlertRef> alerts = new ArrayList<>();
private final List<TimelineEntry> timeline = new ArrayList<>();
private final List<Object> domainEvents = new ArrayList<>();
private Incident(String id, String fingerprint, Severity severity) {
this.id = id;
this.fingerprint = fingerprint;
this.severity = severity;
this.status = IncidentStatus.OPEN;
}
public static Incident open(String fingerprint, Severity severity, AlertRef firstAlert) {
Incident inc = new Incident(UUID.randomUUID().toString(), fingerprint, severity);
inc.alerts.add(firstAlert);
inc.timeline.add(TimelineEntry.now("OPEN", "incident opened"));
inc.domainEvents.add(new IncidentOpened(inc.id, fingerprint, severity.name()));
return inc;
}
public void addAlert(AlertRef alert) {
ensureNotTerminal();
alerts.add(alert);
timeline.add(TimelineEntry.now("ALERT_ADDED", alert.summary()));
}
public void startMitigation(String reason) {
ensureNotTerminal();
if (!(status == IncidentStatus.OPEN || status == IncidentStatus.TRIAGING)) {
throw new IllegalStateException("cannot start mitigation from status=" + status);
}
status = IncidentStatus.MITIGATING;
timeline.add(TimelineEntry.now("MITIGATING", reason));
domainEvents.add(new IncidentMitigationStarted(id, severity.name()));
}
private void ensureNotTerminal() {
if (status == IncidentStatus.CLOSED || status == IncidentStatus.CANCELED) {
throw new IllegalStateException("terminal status=" + status);
}
}
public List<Object> pullDomainEvents() {
List<Object> copy = List.copyOf(domainEvents);
domainEvents.clear();
return copy;
}
public String id() { return id; }
public String fingerprint() { return fingerprint; }
public Severity severity() { return severity; }
public IncidentStatus status() { return status; }
public List<AlertRef> alerts() { return List.copyOf(alerts); }
public List<TimelineEntry> timeline() { return List.copyOf(timeline); }
}
package per.mjn.aiops.domain.incident;
public enum IncidentStatus {
OPEN, TRIAGING, MITIGATING, RESOLVED, CLOSED, CANCELED
}
package per.mjn.aiops.domain.incident;
public enum Severity {
SEV1, SEV2, SEV3, SEV4
}
package per.mjn.aiops.domain.incident;
import java.time.Instant;
public record TimelineEntry(Instant at, String kind, String message) {
public static TimelineEntry now(String kind, String message) {
return new TimelineEntry(Instant.now(), kind, message);
}
}
package per.mjn.aiops.domain.remediation;
public enum ActionStatus {
PENDING, APPROVED, RUNNING, SUCCEEDED, FAILED
}
package per.mjn.aiops.domain.remediation;
public enum ActionType {
SCALE_DEPLOYMENT
}
package per.mjn.aiops.domain.remediation;
import per.mjn.aiops.domain.events.*;
import java.util.*;
public class RemediationAction {
private final String id;
private final String incidentId;
private final String dedupKey;
private final ActionType type;
private final String target;
private final Map<String, String> params;
private ActionStatus status;
private final List<Object> domainEvents = new ArrayList<>();
private RemediationAction(String id, String incidentId, String dedupKey,
ActionType type, String target, Map<String, String> params) {
this.id = id;
this.incidentId = incidentId;
this.dedupKey = dedupKey;
this.type = type;
this.target = target;
this.params = new HashMap<>(params);
this.status = ActionStatus.PENDING;
}
public static RemediationAction plan(String incidentId, ActionType type, String target, Map<String, String> params) {
String paramsKey = params.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(e -> e.getKey() + "=" + e.getValue())
.reduce((a, b) -> a + "|" + b)
.orElse("");
String dedupKey = incidentId + ":" + type.name() + ":" + target + ":" + paramsKey;
RemediationAction act = new RemediationAction(UUID.randomUUID().toString(), incidentId, dedupKey, type, target, params);
act.domainEvents.add(new ActionPlanned(act.id, incidentId, type.name(), target));
return act;
}
public void approve(String approver) {
if (status != ActionStatus.PENDING) throw new IllegalStateException("must be PENDING to approve, now=" + status);
status = ActionStatus.APPROVED;
domainEvents.add(new ActionApproved(id, approver));
}
public void start() {
if (status != ActionStatus.APPROVED) throw new IllegalStateException("must be APPROVED to start, now=" + status);
status = ActionStatus.RUNNING;
}
public void succeed() {
if (status != ActionStatus.RUNNING) throw new IllegalStateException("must be RUNNING to succeed, now=" + status);
status = ActionStatus.SUCCEEDED;
domainEvents.add(new ActionSucceeded(id));
}
public void fail(String reason) {
if (status != ActionStatus.RUNNING) throw new IllegalStateException("must be RUNNING to fail, now=" + status);
status = ActionStatus.FAILED;
domainEvents.add(new ActionFailed(id, reason));
}
public List<Object> pullDomainEvents() {
List<Object> copy = List.copyOf(domainEvents);
domainEvents.clear();
return copy;
}
public String id() { return id; }
public String incidentId() { return incidentId; }
public String dedupKey() { return dedupKey; }
public ActionType type() { return type; }
public String target() { return target; }
public Map<String, String> params() { return Map.copyOf(params); }
public ActionStatus status() { return status; }
}
package per.mjn.aiops.infrastructure.executor;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
@Configuration
@EnableAsync
public class AsyncConfig {
}
package per.mjn.aiops.infrastructure.executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import per.mjn.aiops.application.spi.ActionExecutor;
import per.mjn.aiops.domain.remediation.ActionType;
import per.mjn.aiops.domain.remediation.RemediationAction;
@Component
public class SimulatedK8sExecutor implements ActionExecutor {
private static final Logger log = LoggerFactory.getLogger(SimulatedK8sExecutor.class);
@Override
public void execute(RemediationAction action) {
if (action.type() == ActionType.SCALE_DEPLOYMENT) {
int replicas = Integer.parseInt(action.params().get("replicas"));
log.info("[K8S] scale deployment => target={}, replicas={}", action.target(), replicas);
if (replicas <= 0) {
throw new IllegalArgumentException("replicas must be > 0");
}
return;
}
throw new IllegalStateException("unsupported action type=" + action.type());
}
}
package per.mjn.aiops.infrastructure.publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import per.mjn.aiops.application.event.ActionApprovedEvent;
import per.mjn.aiops.application.spi.DomainEventPublisher;
import per.mjn.aiops.domain.events.ActionApproved;
import java.util.List;
@Component
public class SimpleLogEventPublisher implements DomainEventPublisher {
private static final Logger log = LoggerFactory.getLogger(SimpleLogEventPublisher.class);
private final ApplicationEventPublisher springPublisher;
public SimpleLogEventPublisher(ApplicationEventPublisher springPublisher) {
this.springPublisher = springPublisher;
}
@Override
public void publishAll(List<Object> events) {
for (Object e : events) {
log.info("[DOMAIN_EVENT] {}", e);
// Domain event -> application event (triggers asynchronous execution)
if (e instanceof ActionApproved aa) {
springPublisher.publishEvent(new ActionApprovedEvent(aa.actionId()));
}
}
}
}
package per.mjn.aiops.infrastructure.repos;
import org.springframework.stereotype.Repository;
import per.mjn.aiops.application.spi.ActionRepository;
import per.mjn.aiops.domain.remediation.RemediationAction;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@Repository
public class InMemoryActionRepository implements ActionRepository {
private final Map<String, RemediationAction> byId = new ConcurrentHashMap<>();
private final Map<String, String> byDedup = new ConcurrentHashMap<>();
@Override
public Optional<RemediationAction> findById(String actionId) {
return Optional.ofNullable(byId.get(actionId));
}
@Override
public Optional<RemediationAction> findByDedupKey(String dedupKey) {
String id = byDedup.get(dedupKey);
return id == null ? Optional.empty() : Optional.ofNullable(byId.get(id));
}
@Override
public void save(RemediationAction action) {
byId.put(action.id(), action);
byDedup.put(action.dedupKey(), action.id());
}
}
package per.mjn.aiops.infrastructure.repos;
import org.springframework.stereotype.Repository;
import per.mjn.aiops.application.spi.IncidentRepository;
import per.mjn.aiops.domain.incident.Incident;
import per.mjn.aiops.domain.incident.IncidentStatus;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@Repository
public class InMemoryIncidentRepository implements IncidentRepository {
private final Map<String, Incident> byId = new ConcurrentHashMap<>();
@Override
public Optional<Incident> findById(String incidentId) {
return Optional.ofNullable(byId.get(incidentId));
}
@Override
public Optional<Incident> findActiveByFingerprint(String fingerprint) {
return byId.values().stream()
.filter(i -> i.fingerprint().equals(fingerprint))
.filter(i -> i.status() != IncidentStatus.CLOSED && i.status() != IncidentStatus.CANCELED)
.findFirst();
}
@Override
public void save(Incident incident) {
byId.put(incident.id(), incident);
}
}
package per.mjn.aiops.interfaces.dto;
import jakarta.validation.constraints.NotBlank;
public class ApproveActionRequest {
@NotBlank public String approver;
}
package per.mjn.aiops.interfaces.dto;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import per.mjn.aiops.domain.incident.Severity;
public class IngestAlertRequest {
@NotBlank public String alertId;
@NotBlank public String cluster;
@NotBlank public String service;
@NotBlank public String symptom;
@NotBlank public String summary;
@NotNull public Severity severity;
}
package per.mjn.aiops.interfaces.dto;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
public class PlanScaleOutRequest {
@NotBlank public String target;
@Min(1) public int replicas;
}
package per.mjn.aiops.interfaces;
import jakarta.validation.Valid;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import per.mjn.aiops.application.usecase.IncidentAppService;
import per.mjn.aiops.application.usecase.RemediationAppService;
import per.mjn.aiops.domain.remediation.RemediationAction;
import per.mjn.aiops.application.spi.ActionRepository;
import per.mjn.aiops.application.spi.IncidentRepository;
import per.mjn.aiops.interfaces.dto.ApproveActionRequest;
import per.mjn.aiops.interfaces.dto.IngestAlertRequest;
import per.mjn.aiops.interfaces.dto.PlanScaleOutRequest;
import java.util.Map;
@RestController
public class IncidentController {
private final IncidentAppService incidentService;
private final RemediationAppService remediationService;
private final IncidentRepository incidentRepo;
private final ActionRepository actionRepo;
public IncidentController(IncidentAppService incidentService,
RemediationAppService remediationService,
IncidentRepository incidentRepo,
ActionRepository actionRepo) {
this.incidentService = incidentService;
this.remediationService = remediationService;
this.incidentRepo = incidentRepo;
this.actionRepo = actionRepo;
}
@PostMapping("/alerts/ingest")
public ResponseEntity<?> ingest(@Valid @RequestBody IngestAlertRequest req) {
String incidentId = incidentService.ingestAlert(
req.alertId, req.cluster, req.service, req.symptom, req.summary, req.severity
);
return ResponseEntity.ok(Map.of("incidentId", incidentId));
}
@PostMapping("/incidents/{incidentId}/plan/scale-out")
public ResponseEntity<?> planScaleOut(@PathVariable("incidentId") String incidentId, @Valid @RequestBody PlanScaleOutRequest req) {
try {
String actionId = remediationService.planScaleOut(incidentId, req.target, req.replicas);
return ResponseEntity.ok(Map.of("actionId", actionId));
} catch (IllegalArgumentException e) {
return ResponseEntity.status(404).body(Map.of("error", e.getMessage()));
} catch (IllegalStateException e) {
return ResponseEntity.status(409).body(Map.of("error", e.getMessage()));
}
}
/**
* Returns immediately after approval; execution proceeds asynchronously
*/
@PostMapping("/actions/{actionId}/approve")
public ResponseEntity<?> approve(@PathVariable("actionId") String actionId,
@Valid @RequestBody ApproveActionRequest req) {
try {
RemediationAction act = remediationService.approve(actionId, req.approver);
return ResponseEntity.ok(Map.of(
"actionId", act.id(),
"status", act.status().name(),
"message", "approved; execution will run asynchronously"
));
} catch (IllegalArgumentException e) {
return ResponseEntity.status(404).body(Map.of("error", e.getMessage()));
} catch (IllegalStateException e) {
return ResponseEntity.status(409).body(Map.of("error", e.getMessage()));
}
}
@GetMapping("/incidents/{incidentId}")
public ResponseEntity<?> getIncident(@PathVariable("incidentId") String incidentId) {
return incidentRepo.findById(incidentId)
.map(i -> ResponseEntity.ok(Map.of(
"incidentId", i.id(),
"fingerprint", i.fingerprint(),
"severity", i.severity().name(),
"status", i.status().name(),
"alerts", i.alerts(),
"timeline", i.timeline()
)))
.orElseGet(() -> ResponseEntity.status(404).body(Map.of("error", "incident not found")));
}
@GetMapping("/actions/{actionId}")
public ResponseEntity<?> getAction(@PathVariable("actionId") String actionId) {
return actionRepo.findById(actionId)
.map(a -> ResponseEntity.ok(Map.of(
"actionId", a.id(),
"incidentId", a.incidentId(),
"status", a.status().name(),
"type", a.type().name(),
"target", a.target(),
"params", a.params()
)))
.orElseGet(() -> ResponseEntity.status(404).body(Map.of("error", "action not found")));
}
}
package per.mjn.aiops;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class AIOpsApplication {
public static void main(String[] args) {
SpringApplication.run(AIOpsApplication.class, args);
}
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>per.mjn</groupId>
<artifactId>ddd-aiops-min</artifactId>
<version>1.0.0</version>
<properties>
<spring-boot.version>3.3.2</spring-boot.version>
<java.version>21</java.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>21</source>
<target>21</target>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
</plugin>
</plugins>
</build>
</project>
System Test
(1) Ingest an alert: POST /alerts/ingest
curl -X POST http://localhost:8080/alerts/ingest \
-H "Content-Type: application/json" \
-d '{
"alertId": "a-001",
"cluster": "prod-1",
"service": "infer-qwen",
"symptom": "p95_latency_spike",
"summary": "p95 > 2s for 5m",
"severity": "SEV2"
}'
Expected response:
{
"incidentId": "d9eb3657-5130-4e09-8b88-4bdd12c2f14f"
}
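Ingesting another alert with the same fingerprint (cluster + service + symptom) merges it into the same incident: the same incidentId is returned while an active incident with that fingerprint exists in the repository; otherwise a new incident is opened.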
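The merge behavior can be pictured with a small map-based sketch. The fingerprint format matches the demo (cluster|service|symptom); the IncidentBook class and its fields are hypothetical, and unlike the demo it does not model incident status, so every fingerprint stays active forever.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class FingerprintAggregationSketch {

    // Hypothetical stand-in for the repository plus the ingest use case.
    static final class IncidentBook {
        private final Map<String, String> activeIncidentIdByFingerprint = new HashMap<>();
        private final Map<String, List<String>> alertIdsByIncident = new HashMap<>();

        // Same composition as the demo: cluster|service|symptom.
        static String fingerprint(String cluster, String service, String symptom) {
            return cluster + "|" + service + "|" + symptom;
        }

        /** Returns the id of the incident the alert was merged into. */
        String ingest(String alertId, String cluster, String service, String symptom) {
            String fp = fingerprint(cluster, service, symptom);
            // Reuse the active incident for this fingerprint, or open a new one.
            String incidentId = activeIncidentIdByFingerprint
                    .computeIfAbsent(fp, k -> UUID.randomUUID().toString());
            alertIdsByIncident.computeIfAbsent(incidentId, k -> new ArrayList<>()).add(alertId);
            return incidentId;
        }

        int alertCount(String incidentId) {
            return alertIdsByIncident.getOrDefault(incidentId, List.of()).size();
        }
    }

    public static void main(String[] args) {
        IncidentBook book = new IncidentBook();
        String first = book.ingest("a-001", "prod-1", "infer-qwen", "p95_latency_spike");
        String second = book.ingest("a-002", "prod-1", "infer-qwen", "p95_latency_spike");
        String other = book.ingest("a-003", "prod-1", "infer-qwen", "oom_killed");

        System.out.println(first.equals(second));   // true: same fingerprint, same incident
        System.out.println(first.equals(other));    // false: different symptom
        System.out.println(book.alertCount(first)); // 2
    }
}
```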
(2) Plan a scale-out: POST /incidents/{incidentId}/plan/scale-out
curl -X POST http://localhost:8080/incidents/<incidentId>/plan/scale-out \
-H "Content-Type: application/json" \
-d '{ "target":"ns=llm,deploy=infer-qwen", "replicas": 6 }'
Expected response:
{
"actionId": "10de4fdb-a625-46e8-a626-8ac0a760afbf"
}
(3) Approve: POST /actions/{actionId}/approve (asynchronous execution starts)
curl -X POST http://localhost:8080/actions/<actionId>/approve \
-H "Content-Type: application/json" \
-d '{ "approver":"oncall-1" }'
Expected response:
{
"status": "APPROVED",
"actionId": "10de4fdb-a625-46e8-a626-8ac0a760afbf",
"message": "approved; execution will run asynchronously"
}
(4) Query the Action: GET /actions/{actionId} (poll to watch the status change)
curl http://localhost:8080/actions/<actionId>
Expected response:
{
"target": "ns=llm,deploy=infer-qwen",
"incidentId": "d9eb3657-5130-4e09-8b88-4bdd12c2f14f",
"params": {
"replicas": "6"
},
"actionId": "10de4fdb-a625-46e8-a626-8ac0a760afbf",
"type": "SCALE_DEPLOYMENT",
"status": "SUCCEEDED"
}
(5) Query the Incident: GET /incidents/{incidentId}
curl http://localhost:8080/incidents/<incidentId>
Expected response:
{
"severity": "SEV2",
"incidentId": "d9eb3657-5130-4e09-8b88-4bdd12c2f14f",
"alerts": [
{
"alertId": "a-001",
"summary": "p95 > 2s for 5m"
}
],
"fingerprint": "prod-1|infer-qwen|p95_latency_spike",
"status": "MITIGATING",
"timeline": [
{
"at": "2026-01-22T13:50:39.371743600Z",
"kind": "OPEN",
"message": "incident opened"
},
{
"at": "2026-01-22T13:51:24.512972900Z",
"kind": "MITIGATING",
"message": "scale out inference service"
}
]
}