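The complexity of an AIOps system usually lies not in its "tech stack" but in its "business semantics". How are alerts aggregated into incidents? How does an incident move into remediation? How are remediation actions approved, kept idempotent, executed asynchronously, and made traceable?

The value of DDD (Domain-Driven Design) is that it maps this business knowledge onto the code structure, so that the code becomes a direct expression of the business rules.

This article walks through domain-driven design end to end. It uses a minimal, runnable AIOps demo (alert ingestion → incident aggregation → scale-out plan → approval → asynchronous execution → queryable status) and presents a practical layering scheme and directory structure. The complete project code is attached at the end.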


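1. Project Background: What Problem Are We Solving?

In day-to-day operations, we want the system to:

  • Ingest alerts from multiple sources and normalize them
  • Aggregate alerts with the same symptom into a single Incident (reducing alert storms)
  • Plan remediation actions (such as scale-out) for an Incident, with business-level idempotency
  • Execute remediation actions asynchronously after approval (approval returns quickly; the slow execution happens in the background)
  • Give both Actions and Incidents a traceable status and timeline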

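2. DDD Strategic Design

2.1 Bounded Context

A bounded context defines the boundary of a domain model. Inside that boundary, concepts, rules, and language must be consistent. Interactions across the boundary must be explicit.

This system can be divided into three contexts:

  • Alert Ingestion Context: receives multi-source alerts, then normalizes, deduplicates, and routes them
  • Incident Management Context: incident lifecycle, alert aggregation, timeline
  • Remediation Execution Context: remediation action planning, approval, asynchronous execution, idempotency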
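In the demo, the ingestion endpoint already receives cluster/service/symptom as separate fields. To make the "explicit boundary" concrete, here is a minimal sketch of what the Alert Ingestion context could do before handing alerts over. It assumes a hypothetical `RawAlert` payload and a hypothetical `AlertNormalizer`; neither exists in the demo, and the label names are assumptions. Source-specific labels are turned into one canonical alert whose fingerprint uses the same `cluster|service|symptom` format as `IncidentAppService.ingestAlert()`. As a result, the Incident Management context never sees Prometheus or Grafana specifics.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical raw payload: each monitoring source names its labels differently.
record RawAlert(String source, String id, Map<String, String> labels, String description) {}

// Hypothetical canonical alert: the only shape handed to Incident Management.
record NormalizedAlert(String alertId, String cluster, String service, String symptom, String summary) {
    // Same format as IncidentAppService.ingestAlert(): cluster|service|symptom
    String fingerprint() {
        return cluster + "|" + service + "|" + symptom;
    }
}

// Hypothetical translator living in the Alert Ingestion context.
final class AlertNormalizer {
    private AlertNormalizer() {}

    static NormalizedAlert normalize(RawAlert raw) {
        Map<String, String> labels = raw.labels();
        String cluster = labels.getOrDefault("cluster", "unknown");
        // Assumed mapping: some sources carry "job" instead of "service"
        String service = labels.getOrDefault("service", labels.getOrDefault("job", "unknown"));
        // Case-fold the symptom so "HighLatency" and "HIGHLATENCY" aggregate together
        String symptom = labels.getOrDefault("alertname", "unknown").toLowerCase(Locale.ROOT);
        return new NormalizedAlert(raw.source() + ":" + raw.id(), cluster, service, symptom, raw.description());
    }
}
```

Lower-casing the symptom is a design choice: it makes differently cased alert names from different sources land in the same Incident.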

As shown in the figure below:

[Figure: AIOps System, three bounded contexts]
  Alert Ingestion Context
    - Receive multi-source alerts (Prometheus/Grafana/Custom)
    - Normalize
    - Dedup & Route
  Incident Management Context
    - Ingest alert
    - Aggregate by Fingerprint (cluster+service+symptom)
    - Incident lifecycle: OPEN→TRIAGING→MITIGATING→CLOSED
    - Timeline record
  Remediation Execution Context
    - Plan action (scale-out/rollout/restart...)
    - Approval: PENDING→APPROVED
    - Async execution: APPROVED→RUNNING→SUCCEEDED/FAILED
    - Idempotency

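How does the code reflect context boundaries?
  • Core aggregate root of the Incident Management context: Incident
  • Core aggregate root of the Remediation Execution context: RemediationAction
  • Cross-context references never hold the other object directly; they use an ID or a value-object reference

Example: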

// domain/incident/Incident.java
public class Incident {
  private final String id;
  private final Severity severity;
  private IncidentStatus status;
  private final List<AlertRef> alerts;      // cross-context: value-object reference
  private final List<TimelineEntry> timeline;
}

// domain/remediation/RemediationAction.java
public class RemediationAction {
  private final String id;
  private final String incidentId;          // links to Incident: only the ID is kept
  private final ActionType type;
  private ActionStatus status;
}

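2.2 Ubiquitous Language

Ubiquitous language requires that business terms, code names, and API names all agree. Discussing the business then becomes the same as discussing the code.

Key terms (excerpt):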

Business term        | In code              | Location
Incident             | Incident             | domain/incident/Incident.java
Alert reference      | AlertRef             | domain/incident/AlertRef.java
Severity             | Severity             | domain/incident/Severity.java
Remediation action   | RemediationAction    | domain/remediation/RemediationAction.java
Incident status      | IncidentStatus       | domain/incident/IncidentStatus.java
Ingest alert         | ingestAlert()        | application/usecase/IncidentAppService.java
Start mitigation     | startMitigation()    | domain/incident/Incident.java
Plan scale-out       | planScaleOut()       | application/usecase/RemediationAppService.java
Approve action       | approve()            | domain/remediation/RemediationAction.java

The API expresses the ubiquitous language too (REST semantics are business semantics):

// POST /alerts/ingest - ingest an alert
@PostMapping("/alerts/ingest")
public ResponseEntity<?> ingest(@Valid @RequestBody IngestAlertRequest req)

// POST /incidents/{incidentId}/plan/scale-out - plan a scale-out
@PostMapping("/incidents/{incidentId}/plan/scale-out")
public ResponseEntity<?> planScaleOut(...)

// POST /actions/{actionId}/approve - approve an action
@PostMapping("/actions/{actionId}/approve")
public ResponseEntity<?> approve(...)

3. DDD Tactical Design

3.1 Entity

An entity has three core traits: a unique identity, a lifecycle, and mutable state.

Incident (aggregate-root entity)
public class Incident {
  private final String id;                 // unique identity
  private final String fingerprint;         // immutable attribute: used for aggregation
  private final Severity severity;          // immutable attribute

  private IncidentStatus status;            // mutable state
  private final List<AlertRef> alerts;
  private final List<TimelineEntry> timeline;
  private final List<Object> domainEvents;

  private Incident(String id, String fingerprint, Severity severity) { ... }

  public static Incident open(String fingerprint, Severity severity, AlertRef firstAlert) { ... }

  public void addAlert(AlertRef alert) { ... }

  public void startMitigation(String reason) { ... }

  private void ensureNotTerminal() { ... }

  public List<Object> pullDomainEvents() { ... }
}

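Entity traits in summary:

  • Unique identity: the id never changes
  • Mutability: status transitions over time
  • Lifecycle: open → startMitigation → close (close is part of the intended lifecycle; the appendix code does not implement it yet)
  • Behavior encapsulation: business methods such as addAlert() and startMitigation()
  • Invariant protection: a terminal state cannot be changed (e.g. ensureNotTerminal())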
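The appendix Incident does not override equals(). It relies on object identity, which works while the in-memory repository always hands out the same instance. If incidents are reloaded from a database, two loads of the same incident would typically be different objects. The minimal sketch below shows id-based entity equality using a hypothetical, stripped-down `TrackedIncident` class (not part of the demo): two objects with the same id are the same incident, whatever their current state.

```java
import java.util.Objects;
import java.util.UUID;

enum IncStatus { OPEN, MITIGATING, CLOSED }

// Hypothetical, stripped-down entity used only to show identity semantics.
final class TrackedIncident {
    private final String id;                   // identity: assigned once, never changes
    private IncStatus status = IncStatus.OPEN; // state: changes over the lifecycle

    TrackedIncident(String id) {
        this.id = Objects.requireNonNull(id, "id");
    }

    static TrackedIncident open() {
        return new TrackedIncident(UUID.randomUUID().toString());
    }

    void startMitigation() {
        if (status != IncStatus.OPEN) {
            throw new IllegalStateException("cannot start mitigation from status=" + status);
        }
        status = IncStatus.MITIGATING;
    }

    IncStatus status() { return status; }

    // Same id means same incident, regardless of the current status.
    @Override
    public boolean equals(Object o) {
        return o instanceof TrackedIncident other && id.equals(other.id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}
```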
RemediationAction (aggregate-root entity)
public class RemediationAction {
  private final String id;
  private final String incidentId;
  private final String dedupKey;         // key field for idempotency/deduplication

  private final ActionType type;
  private final String target;
  private final Map<String, String> params;

  private ActionStatus status;

  private RemediationAction(String id, String incidentId, String dedupKey,
                              ActionType type, String target, Map<String, String> params) { ... }
  
  public static RemediationAction plan(String incidentId, ActionType type, String target, Map<String, String> params) { ... }

  public void approve(String approver) { ... }

  public void start() { ... }
 
  public void succeed() { ... }

  public void fail(String reason) { ... }

  public List<Object> pullDomainEvents() { ... }
}
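Together, the guard clauses in approve(), start(), succeed(), and fail() encode a small state machine. The table-driven sketch below expresses the same transitions, so the allowed moves are visible in one place. `ActionTransitions` is a hypothetical helper, not part of the demo; the demo keeps the checks inline in each method, which works equally well at this size.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

enum ActionStatus { PENDING, APPROVED, RUNNING, SUCCEEDED, FAILED }

// Hypothetical table-driven equivalent of the guards in RemediationAction.
final class ActionTransitions {
    private static final Map<ActionStatus, Set<ActionStatus>> ALLOWED = new EnumMap<>(ActionStatus.class);

    static {
        ALLOWED.put(ActionStatus.PENDING, EnumSet.of(ActionStatus.APPROVED));
        ALLOWED.put(ActionStatus.APPROVED, EnumSet.of(ActionStatus.RUNNING));
        ALLOWED.put(ActionStatus.RUNNING, EnumSet.of(ActionStatus.SUCCEEDED, ActionStatus.FAILED));
        // Terminal states: no way out
        ALLOWED.put(ActionStatus.SUCCEEDED, EnumSet.noneOf(ActionStatus.class));
        ALLOWED.put(ActionStatus.FAILED, EnumSet.noneOf(ActionStatus.class));
    }

    private ActionTransitions() {}

    static boolean canMove(ActionStatus from, ActionStatus to) {
        return ALLOWED.get(from).contains(to);
    }

    // Returns the new status, or throws like the demo's guard clauses do.
    static ActionStatus move(ActionStatus from, ActionStatus to) {
        if (!canMove(from, to)) {
            throw new IllegalStateException("illegal transition " + from + " -> " + to);
        }
        return to;
    }
}
```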

3.2 Value Object

A value object has no unique identity. It is identified by its value (value equality) and is usually immutable.

AlertRef (alert reference)
public record AlertRef(String alertId, String summary) {}
TimelineEntry (timeline entry)
import java.time.Instant;

public record TimelineEntry(Instant at, String kind, String message) {
  public static TimelineEntry now(String kind, String message) {
    return new TimelineEntry(Instant.now(), kind, message);
  }
}

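Why value objects?

Alerts and timeline entries inside the Incident aggregate are its "components". They need no independent lifecycle or ID, and they are naturally immutable.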
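The sketch below makes value semantics concrete. Records get equals()/hashCode() from their components, so two AlertRefs with the same values are interchangeable, for example in a Set. The compact-constructor validation and the `withSummary()` helper are hypothetical additions, not in the demo. They show two common value-object habits: validate on creation, and "modify" by creating a new value.

```java
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;

// Same components as the appendix's AlertRef, plus two hypothetical additions.
record AlertRef(String alertId, String summary) {
    // Hypothetical: a value object validates itself when it is created.
    AlertRef {
        if (alertId == null || alertId.isBlank()) {
            throw new IllegalArgumentException("alertId must not be blank");
        }
    }

    // Hypothetical: "changing" a value object produces a new value; the old one is untouched.
    AlertRef withSummary(String newSummary) {
        return new AlertRef(alertId, newSummary);
    }
}

// Same shape as the appendix's TimelineEntry.
record TimelineEntry(Instant at, String kind, String message) {
    static TimelineEntry now(String kind, String message) {
        return new TimelineEntry(Instant.now(), kind, message);
    }
}
```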

Relationships inside the aggregate:

[Figure: class diagram of the Incident aggregate]
  Incident
    fields: String id, String fingerprint, Severity severity, IncidentStatus status,
            List<AlertRef> alerts, List<TimelineEntry> timeline
    methods: addAlert(), startMitigation(), pullDomainEvents()
  AlertRef: String alertId, String summary
  TimelineEntry: Instant at, String kind, String message
  Incident 1 --- * AlertRef
  Incident 1 --- * TimelineEntry


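3.3 Aggregate

An aggregate is a cluster of domain objects. The aggregate root is its single entry point and maintains its consistency boundary.

Incident aggregate rules
  • Modify only through the aggregate root: outside code cannot change alerts/timeline directly
  • Transaction boundary: a single transaction should modify only one aggregate whenever possible
  • Cross-aggregate references use IDs: e.g. RemediationAction stores incidentId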
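The first rule can be enforced mechanically. The root exposes only read-only snapshots, and every change goes through a method that can check invariants. In the minimal sketch below, `IncidentRoot` is a hypothetical stripped-down root, and its "ignore re-delivered alerts with the same id" invariant is an assumption for illustration; the demo's addAlert() does not deduplicate. The getter uses the same `List.copyOf` technique as `Incident.alerts()` in the appendix.

```java
import java.util.ArrayList;
import java.util.List;

record AlertRef(String alertId, String summary) {}

// Hypothetical, stripped-down aggregate root: alerts change only through addAlert().
final class IncidentRoot {
    private final List<AlertRef> alerts = new ArrayList<>();
    private boolean closed;

    void addAlert(AlertRef alert) {
        if (closed) {
            throw new IllegalStateException("incident is closed");
        }
        // Hypothetical invariant owned by the root: ignore re-delivered alerts with the same id
        boolean seen = alerts.stream().anyMatch(a -> a.alertId().equals(alert.alertId()));
        if (!seen) {
            alerts.add(alert);
        }
    }

    void close() { closed = true; }

    // Read-only snapshot: callers cannot bypass addAlert() by mutating the returned list
    List<AlertRef> alerts() { return List.copyOf(alerts); }
}
```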

Structure of the Incident aggregate:

[Figure: Incident Aggregate]
  Aggregate Root: Incident
    ├── List<AlertRef>
    ├── List<TimelineEntry>
    └── List of domain events
  Consistency Boundary: all changes must go through Incident methods


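3.4 Domain Event

A domain event states that "something significant happened in the domain". It is the semantic foundation of an event-driven architecture.

Event definitions (domain layer):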

public record IncidentOpened(String incidentId, String fingerprint, String severity) {}
public record IncidentMitigationStarted(String incidentId, String severity) {}
public record ActionPlanned(String actionId, String incidentId, String actionType, String target) {}
public record ActionApproved(String actionId, String approver) {}
public record ActionSucceeded(String actionId) {}
public record ActionFailed(String actionId, String reason) {}
Event lifecycle: from creation to being handled by a listener
Participants: Domain Model (Aggregates), Repository, AppService, DomainEventPublisher, Spring Event Bus, ActionExecutionListener
  1. Domain Model: generate domainEvents in business method
  2. AppService → Repository: save(aggregate)
  3. AppService → Domain Model: pullDomainEvents()
  4. AppService → DomainEventPublisher: publishAll(events)
  5. DomainEventPublisher → Spring Event Bus: publishEvent(event)
  6. Spring Event Bus → ActionExecutionListener: @EventListener(event)
  7. ActionExecutionListener: @Async execute remediation

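Code path example:

  • Step 1: a domain operation generates an event (Incident.open)
  • Step 2: the application service saves the aggregate
  • Step 3: the events are published (and the event list is cleared)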
// domain/incident/Incident.java
public static Incident open(String fingerprint, Severity severity, AlertRef firstAlert) {
  Incident inc = new Incident(UUID.randomUUID().toString(), fingerprint, severity);
  inc.alerts.add(firstAlert);
  inc.timeline.add(TimelineEntry.now("OPEN", "incident opened"));
  inc.domainEvents.add(new IncidentOpened(inc.id, fingerprint, severity.name()));
  return inc;
}

// application/usecase/IncidentAppService.java
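public String ingestAlert(...) {
  // Simplified: the full version in the appendix first looks up an active Incident by fingerprint
  Incident inc = Incident.open(fingerprint, severity, new AlertRef(alertId, summary));
  incidentRepo.save(inc);
  publisher.publishAll(inc.pullDomainEvents()); // pullDomainEvents() returns a copy and clears the list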
  return inc.id();
}
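The three steps above follow a small pattern. The aggregate buffers events during business methods, and the application service drains them once after saving. Below is a framework-free sketch of that pattern. `EventSource`, `DemoAction`, and `InProcessPublisher` are hypothetical stand-ins for the demo's `pullDomainEvents()`, its aggregates, and `DomainEventPublisher` plus the Spring event bus.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

record ActionApproved(String actionId, String approver) {}

// Hypothetical base class: buffers domain events raised by business methods.
abstract class EventSource {
    private final List<Object> domainEvents = new ArrayList<>();

    protected void raise(Object event) {
        domainEvents.add(event);
    }

    // Same contract as pullDomainEvents() in the appendix: return a snapshot, then clear.
    List<Object> pullDomainEvents() {
        List<Object> copy = List.copyOf(domainEvents);
        domainEvents.clear();
        return copy;
    }
}

// Hypothetical aggregate: approving records an event instead of calling anyone directly.
final class DemoAction extends EventSource {
    private final String id;

    DemoAction(String id) { this.id = id; }

    void approve(String approver) {
        raise(new ActionApproved(id, approver));
    }
}

// Hypothetical in-process publisher: delivers each event to every subscriber, in order.
final class InProcessPublisher {
    private final List<Consumer<Object>> subscribers = new ArrayList<>();

    void subscribe(Consumer<Object> subscriber) {
        subscribers.add(subscriber);
    }

    void publishAll(List<Object> events) {
        for (Object event : events) {
            for (Consumer<Object> s : subscribers) {
                s.accept(event);
            }
        }
    }
}
```

Because pulling clears the buffer, a second publish of the same aggregate delivers nothing. That is what keeps events from being emitted twice.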

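4. Modules and Layered Architecture

4.1 Directory Structure

The project follows DDD layering: interfaces → application → domain. infrastructure points back at the application layer: it implements the SPI interfaces defined there (dependency inversion).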

src/main/java/per/mjn/aiops/
├── AIOpsApplication.java
│
├── interfaces/
│   ├── IncidentController.java
│   └── dto/
│       ├── IngestAlertRequest.java
│       ├── PlanScaleOutRequest.java
│       └── ApproveActionRequest.java
│
├── application/
│   ├── usecase/
│   │   ├── IncidentAppService.java
│   │   └── RemediationAppService.java
│   ├── event/
│   │   └── ActionApprovedEvent.java
│   ├── listener/
│   │   └── ActionExecutionListener.java
│   └── spi/
│       ├── IncidentRepository.java
│       ├── ActionRepository.java
│       ├── ActionExecutor.java
│       └── DomainEventPublisher.java
│
├── domain/
│   ├── incident/
│   │   ├── Incident.java
│   │   ├── IncidentStatus.java
│   │   ├── Severity.java
│   │   ├── AlertRef.java
│   │   └── TimelineEntry.java
│   ├── remediation/
│   │   ├── RemediationAction.java
│   │   ├── ActionStatus.java
│   │   └── ActionType.java
│   └── events/
│       ├── IncidentOpened.java
│       ├── IncidentMitigationStarted.java
│       ├── ActionPlanned.java
│       ├── ActionApproved.java
│       ├── ActionSucceeded.java
│       └── ActionFailed.java
│
└── infrastructure/
    ├── repos/
    │   ├── InMemoryIncidentRepository.java
    │   └── InMemoryActionRepository.java
    ├── executor/
    │   ├── AsyncConfig.java
    │   └── SimulatedK8sExecutor.java
    └── publisher/
        └── SimpleLogEventPublisher.java

4.2 Layer Diagram (Dependency Direction)

[Figure: layers]
  interfaces (Controller/DTO)
  application (usecase/listener/event/spi)
  domain (aggregates/value objects/events)
  infrastructure (repos/executor/publisher impl)

Dependency Rule:
interfaces → application → domain
infrastructure implements application.spi


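5. Repository: Why Does the Interface Live in application/spi?

A repository is not about "database technology". It is the application layer's abstract dependency on an external persistence capability. So the interface lives in application (as an SPI), and the implementation lives in infrastructure.

Repository interface: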

public interface IncidentRepository {
  Optional<Incident> findById(String incidentId);
  Optional<Incident> findActiveByFingerprint(String fingerprint);
  void save(Incident incident);
}

Repository implementation:

@Repository
public class InMemoryIncidentRepository implements IncidentRepository {
  private final Map<String, Incident> byId = new ConcurrentHashMap<>();
  // findActiveByFingerprint() scans byId (see the appendix); a fingerprint index would avoid the scan
  ...
}

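6. Use-Case Orchestration (Application Service) and Idempotency

In RemediationAppService.planScaleOut(), we coordinate the following:

  • Load the Incident
  • Move the Incident into the mitigating state (a domain rule)
  • Create the Action (factory method)
  • Business idempotency: if an action with the same dedupKey already exists, reuse it
  • Save and publish domain events

Typical code structure (excerpt):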

public String planScaleOut(String incidentId, String target, int replicas) {
  Incident inc = incidentRepo.findById(incidentId)
      .orElseThrow(() -> new IllegalArgumentException("incident not found"));

  RemediationAction action = RemediationAction.plan(
      incidentId, ActionType.SCALE_DEPLOYMENT, target,
      Map.of("replicas", String.valueOf(replicas))
  );

  // Idempotency first: on a replayed request the Incident is already MITIGATING,
  // so calling startMitigation() before this check would throw instead of reusing the action
  var existed = actionRepo.findByDedupKey(action.dedupKey());
  if (existed.isPresent()) return existed.get().id();

  inc.startMitigation("scale out inference service");
  incidentRepo.save(inc);
  publisher.publishAll(inc.pullDomainEvents());

  actionRepo.save(action);
  publisher.publishAll(action.pullDomainEvents());
  return action.id();
}

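Cross-aggregate orchestration, transaction boundaries, and idempotency policy are typical application-layer work. Keeping them here leaves the domain layer pure.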
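One caveat: findByDedupKey() followed by save() is a check-then-act sequence. Two concurrent identical requests can both see "not found" and create two actions. The minimal sketch below closes that gap in an in-memory setting. It assumes a hypothetical `registerOrGet()` operation that the demo's ActionRepository SPI does not have, and relies on `ConcurrentHashMap.putIfAbsent` being atomic. With a relational database, a unique constraint on the dedup key column would typically play the same role.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical atomic variant of findByDedupKey() + save(): register-or-return in one step.
final class DedupRegistry {
    private final ConcurrentMap<String, String> idByDedupKey = new ConcurrentHashMap<>();

    // Returns the id that owns dedupKey: the existing one, or candidateId if this call won.
    String registerOrGet(String dedupKey, String candidateId) {
        String existing = idByDedupKey.putIfAbsent(dedupKey, candidateId);
        return existing != null ? existing : candidateId;
    }

    // Fires `threads` concurrent registrations for one key; returns the distinct ids observed.
    static Set<String> race(DedupRegistry registry, String dedupKey, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<String>> calls = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                calls.add(() -> registry.registerOrGet(dedupKey, UUID.randomUUID().toString()));
            }
            Set<String> ids = new HashSet<>();
            for (Future<String> f : pool.invokeAll(calls)) {
                ids.add(f.get());
            }
            return ids;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```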


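7. Asynchronous Execution: Why Put It in a Listener?

The approval API should return quickly, while scaling out is a slow task, so execution must be asynchronous:

  • AppService: handles only the "approve" use case (clear business meaning)
  • Listener: handles the "reaction after approval" (asynchronous execution, extensible, multiple subscribers)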
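
[Figure: sequence of approval and asynchronous execution]
Participants: REST API, RemediationAppService, DomainEventPublisher, Spring Event Bus, ActionExecutionListener, ActionExecutor
  1. REST API → RemediationAppService: approve(actionId)
  2. RemediationAppService → DomainEventPublisher: publish(ActionApproved)
  3. RemediationAppService → REST API: 200 OK (approved)
  4. DomainEventPublisher → Spring Event Bus: publishEvent(ActionApprovedEvent)
  5. Spring Event Bus → ActionExecutionListener: on(ActionApprovedEvent)
  6. ActionExecutionListener → ActionExecutor: @Async execute(action)
  7. ActionExecutor → ActionExecutionListener: done/exception
  8. ActionExecutionListener: update Action status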

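The advantages:

  • Clear use-case boundaries (approve means approve, nothing more)
  • Asynchronous execution can be retried or compensated (the demo does not implement retries)
  • New subscribers such as notifications, auditing, or metrics can be added without changing the AppService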
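The same split can be sketched without Spring using java.util.concurrent. Here approve() validates and flips the state, hands the slow part to a worker thread, and returns a CompletableFuture at once. `AsyncApprovalDemo` is a hypothetical class. In the demo, Spring's @Async and the ActionApprovedEvent listener play these roles; this sketch swaps them for a plain ExecutorService to show the mechanics.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

enum ActionState { PENDING, APPROVED, RUNNING, SUCCEEDED, FAILED }

// Hypothetical stand-in for approve() + ActionExecutionListener, without Spring.
final class AsyncApprovalDemo {
    private final AtomicReference<ActionState> state = new AtomicReference<>(ActionState.PENDING);
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Fast path: validate and flip the state, then hand off; the caller gets a future at once.
    CompletableFuture<ActionState> approve(Runnable slowExecution) {
        if (!state.compareAndSet(ActionState.PENDING, ActionState.APPROVED)) {
            throw new IllegalStateException("must be PENDING to approve, now=" + state.get());
        }
        return CompletableFuture.supplyAsync(() -> runApproved(slowExecution), worker);
    }

    // Slow path on the worker thread: the listener's start -> succeed/fail sequence.
    private ActionState runApproved(Runnable slowExecution) {
        if (!state.compareAndSet(ActionState.APPROVED, ActionState.RUNNING)) {
            return state.get(); // duplicate delivery: someone already started it
        }
        try {
            slowExecution.run();
            state.set(ActionState.SUCCEEDED);
        } catch (RuntimeException ex) {
            state.set(ActionState.FAILED);
        }
        return state.get();
    }

    ActionState state() { return state.get(); }

    void shutdown() { worker.shutdown(); }
}
```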

8. Summary of Core DDD Patterns

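Pattern              | In this project                                                  | Location
Bounded context      | Incident Management / Remediation Execution / Alert Ingestion    | module boundaries
Ubiquitous language  | ingestAlert / planScaleOut / approve                             | API + method names
Aggregate / root     | Incident / RemediationAction                                     | domain
Entity               | Incident / RemediationAction                                     | domain
Value object         | AlertRef / TimelineEntry                                         | domain
Domain event         | IncidentOpened / ActionApproved…                                 | domain/events
Repository           | interface in application/spi, implementation in infra            | application + infra
Factory              | Incident.open() / RemediationAction.plan()                       | domain
Application service  | use-case orchestration, idempotency, cross-aggregate coordination | application/usecase
Event listener       | asynchronous execution after approval                            | application/listener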

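9. The Value DDD Brings

  • Business focus: the code directly expresses operations semantics (Incident, Mitigation, Action)
  • Maintainability: state transitions and rules are concentrated in the domain model
  • Testability: domain logic does not depend on the framework and can be unit-tested in plain Java
  • Extensibility: adding a remediation type mostly means extending ActionType and the executor
  • Team collaboration: operations and development communicate in the same language
  • Resilience to change: swapping storage, messaging, or executors does not affect the core business model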
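As an illustration of the testability point, the dedup-key recipe from RemediationAction.plan() can be checked with plain Java and no Spring context. The sketch copies that recipe into a hypothetical `DedupKeys` helper. It uses Collectors.joining instead of reduce, which yields the same string, including "" for empty params. It then checks that parameter insertion order does not change the key. The `namespace` parameter is made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper reproducing the dedupKey recipe of RemediationAction.plan().
final class DedupKeys {
    private DedupKeys() {}

    // incidentId:TYPE:target:k1=v1|k2=v2, with params sorted by key so order never matters
    static String of(String incidentId, String type, String target, Map<String, String> params) {
        String paramsKey = params.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("|"));
        return incidentId + ":" + type + ":" + target + ":" + paramsKey;
    }
}
```

Sorting by key matters because the caller's map may have any iteration order. Without it, the same request could produce two different keys and defeat idempotency.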

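10. Conclusion

The essence of DDD is not renaming packages to domain/application/infrastructure. It is mapping the knowledge of a complex business domain onto the code structure, so that a clear domain model makes the code a direct expression of the business rules.

AIOps systems carry heavy business semantics, complex state transitions, and pervasive asynchronous orchestration. In systems like these, DDD often pays off significantly.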


[Appendix] Project source code (the directory structure is shown in Section 4.1)

package per.mjn.aiops.application.event;
public record ActionApprovedEvent(String actionId) {}
package per.mjn.aiops.application.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionalEventListener;
import per.mjn.aiops.application.event.ActionApprovedEvent;
import per.mjn.aiops.domain.remediation.RemediationAction;
import per.mjn.aiops.application.spi.ActionExecutor;
import per.mjn.aiops.application.spi.ActionRepository;
import per.mjn.aiops.application.spi.DomainEventPublisher;

@Component
public class ActionExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(ActionExecutionListener.class);

    private final ActionRepository actionRepo;
    private final ActionExecutor executor;
    private final DomainEventPublisher publisher;

    public ActionExecutionListener(ActionRepository actionRepo, ActionExecutor executor,
                                   DomainEventPublisher publisher) {
        this.actionRepo = actionRepo;
        this.executor = executor;
        this.publisher = publisher;
    }

    /**
     * Key points:
     * - TransactionalEventListener: if approve() runs in a transaction, fire only after it commits.
     *   fallbackExecution = true is required here: approve() in this demo is not @Transactional,
     *   and without the flag Spring discards events published outside a transaction.
     * - Async: runs on a separate thread so the HTTP request is not blocked
     */
    @Async
    @TransactionalEventListener(fallbackExecution = true)
    public void on(ActionApprovedEvent event) {
        String actionId = event.actionId();
        log.info("[ASYNC] receive ActionApprovedEvent: actionId={}", actionId);

        RemediationAction act = actionRepo.findById(actionId)
                .orElseThrow(() -> new IllegalStateException("action not found: " + actionId));

        // Idempotency / duplicate-delivery guard: start() only succeeds from APPROVED
        try {
            act.start();
            actionRepo.save(act);

            executor.execute(act);

            act.succeed();
            actionRepo.save(act);

            log.info("[ASYNC] action succeeded: actionId={}", actionId);
        } catch (Exception ex) {
            log.warn("[ASYNC] execution error: actionId={}, reason={}", actionId, ex.getMessage());

            // If start() itself failed, the action was already handled or is in the wrong state;
            // otherwise it reached RUNNING, so record the failure
            try {
                act.fail(ex.getMessage());
                actionRepo.save(act);
            } catch (Exception ignore) {
                // ignore: status not RUNNING, or already terminal
            }
        }
        // Publish ActionSucceeded / ActionFailed instead of leaving them in the aggregate
        publisher.publishAll(act.pullDomainEvents());
    }
}
package per.mjn.aiops.application.spi;
import per.mjn.aiops.domain.remediation.RemediationAction;

public interface ActionExecutor {
    void execute(RemediationAction action);
}
package per.mjn.aiops.application.spi;
import per.mjn.aiops.domain.remediation.RemediationAction;
import java.util.Optional;

public interface ActionRepository {
    Optional<RemediationAction> findById(String actionId);
    Optional<RemediationAction> findByDedupKey(String dedupKey);
    void save(RemediationAction action);
}
package per.mjn.aiops.application.spi;
import java.util.List;

public interface DomainEventPublisher {
    void publishAll(List<Object> events);
}
package per.mjn.aiops.application.spi;
import per.mjn.aiops.domain.incident.Incident;
import java.util.Optional;

public interface IncidentRepository {
    Optional<Incident> findById(String incidentId);
    Optional<Incident> findActiveByFingerprint(String fingerprint);
    void save(Incident incident);
}
package per.mjn.aiops.application.usecase;

import org.springframework.stereotype.Service;
import per.mjn.aiops.application.spi.DomainEventPublisher;
import per.mjn.aiops.domain.incident.AlertRef;
import per.mjn.aiops.domain.incident.Incident;
import per.mjn.aiops.domain.incident.Severity;
import per.mjn.aiops.application.spi.IncidentRepository;

@Service
public class IncidentAppService {

    private final IncidentRepository incidentRepo;
    private final DomainEventPublisher publisher;

    public IncidentAppService(IncidentRepository incidentRepo, DomainEventPublisher publisher) {
        this.incidentRepo = incidentRepo;
        this.publisher = publisher;
    }

    public String ingestAlert(String alertId, String cluster, String service, String symptom, String summary, Severity sev) {
        String fingerprint = cluster + "|" + service + "|" + symptom;

        Incident inc = incidentRepo.findActiveByFingerprint(fingerprint)
                .map(existing -> {
                    existing.addAlert(new AlertRef(alertId, summary));
                    return existing;
                })
                .orElseGet(() -> Incident.open(fingerprint, sev, new AlertRef(alertId, summary)));

        incidentRepo.save(inc);
        publisher.publishAll(inc.pullDomainEvents());
        return inc.id();
    }
}
package per.mjn.aiops.application.usecase;

import org.springframework.stereotype.Service;
import per.mjn.aiops.application.spi.DomainEventPublisher;
import per.mjn.aiops.domain.incident.Incident;
import per.mjn.aiops.domain.remediation.ActionType;
import per.mjn.aiops.domain.remediation.RemediationAction;
import per.mjn.aiops.application.spi.ActionRepository;
import per.mjn.aiops.application.spi.IncidentRepository;

import java.util.Map;

@Service
public class RemediationAppService {

    private final IncidentRepository incidentRepo;
    private final ActionRepository actionRepo;
    private final DomainEventPublisher publisher;

    public RemediationAppService(IncidentRepository incidentRepo,
                                 ActionRepository actionRepo,
                                 DomainEventPublisher publisher) {
        this.incidentRepo = incidentRepo;
        this.actionRepo = actionRepo;
        this.publisher = publisher;
    }

    public String planScaleOut(String incidentId, String target, int replicas) {
        Incident inc = incidentRepo.findById(incidentId)
                .orElseThrow(() -> new IllegalArgumentException("incident not found"));

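        RemediationAction action = RemediationAction.plan(
                incidentId,
                ActionType.SCALE_DEPLOYMENT,
                target,
                Map.of("replicas", String.valueOf(replicas))
        );

        // Idempotency: if the dedupKey already exists, reuse that action.
        // Check this before startMitigation(): on a replayed request the Incident is already
        // MITIGATING, and startMitigation() would throw instead of returning the existing id.
        var existed = actionRepo.findByDedupKey(action.dedupKey());
        if (existed.isPresent()) {
            return existed.get().id();
        }

        // Domain rule: enter the mitigating state
        inc.startMitigation("scale out inference service");

        // Save the incident first, then publish its events
        incidentRepo.save(inc);
        publisher.publishAll(inc.pullDomainEvents());

        actionRepo.save(action);
        publisher.publishAll(action.pullDomainEvents());
        return action.id();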
    }

    /**
     * Only approves and publishes events (asynchronous execution is done by the listener)
     */
    public RemediationAction approve(String actionId, String approver) {
        RemediationAction act = actionRepo.findById(actionId)
                .orElseThrow(() -> new IllegalArgumentException("action not found"));

        act.approve(approver);
        actionRepo.save(act);

        // Publishes the ActionApproved domain event, which is mapped to ActionApprovedEvent and triggers async execution
        publisher.publishAll(act.pullDomainEvents());
        return act;
    }
}
package per.mjn.aiops.domain.events;

public record ActionApproved(String actionId, String approver) {}
package per.mjn.aiops.domain.events;

public record ActionFailed(String actionId, String reason) {}
package per.mjn.aiops.domain.events;

public record ActionPlanned(String actionId, String incidentId, String actionType, String target) {}
package per.mjn.aiops.domain.events;

public record ActionSucceeded(String actionId) {}
package per.mjn.aiops.domain.events;

public record IncidentMitigationStarted(String incidentId, String severity) {}
package per.mjn.aiops.domain.events;

public record IncidentOpened(String incidentId, String fingerprint, String severity) {}
package per.mjn.aiops.domain.incident;

public record AlertRef(String alertId, String summary) {}
package per.mjn.aiops.domain.incident;

import per.mjn.aiops.domain.events.IncidentMitigationStarted;
import per.mjn.aiops.domain.events.IncidentOpened;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class Incident {

    private final String id;
    private final String fingerprint;
    private final Severity severity;

    private IncidentStatus status;

    private final List<AlertRef> alerts = new ArrayList<>();
    private final List<TimelineEntry> timeline = new ArrayList<>();
    private final List<Object> domainEvents = new ArrayList<>();

    private Incident(String id, String fingerprint, Severity severity) {
        this.id = id;
        this.fingerprint = fingerprint;
        this.severity = severity;
        this.status = IncidentStatus.OPEN;
    }

    public static Incident open(String fingerprint, Severity severity, AlertRef firstAlert) {
        Incident inc = new Incident(UUID.randomUUID().toString(), fingerprint, severity);
        inc.alerts.add(firstAlert);
        inc.timeline.add(TimelineEntry.now("OPEN", "incident opened"));
        inc.domainEvents.add(new IncidentOpened(inc.id, fingerprint, severity.name()));
        return inc;
    }

    public void addAlert(AlertRef alert) {
        ensureNotTerminal();
        alerts.add(alert);
        timeline.add(TimelineEntry.now("ALERT_ADDED", alert.summary()));
    }

    public void startMitigation(String reason) {
        ensureNotTerminal();
        if (!(status == IncidentStatus.OPEN || status == IncidentStatus.TRIAGING)) {
            throw new IllegalStateException("cannot start mitigation from status=" + status);
        }
        status = IncidentStatus.MITIGATING;
        timeline.add(TimelineEntry.now("MITIGATING", reason));
        domainEvents.add(new IncidentMitigationStarted(id, severity.name()));
    }

    private void ensureNotTerminal() {
        if (status == IncidentStatus.CLOSED || status == IncidentStatus.CANCELED) {
            throw new IllegalStateException("terminal status=" + status);
        }
    }

    public List<Object> pullDomainEvents() {
        List<Object> copy = List.copyOf(domainEvents);
        domainEvents.clear();
        return copy;
    }

    public String id() { return id; }
    public String fingerprint() { return fingerprint; }
    public Severity severity() { return severity; }
    public IncidentStatus status() { return status; }
    public List<AlertRef> alerts() { return List.copyOf(alerts); }
    public List<TimelineEntry> timeline() { return List.copyOf(timeline); }
}
package per.mjn.aiops.domain.incident;

public enum IncidentStatus {
    OPEN, TRIAGING, MITIGATING, RESOLVED, CLOSED, CANCELED
}
package per.mjn.aiops.domain.incident;

public enum Severity {
    SEV1, SEV2, SEV3, SEV4
}
package per.mjn.aiops.domain.incident;

import java.time.Instant;

public record TimelineEntry(Instant at, String kind, String message) {
    public static TimelineEntry now(String kind, String message) {
        return new TimelineEntry(Instant.now(), kind, message);
    }
}
package per.mjn.aiops.domain.remediation;

public enum ActionStatus {
    PENDING, APPROVED, RUNNING, SUCCEEDED, FAILED
}
package per.mjn.aiops.domain.remediation;

public enum ActionType {
    SCALE_DEPLOYMENT
}
package per.mjn.aiops.domain.remediation;

import per.mjn.aiops.domain.events.*;

import java.util.*;

public class RemediationAction {

    private final String id;
    private final String incidentId;
    private final String dedupKey;

    private final ActionType type;
    private final String target;
    private final Map<String, String> params;

    private ActionStatus status;

    private final List<Object> domainEvents = new ArrayList<>();

    private RemediationAction(String id, String incidentId, String dedupKey,
                              ActionType type, String target, Map<String, String> params) {
        this.id = id;
        this.incidentId = incidentId;
        this.dedupKey = dedupKey;
        this.type = type;
        this.target = target;
        this.params = new HashMap<>(params);
        this.status = ActionStatus.PENDING;
    }

    public static RemediationAction plan(String incidentId, ActionType type, String target, Map<String, String> params) {
        String paramsKey = params.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .map(e -> e.getKey() + "=" + e.getValue())
                .reduce((a, b) -> a + "|" + b)
                .orElse("");

        String dedupKey = incidentId + ":" + type.name() + ":" + target + ":" + paramsKey;

        RemediationAction act = new RemediationAction(UUID.randomUUID().toString(), incidentId, dedupKey, type, target, params);
        act.domainEvents.add(new ActionPlanned(act.id, incidentId, type.name(), target));
        return act;
    }

    public void approve(String approver) {
        if (status != ActionStatus.PENDING) throw new IllegalStateException("must be PENDING to approve, now=" + status);
        status = ActionStatus.APPROVED;
        domainEvents.add(new ActionApproved(id, approver));
    }

    public void start() {
        if (status != ActionStatus.APPROVED) throw new IllegalStateException("must be APPROVED to start, now=" + status);
        status = ActionStatus.RUNNING;
    }

    public void succeed() {
        if (status != ActionStatus.RUNNING) throw new IllegalStateException("must be RUNNING to succeed, now=" + status);
        status = ActionStatus.SUCCEEDED;
        domainEvents.add(new ActionSucceeded(id));
    }

    public void fail(String reason) {
        if (status != ActionStatus.RUNNING) throw new IllegalStateException("must be RUNNING to fail, now=" + status);
        status = ActionStatus.FAILED;
        domainEvents.add(new ActionFailed(id, reason));
    }

    public List<Object> pullDomainEvents() {
        List<Object> copy = List.copyOf(domainEvents);
        domainEvents.clear();
        return copy;
    }

    public String id() { return id; }
    public String incidentId() { return incidentId; }
    public String dedupKey() { return dedupKey; }
    public ActionType type() { return type; }
    public String target() { return target; }
    public Map<String, String> params() { return Map.copyOf(params); }
    public ActionStatus status() { return status; }
}
package per.mjn.aiops.infrastructure.executor;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

@Configuration
@EnableAsync
public class AsyncConfig {
}
package per.mjn.aiops.infrastructure.executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import per.mjn.aiops.application.spi.ActionExecutor;
import per.mjn.aiops.domain.remediation.ActionType;
import per.mjn.aiops.domain.remediation.RemediationAction;

@Component
public class SimulatedK8sExecutor implements ActionExecutor {

    private static final Logger log = LoggerFactory.getLogger(SimulatedK8sExecutor.class);

    @Override
    public void execute(RemediationAction action) {
        if (action.type() == ActionType.SCALE_DEPLOYMENT) {
            int replicas = Integer.parseInt(action.params().get("replicas"));
            log.info("[K8S] scale deployment => target={}, replicas={}", action.target(), replicas);
            if (replicas <= 0) {
                throw new IllegalArgumentException("replicas must be > 0");
            }
            return;
        }
        throw new IllegalStateException("unsupported action type=" + action.type());
    }
}
package per.mjn.aiops.infrastructure.publisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import per.mjn.aiops.application.event.ActionApprovedEvent;
import per.mjn.aiops.application.spi.DomainEventPublisher;
import per.mjn.aiops.domain.events.ActionApproved;

import java.util.List;

@Component
public class SimpleLogEventPublisher implements DomainEventPublisher {

    private static final Logger log = LoggerFactory.getLogger(SimpleLogEventPublisher.class);

    private final ApplicationEventPublisher springPublisher;

    public SimpleLogEventPublisher(ApplicationEventPublisher springPublisher) {
        this.springPublisher = springPublisher;
    }

    @Override
    public void publishAll(List<Object> events) {
        for (Object e : events) {
            log.info("[DOMAIN_EVENT] {}", e);

            // Domain event -> application event (triggers asynchronous execution)
            if (e instanceof ActionApproved aa) {
                springPublisher.publishEvent(new ActionApprovedEvent(aa.actionId()));
            }
        }
    }
}
package per.mjn.aiops.infrastructure.repos;

import org.springframework.stereotype.Repository;
import per.mjn.aiops.application.spi.ActionRepository;
import per.mjn.aiops.domain.remediation.RemediationAction;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

@Repository
public class InMemoryActionRepository implements ActionRepository {

    private final Map<String, RemediationAction> byId = new ConcurrentHashMap<>();
    private final Map<String, String> byDedup = new ConcurrentHashMap<>();

    @Override
    public Optional<RemediationAction> findById(String actionId) {
        return Optional.ofNullable(byId.get(actionId));
    }

    @Override
    public Optional<RemediationAction> findByDedupKey(String dedupKey) {
        String id = byDedup.get(dedupKey);
        return id == null ? Optional.empty() : Optional.ofNullable(byId.get(id));
    }

    @Override
    public void save(RemediationAction action) {
        byId.put(action.id(), action);
        byDedup.put(action.dedupKey(), action.id());
    }
}
package per.mjn.aiops.infrastructure.repos;

import org.springframework.stereotype.Repository;
import per.mjn.aiops.application.spi.IncidentRepository;
import per.mjn.aiops.domain.incident.Incident;
import per.mjn.aiops.domain.incident.IncidentStatus;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

@Repository
public class InMemoryIncidentRepository implements IncidentRepository {

    private final Map<String, Incident> byId = new ConcurrentHashMap<>();

    @Override
    public Optional<Incident> findById(String incidentId) {
        return Optional.ofNullable(byId.get(incidentId));
    }

    @Override
    public Optional<Incident> findActiveByFingerprint(String fingerprint) {
        return byId.values().stream()
                .filter(i -> i.fingerprint().equals(fingerprint))
                .filter(i -> i.status() != IncidentStatus.CLOSED && i.status() != IncidentStatus.CANCELED)
                .findFirst();
    }

    @Override
    public void save(Incident incident) {
        byId.put(incident.id(), incident);
    }
}
package per.mjn.aiops.interfaces.dto;

import jakarta.validation.constraints.NotBlank;

public class ApproveActionRequest {
    @NotBlank public String approver;
}
package per.mjn.aiops.interfaces.dto;

import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import per.mjn.aiops.domain.incident.Severity;

public class IngestAlertRequest {
    @NotBlank public String alertId;
    @NotBlank public String cluster;
    @NotBlank public String service;
    @NotBlank public String symptom;
    @NotBlank public String summary;
    @NotNull public Severity severity;
}
package per.mjn.aiops.interfaces.dto;

import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;

public class PlanScaleOutRequest {
    @NotBlank public String target;
    @Min(1) public int replicas;
}
package per.mjn.aiops.interfaces;

import jakarta.validation.Valid;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import per.mjn.aiops.application.usecase.IncidentAppService;
import per.mjn.aiops.application.usecase.RemediationAppService;
import per.mjn.aiops.domain.remediation.RemediationAction;
import per.mjn.aiops.application.spi.ActionRepository;
import per.mjn.aiops.application.spi.IncidentRepository;
import per.mjn.aiops.interfaces.dto.ApproveActionRequest;
import per.mjn.aiops.interfaces.dto.IngestAlertRequest;

import per.mjn.aiops.interfaces.dto.PlanScaleOutRequest;

import java.util.Map;

@RestController
public class IncidentController {

    private final IncidentAppService incidentService;
    private final RemediationAppService remediationService;
    private final IncidentRepository incidentRepo;
    private final ActionRepository actionRepo;

    public IncidentController(IncidentAppService incidentService,
                              RemediationAppService remediationService,
                              IncidentRepository incidentRepo,
                              ActionRepository actionRepo) {
        this.incidentService = incidentService;
        this.remediationService = remediationService;
        this.incidentRepo = incidentRepo;
        this.actionRepo = actionRepo;
    }

    @PostMapping("/alerts/ingest")
    public ResponseEntity<?> ingest(@Valid @RequestBody IngestAlertRequest req) {
        String incidentId = incidentService.ingestAlert(
                req.alertId, req.cluster, req.service, req.symptom, req.summary, req.severity
        );
        return ResponseEntity.ok(Map.of("incidentId", incidentId));
    }

    @PostMapping("/incidents/{incidentId}/plan/scale-out")
    public ResponseEntity<?> planScaleOut(@PathVariable("incidentId") String incidentId, @Valid @RequestBody PlanScaleOutRequest req) {
        try {
            String actionId = remediationService.planScaleOut(incidentId, req.target, req.replicas);
            return ResponseEntity.ok(Map.of("actionId", actionId));
        } catch (IllegalArgumentException e) {
            return ResponseEntity.status(404).body(Map.of("error", e.getMessage()));
        } catch (IllegalStateException e) {
            return ResponseEntity.status(409).body(Map.of("error", e.getMessage()));
        }
    }

    /**
     * Returns immediately after approval; the execution itself runs asynchronously.
     */
    @PostMapping("/actions/{actionId}/approve")
    public ResponseEntity<?> approve(@PathVariable("actionId") String actionId,
                                     @Valid @RequestBody ApproveActionRequest req) {
        try {
            RemediationAction act = remediationService.approve(actionId, req.approver);
            return ResponseEntity.ok(Map.of(
                    "actionId", act.id(),
                    "status", act.status().name(),
                    "message", "approved; execution will run asynchronously"
            ));
        } catch (IllegalArgumentException e) {
            return ResponseEntity.status(404).body(Map.of("error", e.getMessage()));
        } catch (IllegalStateException e) {
            return ResponseEntity.status(409).body(Map.of("error", e.getMessage()));
        }
    }

    @GetMapping("/incidents/{incidentId}")
    public ResponseEntity<?> getIncident(@PathVariable("incidentId") String incidentId) {
        return incidentRepo.findById(incidentId)
                .map(i -> ResponseEntity.ok(Map.of(
                        "incidentId", i.id(),
                        "fingerprint", i.fingerprint(),
                        "severity", i.severity().name(),
                        "status", i.status().name(),
                        "alerts", i.alerts(),
                        "timeline", i.timeline()
                )))
                .orElseGet(() -> ResponseEntity.status(404).body(Map.of("error", "incident not found")));
    }

    @GetMapping("/actions/{actionId}")
    public ResponseEntity<?> getAction(@PathVariable("actionId") String actionId) {
        return actionRepo.findById(actionId)
                .map(a -> ResponseEntity.ok(Map.of(
                        "actionId", a.id(),
                        "incidentId", a.incidentId(),
                        "status", a.status().name(),
                        "type", a.type().name(),
                        "target", a.target(),
                        "params", a.params()
                )))
                .orElseGet(() -> ResponseEntity.status(404).body(Map.of("error", "action not found")));
    }
}
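Both write endpoints repeat the same error convention. An `IllegalArgumentException` is treated as "not found" (404), and an `IllegalStateException` (an operation not allowed in the current state) becomes a conflict (409). In Spring this is usually centralized in a `@RestControllerAdvice` class with `@ExceptionHandler` methods. The framework-free sketch below shows only the mapping rule itself. The class name `ErrorMapping` and the fallback to 500 for any other exception are assumptions, not part of the demo.

```java
import java.util.Map;

/** Hypothetical helper: maps exceptions thrown by the use cases to an HTTP status and error body. */
public final class ErrorMapping {

    /** Status code plus body, mirroring the controller's Map.of("error", ...) responses. */
    public record ErrorResponse(int status, Map<String, String> body) {}

    private ErrorMapping() {}

    public static ErrorResponse toResponse(RuntimeException e) {
        int status;
        if (e instanceof IllegalArgumentException) {
            status = 404; // same convention as the controller: unknown id -> Not Found
        } else if (e instanceof IllegalStateException) {
            status = 409; // not allowed in the current state -> Conflict
        } else {
            status = 500; // assumption: anything else is an unexpected server error
        }
        // Map.of rejects null values, so fall back to the exception's class name
        String message = e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName();
        return new ErrorResponse(status, Map.of("error", message));
    }

    public static void main(String[] args) {
        System.out.println(toResponse(new IllegalArgumentException("incident not found")));
        System.out.println(toResponse(new IllegalStateException()));
    }
}
```

With an advice class, each handler method would shrink to the happy path, and the 404/409 rule would live in one place.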
package per.mjn.aiops;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class AIOpsApplication {
    public static void main(String[] args) {
        SpringApplication.run(AIOpsApplication.class, args);
    }
}
<project xmlns="http://maven.apache.org/POM/4.0.0"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>per.mjn</groupId>
    <artifactId>ddd-aiops-min</artifactId>
    <version>1.0.0</version>

    <properties>
        <spring-boot.version>3.3.2</spring-boot.version>
        <java.version>21</java.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>21</source>
                    <target>21</target>
                </configuration>
            </plugin>

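            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <!-- No spring-boot-starter-parent here: the imported BOM does not manage plugins,
                     so the version and the repackage goal have to be declared explicitly -->
                <version>${spring-boot.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>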
        </plugins>
    </build>
</project>

System test

(1) Alert ingestion: POST /alerts/ingest

curl -X POST http://localhost:8080/alerts/ingest \
  -H "Content-Type: application/json" \
  -d '{
    "alertId": "a-001",
    "cluster": "prod-1",
    "service": "infer-qwen",
    "symptom": "p95_latency_spike",
    "summary": "p95 > 2s for 5m",
    "severity": "SEV2"
  }'

Expected response:

{
	"incidentId": "d9eb3657-5130-4e09-8b88-4bdd12c2f14f"
}

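Ingesting another alert with the same fingerprint (cluster + service + symptom) merges it into the existing incident. While that incident is still active (neither CLOSED nor CANCELED), the same incidentId is returned. If no active incident with that fingerprint exists, a new incident is opened and a new incidentId is returned.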
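The merge rule can be pictured with a small framework-free sketch. It assumes the fingerprint is the three fields joined with `|`, which matches the `fingerprint` value `prod-1|infer-qwen|p95_latency_spike` returned by `GET /incidents/{incidentId}`. The class `FingerprintIndex` and its methods are hypothetical. The demo repository scans all incidents in `findActiveByFingerprint` and stores them in a separate `save` call. The sketch instead does the lookup and the creation in a single `ConcurrentHashMap.computeIfAbsent` call, so two concurrent alerts with the same fingerprint cannot open two incidents.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical index: at most one active incident id per fingerprint. */
public class FingerprintIndex {

    // fingerprint -> id of the currently active incident
    private final Map<String, String> activeByFingerprint = new ConcurrentHashMap<>();

    /** Assumed fingerprint format: cluster|service|symptom. */
    public static String fingerprint(String cluster, String service, String symptom) {
        return String.join("|", cluster, service, symptom);
    }

    /** Returns the active incident id for this fingerprint, opening a new one if none exists. */
    public String ingest(String cluster, String service, String symptom) {
        String fp = fingerprint(cluster, service, symptom);
        // computeIfAbsent on ConcurrentHashMap is atomic: lookup and creation cannot interleave
        return activeByFingerprint.computeIfAbsent(fp, key -> UUID.randomUUID().toString());
    }

    /** Called when the incident becomes CLOSED or CANCELED, so the next alert opens a new incident. */
    public void close(String fingerprint, String incidentId) {
        // remove(key, value) only removes the mapping if it still points at this incident
        activeByFingerprint.remove(fingerprint, incidentId);
    }
}
```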


(2) Plan a scale-out: POST /incidents/{incidentId}/plan/scale-out

curl -X POST http://localhost:8080/incidents/<incidentId>/plan/scale-out \
  -H "Content-Type: application/json" \
  -d '{ "target":"ns=llm,deploy=infer-qwen", "replicas": 6 }'

Expected response:

{
	"actionId": "10de4fdb-a625-46e8-a626-8ac0a760afbf"
}
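Planning is where the business-idempotency requirement applies: a retried or duplicated plan request should not create a second scale-out. `RemediationAppService.planScaleOut` is not shown in this section, so the rule below is only one possible approach. It assumes the idempotency key is incident + action type + target + replicas. `ScaleOutPlanner` and `idempotencyKey` are hypothetical names. The type string `SCALE_DEPLOYMENT` is taken from the action response in step (4).

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical planner: the same scale-out request for the same incident yields the same actionId. */
public class ScaleOutPlanner {

    // idempotency key -> actionId of the action that was already planned
    private final Map<String, String> actionIdByKey = new ConcurrentHashMap<>();

    /** Assumed key: incident + action type + target + desired replicas. */
    static String idempotencyKey(String incidentId, String target, int replicas) {
        return String.join("|", incidentId, "SCALE_DEPLOYMENT", target, Integer.toString(replicas));
    }

    public String planScaleOut(String incidentId, String target, int replicas) {
        // Atomic get-or-create: a duplicate request returns the existing actionId
        return actionIdByKey.computeIfAbsent(
                idempotencyKey(incidentId, target, replicas),
                key -> UUID.randomUUID().toString());
    }
}
```

Including `replicas` in the key treats "scale to 8" as a new decision rather than a retry of "scale to 6". Dropping it would make the first plan win instead.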

(3) Approve: POST /actions/{actionId}/approve (asynchronous execution starts)

curl -X POST http://localhost:8080/actions/<actionId>/approve \
  -H "Content-Type: application/json" \
  -d '{ "approver":"oncall-1" }'

Expected response:

{
	"status": "APPROVED",
	"actionId": "10de4fdb-a625-46e8-a626-8ac0a760afbf",
	"message": "approved; execution will run asynchronously"
}
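The response already reports APPROVED because approval only records the decision, and the slow work runs on another thread. The following is a minimal sketch of that split, assuming an `ExecutorService` and a hypothetical `AsyncAction` class. Its states follow the PENDING→APPROVED→RUNNING→SUCCEEDED/FAILED lifecycle of the remediation context. `compareAndSet` makes a repeated approve fail with `IllegalStateException`, which the controller above turns into a 409.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical action: approve() returns at once, execution continues on an executor. */
public class AsyncAction {

    public enum Status { PENDING, APPROVED, RUNNING, SUCCEEDED, FAILED }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.PENDING);
    private final Runnable work; // stands in for the real scale-out call (assumption)

    public AsyncAction(Runnable work) {
        this.work = work;
    }

    public Status status() {
        return status.get();
    }

    /** Fast path: PENDING -> APPROVED, then hand execution to the executor and return. */
    public CompletableFuture<Void> approve(ExecutorService executor) {
        if (!status.compareAndSet(Status.PENDING, Status.APPROVED)) {
            // a second approve, or approving a finished action, is rejected
            throw new IllegalStateException("action is " + status.get() + ", expected PENDING");
        }
        return CompletableFuture.runAsync(this::execute, executor);
    }

    /** Slow path: APPROVED -> RUNNING -> SUCCEEDED / FAILED. */
    private void execute() {
        if (!status.compareAndSet(Status.APPROVED, Status.RUNNING)) {
            return; // already picked up elsewhere
        }
        try {
            work.run();
            status.set(Status.SUCCEEDED);
        } catch (RuntimeException e) {
            status.set(Status.FAILED);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AsyncAction action = new AsyncAction(() -> { /* pretend to scale out */ });
        CompletableFuture<Void> done = action.approve(executor);
        // approve() has returned; the status may still be APPROVED or RUNNING here
        done.join();
        System.out.println(action.status()); // SUCCEEDED
        executor.shutdown();
    }
}
```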

(4) Query the action: GET /actions/{actionId} (poll to watch the status change)

curl http://localhost:8080/actions/<actionId>

Expected response (after execution has finished; earlier polls may still show APPROVED or RUNNING):

{
	"target": "ns=llm,deploy=infer-qwen",
	"incidentId": "d9eb3657-5130-4e09-8b88-4bdd12c2f14f",
	"params": {
		"replicas": "6"
	},
	"actionId": "10de4fdb-a625-46e8-a626-8ac0a760afbf",
	"type": "SCALE_DEPLOYMENT",
	"status": "SUCCEEDED"
}
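A client polling this endpoint needs a stop condition and an upper bound. The small sketch below treats SUCCEEDED and FAILED as terminal, per the action lifecycle. The `Supplier<String>` stands in for the HTTP call plus reading the `status` field (for example with `java.net.http.HttpClient` and a JSON parser). `ActionPoller`, `pollUntilTerminal`, the attempt limit and the interval are all assumptions.

```java
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical client-side poller for GET /actions/{actionId}. */
public final class ActionPoller {

    private static final Set<String> TERMINAL = Set.of("SUCCEEDED", "FAILED");

    private ActionPoller() {}

    /** Returns the terminal status, or empty if it is not reached within maxAttempts. */
    public static Optional<String> pollUntilTerminal(Supplier<String> fetchStatus,
                                                     int maxAttempts,
                                                     Duration interval) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String status = fetchStatus.get();
            if (TERMINAL.contains(status)) {
                return Optional.of(status);
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(interval.toMillis()); // wait before asking again
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                    return Optional.empty();
                }
            }
        }
        return Optional.empty(); // gave up: still APPROVED or RUNNING
    }

    public static void main(String[] args) {
        // Fake responses standing in for successive GET /actions/{actionId} calls
        Iterator<String> responses = List.of("APPROVED", "RUNNING", "SUCCEEDED").iterator();
        System.out.println(pollUntilTerminal(responses::next, 10, Duration.ofMillis(10))); // Optional[SUCCEEDED]
    }
}
```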

(5) Query the incident: GET /incidents/{incidentId}

curl http://localhost:8080/incidents/<incidentId>

Expected response:

{
	"severity": "SEV2",
	"incidentId": "d9eb3657-5130-4e09-8b88-4bdd12c2f14f",
	"alerts": [
		{
			"alertId": "a-001",
			"summary": "p95 > 2s for 5m"
		}
	],
	"fingerprint": "prod-1|infer-qwen|p95_latency_spike",
	"status": "MITIGATING",
	"timeline": [
		{
			"at": "2026-01-22T13:50:39.371743600Z",
			"kind": "OPEN",
			"message": "incident opened"
		},
		{
			"at": "2026-01-22T13:51:24.512972900Z",
			"kind": "MITIGATING",
			"message": "scale out inference service"
		}
	]
}
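The timeline shows the incident going from OPEN directly to MITIGATING, with one entry per step. The sketch below writes that lifecycle as an explicit transition table. The states come from the context diagram, plus CANCELED from the repository. The exact allowed transitions are assumptions, in particular OPEN→MITIGATING skipping TRIAGING, as this timeline shows, and which states may be canceled. `IncidentLifecycle` and `Entry` are hypothetical names.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical incident lifecycle with an explicit transition table and a timeline. */
public class IncidentLifecycle {

    public enum Status { OPEN, TRIAGING, MITIGATING, CLOSED, CANCELED }

    /** One timeline entry, shaped like the "at / kind / message" objects in the response. */
    public record Entry(Instant at, String kind, String message) {}

    // Assumed transition rules; CLOSED and CANCELED are terminal
    private static final Map<Status, Set<Status>> ALLOWED = new EnumMap<>(Status.class);
    static {
        ALLOWED.put(Status.OPEN, EnumSet.of(Status.TRIAGING, Status.MITIGATING, Status.CANCELED));
        ALLOWED.put(Status.TRIAGING, EnumSet.of(Status.MITIGATING, Status.CANCELED));
        ALLOWED.put(Status.MITIGATING, EnumSet.of(Status.CLOSED));
        ALLOWED.put(Status.CLOSED, EnumSet.noneOf(Status.class));
        ALLOWED.put(Status.CANCELED, EnumSet.noneOf(Status.class));
    }

    private Status status = Status.OPEN;
    private final List<Entry> timeline = new ArrayList<>();

    public IncidentLifecycle() {
        timeline.add(new Entry(Instant.now(), Status.OPEN.name(), "incident opened"));
    }

    /** Changes status and appends a timeline entry, or throws if the move is not allowed. */
    public void transitionTo(Status next, String message) {
        if (!ALLOWED.get(status).contains(next)) {
            throw new IllegalStateException(status + " -> " + next + " is not allowed");
        }
        status = next;
        timeline.add(new Entry(Instant.now(), next.name(), message));
    }

    public Status status() {
        return status;
    }

    public List<Entry> timeline() {
        return List.copyOf(timeline); // read-only view for callers
    }
}
```

Keeping the table inside the aggregate means an illegal move fails in the domain model itself. The `IllegalStateException` then reaches the controller's existing 409 mapping.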