观察者模式深度解析:事件驱动的艺术
观察者模式是一种行为型设计模式,通过定义一对多的依赖关系实现对象间的动态通知机制。其核心思想是解耦观察者与被观察者,当被观察者状态变化时自动通知所有依赖对象。模式包含主题(维护观察者列表)、具体主题(实现业务状态)、观察者接口和具体观察者四个角色。示例展示了新闻发布系统实现:NewsPublisher作为具体主题,管理EmailSubscriber和PushSubscriber等具体观察者,当发布新闻时,所有已订阅的观察者都会自动收到通知。
·
🎯 模式定义与核心思想
观察者模式(Observer Pattern)是一种行为型设计模式,它定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。
核心哲学
“解耦观察者与被观察者,实现松耦合的事件通知机制”
🏗️ 基础架构与角色分析
模式结构组成
| 角色 | 职责 | 示例 |
|---|---|---|
| 主题/被观察者 | 维护观察者列表,提供注册/注销方法 | Subject |
| 具体主题 | 实现主题接口,维护具体状态 | ConcreteSubject |
| 观察者 | 定义更新接口 | Observer |
| 具体观察者 | 实现更新接口,保持与主题状态一致 | ConcreteObserver |
💻 完整代码示例:新闻发布系统
基础观察者模式实现
/**
 * Observer role: receives the messages pushed by a subject it is registered with.
 */
public interface Observer {

    /**
     * Delivers a published message to this observer.
     *
     * @param message the message pushed by the subject
     */
    void update(String message);

    /**
     * Returns the name that identifies this observer in log output.
     *
     * @return the observer's display name
     */
    String getObserverName();
}
/**
 * Subject (observable) role: keeps a set of observers and exposes the news state
 * they are notified about.
 */
public interface Subject {

    /**
     * Subscribes an observer to this subject.
     *
     * @param observer the observer to add
     */
    void registerObserver(Observer observer);

    /**
     * Unsubscribes an observer from this subject.
     *
     * @param observer the observer to remove
     */
    void removeObserver(Observer observer);

    /** Pushes the current state to the registered observers. */
    void notifyObservers();

    /**
     * Replaces the current news item.
     *
     * @param news the new news text
     */
    void setNews(String news);

    /**
     * Returns the current news item.
     *
     * @return the latest news text
     */
    String getNews();
}
/**
 * Concrete subject: a named news publisher that pushes its latest news item to
 * every registered {@link Observer}.
 *
 * <p>The observer list is a plain {@code ArrayList}; instances are not synchronized.
 */
public class NewsPublisher implements Subject {
    private final List<Observer> observers;
    private final String publisherName;
    private String latestNews;

    /**
     * Creates a publisher with no subscribers and a placeholder news item.
     *
     * @param publisherName name used in all log output of this publisher
     */
    public NewsPublisher(String publisherName) {
        this.observers = new ArrayList<>();
        this.latestNews = "暂无新闻";
        this.publisherName = publisherName;
        System.out.println("📰 创建新闻发布器: " + publisherName);
    }

    /**
     * Subscribes the observer unless it is already subscribed.
     *
     * @param observer the observer to add; must not be {@code null}
     * @throws NullPointerException if {@code observer} is {@code null}
     */
    @Override
    public void registerObserver(Observer observer) {
        // Reject null before touching the list: previously a null was added first and
        // the failure only surfaced afterwards, leaving a null entry behind.
        if (observer == null) {
            throw new NullPointerException("observer must not be null");
        }
        if (!observers.contains(observer)) {
            observers.add(observer);
            System.out.println("✅ " + observer.getObserverName() + " 订阅了 " + publisherName);
        }
    }

    /**
     * Unsubscribes the observer; does nothing if it was not subscribed.
     *
     * @param observer the observer to remove
     */
    @Override
    public void removeObserver(Observer observer) {
        if (observers.remove(observer)) {
            System.out.println("❌ " + observer.getObserverName() + " 取消订阅 " + publisherName);
        }
    }

    /** Sends the latest news to every observer subscribed at the time of the call. */
    @Override
    public void notifyObservers() {
        if (observers.isEmpty()) {
            System.out.println("ℹ️ 暂无订阅者,无需通知");
            return;
        }
        System.out.println("🔔 " + publisherName + " 开始通知 " + observers.size() + " 个订阅者");
        // Iterate over a snapshot so an observer may subscribe or unsubscribe from
        // inside update() without causing a ConcurrentModificationException.
        for (Observer observer : new ArrayList<>(observers)) {
            observer.update(latestNews);
        }
    }

    /**
     * Stores the news item and immediately notifies the observers.
     *
     * @param news the news text to publish
     */
    @Override
    public void setNews(String news) {
        this.latestNews = news;
        System.out.println("📢 " + publisherName + " 发布新闻: " + news);
        notifyObservers();
    }

    /**
     * @return the most recently published news text
     */
    @Override
    public String getNews() {
        return latestNews;
    }

    /**
     * @return the display names of all current subscribers, in subscription order
     */
    public List<String> getSubscriberNames() {
        return observers.stream()
                .map(Observer::getObserverName)
                .collect(Collectors.toList());
    }

    /**
     * @return the number of current subscribers
     */
    public int getSubscriberCount() {
        return observers.size();
    }

    /**
     * @return the name given to this publisher at construction
     */
    public String getPublisherName() {
        return publisherName;
    }
}
/**
 * Concrete observer that records each message and simulates sending it by e-mail.
 */
public class EmailSubscriber implements Observer {
    private final String name;
    private final String email;
    private final List<String> receivedNews = new ArrayList<>();

    /**
     * @param name  subscriber's display name
     * @param email address printed when a delivery is simulated
     */
    public EmailSubscriber(String name, String email) {
        this.name = name;
        this.email = email;
        System.out.println("📧 创建邮件订阅者: " + name + " (" + email + ")");
    }

    @Override
    public String getObserverName() {
        return name + " (邮件)";
    }

    /** Stores the message, logs it, then simulates the e-mail delivery. */
    @Override
    public void update(String message) {
        receivedNews.add(message);
        System.out.println("📨 " + name + " 收到邮件通知: " + message);
        deliver();
    }

    /**
     * @return how many messages have been received so far
     */
    public int getReceivedCount() {
        return receivedNews.size();
    }

    /**
     * @return a copy of the received messages, oldest first
     */
    public List<String> getReceivedNews() {
        return new ArrayList<>(receivedNews);
    }

    /** Sleeps 100 ms to mimic delivery latency; restores the interrupt flag if interrupted. */
    private void deliver() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            System.out.println("❌ 邮件发送中断");
            return;
        }
        System.out.println("✅ 邮件已发送到: " + email);
    }
}
/**
 * Concrete observer that records each message and simulates a mobile push delivery.
 */
public class PushSubscriber implements Observer {
    private final String username;
    private final String deviceId;
    private final List<String> receivedNotifications = new ArrayList<>();

    /**
     * @param username subscriber's display name
     * @param deviceId device identifier printed when a push is simulated
     */
    public PushSubscriber(String username, String deviceId) {
        this.username = username;
        this.deviceId = deviceId;
        System.out.println("📱 创建推送订阅者: " + username + " (" + deviceId + ")");
    }

    @Override
    public String getObserverName() {
        return username + " (推送)";
    }

    /** Stores the message, logs it, then simulates the push delivery. */
    @Override
    public void update(String message) {
        receivedNotifications.add(message);
        System.out.println("🔔 " + username + " 收到推送通知: " + message);
        push();
    }

    /**
     * @return a copy of the received notifications, oldest first
     */
    public List<String> getReceivedNotifications() {
        return new ArrayList<>(receivedNotifications);
    }

    /** Sleeps 50 ms to mimic push latency; restores the interrupt flag if interrupted. */
    private void push() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            System.out.println("❌ 推送发送中断");
            return;
        }
        System.out.println("✅ 推送已发送到设备: " + deviceId);
    }
}
/**
 * Concrete observer that records each message and simulates refreshing a website.
 */
public class WebSubscriber implements Observer {
    private final String siteName;
    private final String websiteUrl;
    private final List<String> displayedNews = new ArrayList<>();

    /**
     * @param siteName   site's display name
     * @param websiteUrl URL printed in the creation log line
     */
    public WebSubscriber(String siteName, String websiteUrl) {
        this.siteName = siteName;
        this.websiteUrl = websiteUrl;
        System.out.println("🌐 创建网站订阅者: " + siteName + " (" + websiteUrl + ")");
    }

    @Override
    public String getObserverName() {
        return siteName + " (网站)";
    }

    /** Stores the message, logs it, then simulates the site refresh. */
    @Override
    public void update(String message) {
        displayedNews.add(message);
        System.out.println("🖥️ " + siteName + " 网站更新新闻: " + message);
        refreshSite();
    }

    /**
     * @return a copy of the news items shown so far, oldest first
     */
    public List<String> getDisplayedNews() {
        return new ArrayList<>(displayedNews);
    }

    /** Sleeps 80 ms to mimic update latency; restores the interrupt flag if interrupted. */
    private void refreshSite() {
        try {
            Thread.sleep(80);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            System.out.println("❌ 网站更新中断");
            return;
        }
        System.out.println("✅ 网站 " + siteName + " 已更新新闻内容");
    }
}
/**
 * Concrete observer that records each message and simulates sending it as an SMS.
 */
public class SmsSubscriber implements Observer {
    private final String customerName;
    private final String phoneNumber;
    private final List<String> receivedSms = new ArrayList<>();

    /**
     * @param customerName subscriber's display name
     * @param phoneNumber  number printed when an SMS delivery is simulated
     */
    public SmsSubscriber(String customerName, String phoneNumber) {
        this.customerName = customerName;
        this.phoneNumber = phoneNumber;
        System.out.println("📞 创建短信订阅者: " + customerName + " (" + phoneNumber + ")");
    }

    @Override
    public String getObserverName() {
        return customerName + " (短信)";
    }

    /** Stores the message, logs it, then simulates the SMS delivery. */
    @Override
    public void update(String message) {
        receivedSms.add(message);
        System.out.println("💬 " + customerName + " 收到短信: " + message);
        sendSms();
    }

    /**
     * @return a copy of the received SMS texts, oldest first
     */
    public List<String> getReceivedSms() {
        return new ArrayList<>(receivedSms);
    }

    /** Sleeps 120 ms to mimic SMS latency; restores the interrupt flag if interrupted. */
    private void sendSms() {
        try {
            Thread.sleep(120);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            System.out.println("❌ 短信发送中断");
            return;
        }
        System.out.println("✅ 短信已发送到: " + phoneNumber);
    }
}
/**
 * Registry of {@link NewsPublisher} instances keyed by publisher name.
 */
public class NewsSystemManager {
    private final Map<String, NewsPublisher> publishers = new HashMap<>();

    public NewsSystemManager() {
    }

    /**
     * Creates a publisher and stores it under its name, replacing any earlier
     * publisher stored under the same name.
     *
     * @param name the publisher name, also used as the registry key
     * @return the newly created publisher
     */
    public NewsPublisher createPublisher(String name) {
        NewsPublisher created = new NewsPublisher(name);
        publishers.put(name, created);
        return created;
    }

    /**
     * Publishes the same news item through every registered publisher.
     *
     * @param news the news text to broadcast
     */
    public void publishNewsToAll(String news) {
        System.out.println("\n=== 全局新闻发布 ===");
        publishers.values().forEach(each -> each.setNews(news));
    }

    /** Prints the publisher count followed by one summary line per publisher. */
    public void printSystemStatus() {
        System.out.println("\n=== 新闻系统状态 ===");
        System.out.println("发布器数量: " + publishers.size());
        publishers.values().forEach(each -> {
            String summary = "📰 " + each.getPublisherName()
                    + " - 订阅者: " + each.getSubscriberCount()
                    + " - 最新新闻: " + each.getNews();
            System.out.println(summary);
        });
    }

    /**
     * @return a new list holding the names of all registered publishers
     */
    public List<String> getAllPublisherNames() {
        List<String> names = new ArrayList<>(publishers.keySet());
        return names;
    }
}
/**
 * Console walkthrough of the news publishing example: creates publishers and
 * subscribers, wires them together, publishes news, and times a bulk notification.
 */
public class ObserverPatternDemo {

    public static void main(String[] args) {
        System.out.println("=== 观察者模式演示:新闻发布系统 ===\n");
        NewsSystemManager manager = new NewsSystemManager();

        // Step 1: one publisher per news category.
        System.out.println("1. 创建新闻发布器:");
        NewsPublisher tech = manager.createPublisher("科技新闻");
        NewsPublisher sports = manager.createPublisher("体育新闻");
        NewsPublisher finance = manager.createPublisher("财经新闻");

        // Step 2: subscribers for each delivery channel.
        System.out.println("\n2. 创建订阅者:");
        Observer zhangMail = new EmailSubscriber("张三", "zhangsan@email.com");
        Observer liMail = new EmailSubscriber("李四", "lisi@email.com");
        Observer wangPush = new PushSubscriber("王五", "device_001");
        Observer portalWeb = new WebSubscriber("新闻门户", "https://news.example.com");
        Observer zhaoSms = new SmsSubscriber("赵六", "13800138000");

        // Step 3: wire subscribers to publishers.
        System.out.println("\n3. 订阅新闻:");
        subscribeAll(tech, zhangMail, wangPush, portalWeb);
        subscribeAll(sports, liMail, zhaoSms);
        subscribeAll(finance, zhangMail, liMail, wangPush, portalWeb, zhaoSms);

        // Step 4: each publication triggers a notification round.
        System.out.println("\n4. 发布新闻:");
        tech.setNews("人工智能取得重大突破!");
        sports.setNews("中国女排夺得世界冠军!");
        finance.setNews("股市今日大涨3.5%!");

        // Step 5: an unsubscribed observer no longer receives updates.
        System.out.println("\n5. 取消订阅演示:");
        tech.removeObserver(zhangMail);
        tech.setNews("新的科技产品发布会即将举行");

        // Step 6: status summary.
        System.out.println("\n6. 系统状态:");
        manager.printSystemStatus();

        // Step 7: broadcast through every publisher.
        System.out.println("\n7. 全局新闻发布:");
        manager.publishNewsToAll("紧急通知:系统维护将于今晚进行");

        // Step 8: timing of a bulk notification.
        System.out.println("\n8. 性能测试:");
        performanceTest();
    }

    /** Registers the given subscribers with the publisher, in argument order. */
    private static void subscribeAll(NewsPublisher publisher, Observer... subscribers) {
        for (Observer subscriber : subscribers) {
            publisher.registerObserver(subscriber);
        }
    }

    /**
     * Registers 100 e-mail subscribers with a fresh publisher and prints how long a
     * single publication takes to reach all of them.
     */
    private static void performanceTest() {
        NewsPublisher publisher = new NewsPublisher("测试发布器");

        // All observers are created first, then registered, so the creation log
        // lines precede the subscription log lines.
        List<Observer> crowd = new ArrayList<>();
        for (int index = 0; index < 100; index++) {
            crowd.add(new EmailSubscriber("测试用户" + index, "test" + index + "@email.com"));
        }
        crowd.forEach(publisher::registerObserver);

        long begin = System.currentTimeMillis();
        publisher.setNews("性能测试新闻");
        long elapsed = System.currentTimeMillis() - begin;
        System.out.println("通知 " + crowd.size() + " 个观察者耗时: " + elapsed + "ms");
    }
}
🏢 企业级应用:股票交易系统
复杂的观察者模式实现
/**
 * Immutable snapshot of one stock quote.
 *
 * <p>All fields are {@code final}; the timestamp is taken when the instance is
 * constructed.
 */
public class StockData {
    private final String symbol;
    private final double price;
    private final double change;
    private final double changePercent;
    private final long volume;
    private final LocalDateTime timestamp;

    /**
     * @param symbol        ticker symbol
     * @param price         quoted price
     * @param change        absolute price change
     * @param changePercent price change expressed in percent
     * @param volume        traded volume
     */
    public StockData(String symbol, double price, double change, double changePercent, long volume) {
        this.symbol = symbol;
        this.price = price;
        this.change = change;
        this.changePercent = changePercent;
        this.volume = volume;
        this.timestamp = LocalDateTime.now();
    }

    public String getSymbol() { return symbol; }

    public double getPrice() { return price; }

    public double getChange() { return change; }

    public double getChangePercent() { return changePercent; }

    public long getVolume() { return volume; }

    /**
     * @return the moment this snapshot was created
     */
    public LocalDateTime getTimestamp() { return timestamp; }

    /**
     * @return a one-line summary: symbol, price, change, percent change and volume
     */
    @Override
    public String toString() {
        return String.format("%s: $%.2f (%.2f, %.2f%%) Vol: %,d",
                symbol, price, change, changePercent, volume);
    }
}
/**
 * Subject role of the stock example: accepts stock data and notifies the
 * registered {@link StockObserver}s about it.
 */
public interface StockMarket {

    /**
     * @param observer the observer to register
     */
    void registerObserver(StockObserver observer);

    /**
     * @param observer the observer to unregister
     */
    void removeObserver(StockObserver observer);

    /**
     * Notifies observers about the given data.
     *
     * @param stockData the data to deliver
     */
    void notifyObservers(StockData stockData);

    /**
     * Feeds a new data point into the market.
     *
     * @param stockData the data point to add
     */
    void addStockData(StockData stockData);

    /**
     * @return the symbols this market can report on
     */
    List<String> getSupportedSymbols();
}
/**
 * Observer role of the stock example: receives {@link StockData} updates for the
 * symbols it declares interest in.
 */
public interface StockObserver {

    /**
     * @param stockData the data point being delivered
     */
    void update(StockData stockData);

    /**
     * @return the identifier used for this observer in log output
     */
    String getObserverId();

    /**
     * @return the symbols this observer wants updates for
     */
    Set<String> getInterestedSymbols();

    /**
     * Tells whether the symbol is among {@link #getInterestedSymbols()}.
     *
     * @param symbol the symbol to test
     * @return {@code true} if the interested-symbol set contains {@code symbol}
     */
    default boolean isInterestedIn(String symbol) {
        Set<String> watched = getInterestedSymbols();
        return watched.contains(symbol);
    }
}
/**
 * Concrete stock market: keeps the latest data per symbol and forwards each new
 * data point to the observers interested in that symbol.
 */
public class RealTimeStockMarket implements StockMarket {
    private final String marketName;
    private final List<StockObserver> observers = new ArrayList<>();
    private final Map<String, StockData> latestData = new ConcurrentHashMap<>();

    /**
     * @param marketName name used in this market's log output
     */
    public RealTimeStockMarket(String marketName) {
        this.marketName = marketName;
        System.out.println("🏛️ 创建股票市场: " + marketName);
    }

    /** Registers the observer unless it is already registered. */
    @Override
    public void registerObserver(StockObserver observer) {
        if (observers.contains(observer)) {
            return;
        }
        observers.add(observer);
        System.out.println("✅ " + observer.getObserverId() + " 注册到 " + marketName);
    }

    /** Unregisters the observer; silent if it was not registered. */
    @Override
    public void removeObserver(StockObserver observer) {
        boolean wasRegistered = observers.remove(observer);
        if (wasRegistered) {
            System.out.println("❌ " + observer.getObserverId() + " 从 " + marketName + " 取消注册");
        }
    }

    /**
     * Delivers the data point to every observer interested in its symbol.
     * Nothing is printed when no observer qualifies.
     */
    @Override
    public void notifyObservers(StockData stockData) {
        if (observers.isEmpty()) {
            return;
        }
        // Two passes on purpose: the recipient list is fixed before any update() runs.
        List<StockObserver> recipients = new ArrayList<>();
        for (StockObserver candidate : observers) {
            if (candidate.isInterestedIn(stockData.getSymbol())) {
                recipients.add(candidate);
            }
        }
        if (recipients.isEmpty()) {
            return;
        }
        System.out.println("🔔 " + marketName + " 通知 " + recipients.size() +
                " 个观察者关于 " + stockData.getSymbol());
        recipients.forEach(recipient -> recipient.update(stockData));
    }

    /** Records the data point as the latest for its symbol, logs it and notifies observers. */
    @Override
    public void addStockData(StockData stockData) {
        latestData.put(stockData.getSymbol(), stockData);
        System.out.println("📊 " + marketName + " 更新: " + stockData);
        notifyObservers(stockData);
    }

    /**
     * @return the symbols for which data has been added so far
     */
    @Override
    public List<String> getSupportedSymbols() {
        return new ArrayList<>(latestData.keySet());
    }

    /**
     * Feeds one random data point for each of six fixed symbols, pausing 100 ms
     * after each; stops early if the thread is interrupted.
     */
    public void simulateMarketData() {
        Random rng = new Random();
        String[] tickers = {"AAPL", "GOOGL", "MSFT", "TSLA", "AMZN", "META"};
        for (String ticker : tickers) {
            // The draw order (price, change, volume) is kept exactly as before.
            double price = 100 + rng.nextDouble() * 900;
            double delta = (rng.nextDouble() - 0.5) * 10;
            double deltaPercent = (delta / price) * 100;
            long volume = 1000000 + rng.nextInt(9000000);
            addStockData(new StockData(ticker, price, delta, deltaPercent, volume));
            try {
                Thread.sleep(100);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * @param symbol the symbol to look up
     * @return the latest data for the symbol, or {@code null} if none was added
     */
    public StockData getLatestData(String symbol) {
        return latestData.get(symbol);
    }

    /**
     * @return the number of registered observers
     */
    public int getObserverCount() {
        return observers.size();
    }
}
/**
 * Concrete observer that stores the data points of its symbols and applies a
 * simple threshold strategy to the percentage change between the last two points.
 */
public class TradingBot implements StockObserver {
    private final String botId;
    private final Set<String> interestedSymbols;
    private final Map<String, List<StockData>> historicalData;
    private final double buyThreshold;
    private final double sellThreshold;

    /**
     * @param botId         identifier used in log output
     * @param symbols       symbols to watch; copied defensively
     * @param buyThreshold  buy when the percentage change is at or below this value
     * @param sellThreshold sell when the percentage change is at or above this value
     */
    public TradingBot(String botId, Set<String> symbols, double buyThreshold, double sellThreshold) {
        this.botId = botId;
        this.interestedSymbols = new HashSet<>(symbols);
        this.historicalData = new HashMap<>();
        this.buyThreshold = buyThreshold;
        this.sellThreshold = sellThreshold;
        System.out.println("🤖 创建交易机器人: " + botId);
    }

    /** Appends the data point to the symbol's history, then evaluates the strategy. */
    @Override
    public void update(StockData stockData) {
        historicalData.computeIfAbsent(stockData.getSymbol(), k -> new ArrayList<>())
                .add(stockData);
        analyzeAndTrade(stockData);
    }

    @Override
    public String getObserverId() {
        return botId;
    }

    /**
     * @return a copy of the watched symbols
     */
    @Override
    public Set<String> getInterestedSymbols() {
        return new HashSet<>(interestedSymbols);
    }

    /**
     * Compares the new point with the previous one for the same symbol and
     * triggers a simulated buy or sell when a threshold is crossed.
     */
    private void analyzeAndTrade(StockData stockData) {
        List<StockData> history = historicalData.get(stockData.getSymbol());
        if (history.size() < 2) {
            return; // at least two data points are needed for a comparison
        }
        StockData previousData = history.get(history.size() - 2);
        double priceChange = stockData.getPrice() - previousData.getPrice();
        double changePercent = (priceChange / previousData.getPrice()) * 100;
        System.out.println("📈 " + botId + " 分析 " + stockData.getSymbol() +
                ": 变化 " + String.format("%.2f", changePercent) + "%");
        if (changePercent <= buyThreshold) {
            System.out.println("💰 " + botId + " 决定买入 " + stockData.getSymbol() +
                    " (价格下跌 " + String.format("%.2f", Math.abs(changePercent)) + "%)");
            executeBuyOrder(stockData);
        } else if (changePercent >= sellThreshold) {
            System.out.println("💸 " + botId + " 决定卖出 " + stockData.getSymbol() +
                    " (价格上涨 " + String.format("%.2f", changePercent) + "%)");
            executeSellOrder(stockData);
        }
    }

    /** Simulates a buy order with a 50 ms delay. */
    private void executeBuyOrder(StockData stockData) {
        try {
            Thread.sleep(50);
            System.out.println("✅ " + botId + " 成功买入 " + stockData.getSymbol());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Simulates a sell order with a 50 ms delay. */
    private void executeSellOrder(StockData stockData) {
        try {
            Thread.sleep(50);
            System.out.println("✅ " + botId + " 成功卖出 " + stockData.getSymbol());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the recorded history per symbol.
     *
     * @return a deep copy: both the map and each per-symbol list are new objects,
     *         so callers cannot alter this bot's internal history
     */
    public Map<String, List<StockData>> getHistoricalData() {
        // A plain new HashMap<>(historicalData) would share the inner lists.
        Map<String, List<StockData>> snapshot = new HashMap<>();
        for (Map.Entry<String, List<StockData>> entry : historicalData.entrySet()) {
            snapshot.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        return snapshot;
    }
}
/**
 * Concrete observer that raises an alert whenever a monitored symbol's price is
 * at or above its configured threshold.
 */
public class PriceAlert implements StockObserver {
    private final String alertId;
    private final Map<String, Double> priceThresholds;
    private final Set<String> monitoredSymbols;
    private final List<String> triggeredAlerts = new ArrayList<>();

    /**
     * @param alertId         identifier used in alert messages
     * @param priceThresholds threshold per symbol; copied defensively, and its keys
     *                        become the monitored symbols
     */
    public PriceAlert(String alertId, Map<String, Double> priceThresholds) {
        this.alertId = alertId;
        this.priceThresholds = new HashMap<>(priceThresholds);
        this.monitoredSymbols = new HashSet<>(priceThresholds.keySet());
        System.out.println("🚨 创建价格警报: " + alertId);
    }

    @Override
    public String getObserverId() {
        return alertId;
    }

    /**
     * @return a copy of the monitored symbols
     */
    @Override
    public Set<String> getInterestedSymbols() {
        return new HashSet<>(monitoredSymbols);
    }

    /** Checks the data point against the threshold of its symbol, if one exists. */
    @Override
    public void update(StockData stockData) {
        Double limit = priceThresholds.get(stockData.getSymbol());
        if (limit == null) {
            return;
        }
        raiseIfReached(stockData, limit);
    }

    /**
     * @return a copy of all alert messages raised so far, oldest first
     */
    public List<String> getTriggeredAlerts() {
        return new ArrayList<>(triggeredAlerts);
    }

    /** Records, prints and "sends" an alert when the price has reached the threshold. */
    private void raiseIfReached(StockData stockData, double threshold) {
        double price = stockData.getPrice();
        if (price >= threshold) {
            String text = String.format("🚨 %s 价格警报: %s 达到 $%.2f (阈值: $%.2f)",
                    alertId, stockData.getSymbol(), price, threshold);
            triggeredAlerts.add(text);
            System.out.println(text);
            dispatch(text);
        }
    }

    /** Simulates sending the alert with a 30 ms delay. */
    private void dispatch(String message) {
        try {
            Thread.sleep(30);
            System.out.println("📢 警报通知已发送: " + message);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Concrete observer that records every data point of its symbols and can print a
 * first-versus-last price report.
 */
public class DataLogger implements StockObserver {
    private final String loggerId;
    private final Set<String> loggedSymbols;
    private final Map<String, List<StockData>> allData;

    /**
     * @param loggerId identifier used in log output
     * @param symbols  symbols to record; copied defensively
     */
    public DataLogger(String loggerId, Set<String> symbols) {
        this.loggerId = loggerId;
        this.loggedSymbols = new HashSet<>(symbols);
        this.allData = new HashMap<>();
        System.out.println("📝 创建数据记录器: " + loggerId);
    }

    /** Appends the data point to the list kept for its symbol and logs the price. */
    @Override
    public void update(StockData stockData) {
        allData.computeIfAbsent(stockData.getSymbol(), k -> new ArrayList<>())
                .add(stockData);
        System.out.println("💾 " + loggerId + " 记录 " + stockData.getSymbol() +
                " 数据: " + stockData.getPrice());
    }

    @Override
    public String getObserverId() {
        return loggerId;
    }

    /**
     * @return a copy of the recorded symbols
     */
    @Override
    public Set<String> getInterestedSymbols() {
        return new HashSet<>(loggedSymbols);
    }

    /**
     * Prints, for each symbol with data, the first and last recorded price and the
     * absolute and percentage change between them.
     */
    public void generateReport() {
        System.out.println("\n=== 数据记录器 " + loggerId + " 报告 ===");
        for (Map.Entry<String, List<StockData>> entry : allData.entrySet()) {
            String symbol = entry.getKey();
            List<StockData> dataList = entry.getValue();
            if (!dataList.isEmpty()) {
                double firstPrice = dataList.get(0).getPrice();
                double lastPrice = dataList.get(dataList.size() - 1).getPrice();
                double change = lastPrice - firstPrice;
                double changePercent = (change / firstPrice) * 100;
                System.out.printf("📊 %s: 起始$%.2f, 结束$%.2f, 变化%.2f (%.2f%%)%n",
                        symbol, firstPrice, lastPrice, change, changePercent);
            }
        }
    }

    /**
     * Returns everything recorded so far.
     *
     * @return a deep copy: both the map and each per-symbol list are new objects,
     *         so callers cannot alter this logger's internal records
     */
    public Map<String, List<StockData>> getAllData() {
        // A plain new HashMap<>(allData) would share the inner lists.
        Map<String, List<StockData>> snapshot = new HashMap<>();
        for (Map.Entry<String, List<StockData>> entry : allData.entrySet()) {
            snapshot.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        return snapshot;
    }
}
/**
 * Console walkthrough of the stock trading example: wires bots, an alert and a
 * logger to a market, feeds simulated data, then times a larger observer set.
 */
public class StockTradingDemo {

    public static void main(String[] args) {
        System.out.println("=== 观察者模式演示:股票交易系统 ===\n");

        // Step 1: the market acting as subject.
        RealTimeStockMarket market = new RealTimeStockMarket("纳斯达克");

        // Step 2: two bots with different symbols and thresholds.
        Set<String> aggressiveSymbols = Set.of("AAPL", "GOOGL", "TSLA");
        TradingBot aggressive = new TradingBot("激进交易机器人", aggressiveSymbols, -2.0, 3.0);
        Set<String> conservativeSymbols = Set.of("MSFT", "AMZN", "META");
        TradingBot conservative = new TradingBot("保守交易机器人", conservativeSymbols, -1.0, 2.0);

        // Step 3: price alert thresholds per symbol.
        Map<String, Double> thresholds = Map.of(
                "AAPL", 150.0,
                "TSLA", 200.0,
                "AMZN", 130.0
        );
        PriceAlert priceAlert = new PriceAlert("高价警报", thresholds);

        // Step 4: a logger covering every simulated symbol.
        Set<String> everySymbol = Set.of("AAPL", "GOOGL", "MSFT", "TSLA", "AMZN", "META");
        DataLogger logger = new DataLogger("市场数据记录器", everySymbol);

        // Step 5: registration order determines notification order.
        market.registerObserver(aggressive);
        market.registerObserver(conservative);
        market.registerObserver(priceAlert);
        market.registerObserver(logger);

        // Step 6: simulated data feed.
        System.out.println("\n6. 模拟市场数据流:");
        market.simulateMarketData();

        // Step 7: reports.
        System.out.println("\n7. 系统报告:");
        logger.generateReport();
        System.out.println("\n触发警报:");
        for (String alertLine : priceAlert.getTriggeredAlerts()) {
            System.out.println(alertLine);
        }

        // Step 8: larger observer set.
        System.out.println("\n8. 大规模观察者测试:");
        largeScaleObserverTest();
    }

    /**
     * Registers 50 bots spread over ten test symbols, pushes 100 random data
     * points through a fresh market and prints the elapsed time.
     */
    private static void largeScaleObserverTest() {
        RealTimeStockMarket market = new RealTimeStockMarket("测试市场");

        List<StockObserver> bots = new ArrayList<>();
        for (int index = 0; index < 50; index++) {
            Set<String> watched = Set.of("TEST" + (index % 10));
            TradingBot bot = new TradingBot("测试机器人" + index, watched, -1.0, 1.0);
            bots.add(bot);
            market.registerObserver(bot);
        }

        long begin = System.currentTimeMillis();
        Random rng = new Random();
        for (int round = 0; round < 100; round++) {
            String ticker = "TEST" + (round % 10);
            // Price is drawn before change, matching the original draw order.
            double price = 100 + rng.nextDouble() * 100;
            double delta = (rng.nextDouble() - 0.5) * 10;
            market.addStockData(new StockData(ticker, price, delta, 0, 1000000));
        }
        long elapsed = System.currentTimeMillis() - begin;
        System.out.println("处理 100 个数据更新,50 个观察者,耗时: " + elapsed + "ms");
    }
}
🔄 架构演进:从简单到复杂
第一代:基础观察者模式
第二代:支持事件对象的观察者模式
/**
 * Base class for events: carries a type label, the creation time in epoch
 * milliseconds, and the object that produced the event.
 */
public abstract class Event {
    private final String eventType;
    private final long timestamp;
    private final Object source;

    /**
     * @param eventType label describing the kind of event
     * @param source    the object that produced the event; may be {@code null}
     */
    public Event(String eventType, Object source) {
        this.eventType = eventType;
        this.source = source;
        this.timestamp = System.currentTimeMillis();
    }

    public String getEventType() { return eventType; }

    /**
     * @return creation time as returned by {@code System.currentTimeMillis()}
     */
    public long getTimestamp() { return timestamp; }

    public Object getSource() { return source; }

    /**
     * @return a summary with the type, timestamp and the simple class name of the
     *         source, or {@code null} in place of the class name when there is no source
     */
    @Override
    public String toString() {
        // The constructor accepts a null source, so guard the getClass() call;
        // previously toString() threw NullPointerException in that case.
        String sourceName = (source == null) ? "null" : source.getClass().getSimpleName();
        return String.format("Event[type=%s, time=%d, source=%s]",
                eventType, timestamp, sourceName);
    }
}
/**
 * Event describing a stock price update together with the price it replaced.
 */
public class StockPriceEvent extends Event {
    private final StockData stockData;
    private final double previousPrice;

    /**
     * @param source        the object that produced the event
     * @param stockData     the new data point
     * @param previousPrice the price before this update
     */
    public StockPriceEvent(Object source, StockData stockData, double previousPrice) {
        super("STOCK_PRICE_UPDATE", source);
        this.stockData = stockData;
        this.previousPrice = previousPrice;
    }

    public StockData getStockData() {
        return stockData;
    }

    public double getPreviousPrice() {
        return previousPrice;
    }

    /**
     * @return the current price minus the previous price
     */
    public double getPriceChange() {
        double currentPrice = stockData.getPrice();
        return currentPrice - previousPrice;
    }

    /**
     * @return the price change relative to the previous price, in percent
     */
    public double getPriceChangePercent() {
        double ratio = getPriceChange() / previousPrice;
        return ratio * 100;
    }
}
/**
 * Event carrying a system alert: a level, a message and optional key/value details.
 */
public class SystemAlertEvent extends Event {
    private final String alertLevel;
    private final String message;
    private final Map<String, Object> additionalData;

    /**
     * Creates an alert without additional data.
     *
     * @param source     the object that produced the event
     * @param alertLevel alert level label
     * @param message    alert text
     */
    public SystemAlertEvent(Object source, String alertLevel, String message) {
        super("SYSTEM_ALERT", source);
        this.alertLevel = alertLevel;
        this.message = message;
        this.additionalData = new HashMap<>();
    }

    /**
     * Creates an alert with additional data.
     *
     * @param source         the object that produced the event
     * @param alertLevel     alert level label
     * @param message        alert text
     * @param additionalData extra details, copied into this event; {@code null} is
     *                       treated as "no additional data"
     */
    public SystemAlertEvent(Object source, String alertLevel, String message,
                            Map<String, Object> additionalData) {
        this(source, alertLevel, message);
        // putAll(null) throws NullPointerException; a missing map simply means no extras.
        if (additionalData != null) {
            this.additionalData.putAll(additionalData);
        }
    }

    public String getAlertLevel() { return alertLevel; }

    public String getMessage() { return message; }

    /**
     * @return a copy of the additional data
     */
    public Map<String, Object> getAdditionalData() { return new HashMap<>(additionalData); }

    /**
     * Adds or replaces one additional data entry.
     *
     * @param key   entry key
     * @param value entry value
     */
    public void addData(String key, Object value) {
        additionalData.put(key, value);
    }
}
/**
 * Typed event observer: handles events of one {@link Event} subtype.
 *
 * @param <T> the event type this observer handles
 */
public interface EventObserver<T extends Event> {

    /**
     * @param event the event to handle
     */
    void onEvent(T event);

    /**
     * @return the identifier used for this observer in log output
     */
    String getObserverId();

    /**
     * @return the class object of the handled event type
     */
    Class<T> getEventType();

    /**
     * Tells whether the event is an instance of {@link #getEventType()}.
     *
     * @param event the event to test
     * @return {@code true} if this observer can handle the event
     */
    default boolean supportsEvent(Event event) {
        Class<T> handledType = getEventType();
        return handledType.isInstance(event);
    }
}
/**
 * Event bus: routes each published {@link Event} to the registered observers that
 * support it, either inline on the caller's thread or through an executor.
 */
public class EventBus {
    // Declared with the concrete list type so registration can use the atomic addIfAbsent.
    private final Map<Class<? extends Event>, CopyOnWriteArrayList<EventObserver<?>>> observers;
    private final String busName;
    private final boolean asyncProcessing;
    private final ExecutorService executor;

    /**
     * Creates a bus that processes events on the publishing thread.
     *
     * @param busName name used in log output
     */
    public EventBus(String busName) {
        this(busName, false);
    }

    /**
     * @param busName         name used in log output
     * @param asyncProcessing {@code true} to hand each observer call to a cached
     *                        thread pool, {@code false} to call observers inline
     */
    public EventBus(String busName, boolean asyncProcessing) {
        this.observers = new ConcurrentHashMap<>();
        this.busName = busName;
        this.asyncProcessing = asyncProcessing;
        this.executor = asyncProcessing ?
                Executors.newCachedThreadPool() :
                Executors.newSingleThreadExecutor();
        System.out.println("🚌 创建事件总线: " + busName +
                (asyncProcessing ? " (异步模式)" : " (同步模式)"));
    }

    /**
     * Registers the observer under its event type. Registering the same observer
     * again is a no-op, so it is never notified twice for one event.
     *
     * @param observer the observer to register
     * @param <T>      the event type the observer handles
     */
    public <T extends Event> void registerObserver(EventObserver<T> observer) {
        Class<T> eventType = observer.getEventType();
        boolean added = observers
                .computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
                .addIfAbsent(observer);
        if (added) {
            System.out.println("✅ " + observer.getObserverId() + " 注册到 " + busName +
                    " 监听 " + eventType.getSimpleName());
        }
    }

    /**
     * Removes the observer from the list kept for its event type, if present.
     *
     * @param observer the observer to unregister
     * @param <T>      the event type the observer handles
     */
    public <T extends Event> void unregisterObserver(EventObserver<T> observer) {
        Class<T> eventType = observer.getEventType();
        List<EventObserver<?>> eventObservers = observers.get(eventType);
        if (eventObservers != null && eventObservers.remove(observer)) {
            System.out.println("❌ " + observer.getObserverId() + " 从 " + busName + " 取消注册");
        }
    }

    /**
     * Delivers the event to every registered observer whose
     * {@code supportsEvent} accepts it.
     *
     * @param event the event to publish
     */
    public void publishEvent(Event event) {
        System.out.println("📤 " + busName + " 发布事件: " + event);
        // All buckets are scanned (not just the event's exact class) so observers
        // registered for a supertype also receive the event.
        List<EventObserver<?>> supportedObservers = new ArrayList<>();
        for (List<EventObserver<?>> bucket : observers.values()) {
            for (EventObserver<?> candidate : bucket) {
                if (candidate.supportsEvent(event)) {
                    supportedObservers.add(candidate);
                }
            }
        }
        if (supportedObservers.isEmpty()) {
            System.out.println("ℹ️ 没有观察者处理此事件");
            return;
        }
        System.out.println("🔔 通知 " + supportedObservers.size() + " 个观察者");
        for (EventObserver<?> observer : supportedObservers) {
            if (asyncProcessing) {
                executor.submit(() -> processEvent(observer, event));
            } else {
                processEvent(observer, event);
            }
        }
    }

    /**
     * Invokes one observer and reports, rather than propagates, any exception it
     * throws, so one failing observer does not stop the others.
     */
    @SuppressWarnings("unchecked")
    private <T extends Event> void processEvent(EventObserver<?> observer, Event event) {
        try {
            // Safe: publishEvent only passes observers whose supportsEvent(event) is true.
            EventObserver<T> typedObserver = (EventObserver<T>) observer;
            typedObserver.onEvent((T) event);
        } catch (Exception e) {
            System.err.println("❌ 观察者 " + observer.getObserverId() + " 处理事件失败: " + e.getMessage());
        }
    }

    /** Prints the number of registered event types and the observer count per type. */
    public void printBusStats() {
        System.out.println("\n=== 事件总线 " + busName + " 统计 ===");
        System.out.println("支持的事件类型: " + observers.size());
        observers.forEach((eventType, obsList) -> {
            System.out.println(" " + eventType.getSimpleName() + ": " + obsList.size() + " 个观察者");
        });
    }

    /**
     * Shuts the executor down, waiting up to five seconds for submitted work before
     * forcing termination.
     */
    public void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        System.out.println("🛑 事件总线 " + busName + " 已关闭");
    }
}
/**
 * Event observer that records every {@link StockPriceEvent} and flags price moves
 * larger than five percent.
 */
public class StockPriceAnalyzer implements EventObserver<StockPriceEvent> {
    private final String analyzerId;
    // onEvent may run on several executor threads at once when the bus is in async
    // mode, so the record list must tolerate concurrent appends.
    private final List<StockPriceEvent> processedEvents;

    /**
     * @param analyzerId identifier used in log output
     */
    public StockPriceAnalyzer(String analyzerId) {
        this.analyzerId = analyzerId;
        this.processedEvents = new CopyOnWriteArrayList<>();
        System.out.println("📊 创建股价分析器: " + analyzerId);
    }

    /** Records the event, logs the price and percentage change, then analyses it. */
    @Override
    public void onEvent(StockPriceEvent event) {
        processedEvents.add(event);
        StockData stockData = event.getStockData();
        System.out.println("📈 " + analyzerId + " 分析 " + stockData.getSymbol() +
                ": 价格 " + stockData.getPrice() +
                ", 变化 " + String.format("%.2f", event.getPriceChangePercent()) + "%");
        performAdvancedAnalysis(event);
    }

    @Override
    public String getObserverId() {
        return analyzerId;
    }

    @Override
    public Class<StockPriceEvent> getEventType() {
        return StockPriceEvent.class;
    }

    /** Simulates analysis with a 20 ms delay and warns when the move exceeds 5%. */
    private void performAdvancedAnalysis(StockPriceEvent event) {
        try {
            Thread.sleep(20);
            if (Math.abs(event.getPriceChangePercent()) > 5.0) {
                System.out.println("⚠️ " + analyzerId + " 检测到大幅波动: " +
                        event.getStockData().getSymbol());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return a copy of the events handled so far
     */
    public List<StockPriceEvent> getProcessedEvents() {
        return new ArrayList<>(processedEvents);
    }
}
/**
 * Event observer that records every {@link SystemAlertEvent} and prints a
 * level-specific reaction.
 */
public class SystemMonitor implements EventObserver<SystemAlertEvent> {
    private final String monitorId;
    // onEvent may run on several executor threads at once when the bus is in async
    // mode, so the record list must tolerate concurrent appends.
    private final List<SystemAlertEvent> alerts;

    /**
     * @param monitorId identifier used in log output
     */
    public SystemMonitor(String monitorId) {
        this.monitorId = monitorId;
        this.alerts = new CopyOnWriteArrayList<>();
        System.out.println("🖥️ 创建系统监控器: " + monitorId);
    }

    /** Records the alert, logs its level and message, then reacts to the level. */
    @Override
    public void onEvent(SystemAlertEvent event) {
        alerts.add(event);
        System.out.println("🚨 " + monitorId + " 收到系统警报: " +
                event.getAlertLevel() + " - " + event.getMessage());
        handleAlert(event);
    }

    @Override
    public String getObserverId() {
        return monitorId;
    }

    @Override
    public Class<SystemAlertEvent> getEventType() {
        return SystemAlertEvent.class;
    }

    /** Prints a reaction for CRITICAL, WARNING and INFO; other levels print nothing. */
    private void handleAlert(SystemAlertEvent event) {
        switch (event.getAlertLevel()) {
            case "CRITICAL":
                System.out.println("🔴 紧急处理关键警报!");
                break;
            case "WARNING":
                System.out.println("🟡 处理警告级别警报");
                break;
            case "INFO":
                System.out.println("🔵 记录信息级别警报");
                break;
        }
    }

    /**
     * @return a copy of the alerts received so far
     */
    public List<SystemAlertEvent> getAlerts() {
        return new ArrayList<>(alerts);
    }
}
/**
 * Console walkthrough of the event bus: registers typed observers, publishes price
 * and alert events in async mode, then times a bulk publication.
 */
public class EventBusDemo {

    public static void main(String[] args) {
        System.out.println("=== 事件总线观察者模式演示 ===\n");

        // Step 1: asynchronous bus.
        EventBus bus = new EventBus("主事件总线", true);

        // Step 2: observers.
        StockPriceAnalyzer technical = new StockPriceAnalyzer("技术分析器");
        StockPriceAnalyzer fundamental = new StockPriceAnalyzer("基本面分析器");
        SystemMonitor monitor = new SystemMonitor("系统监控中心");

        // Step 3: registration.
        bus.registerObserver(technical);
        bus.registerObserver(fundamental);
        bus.registerObserver(monitor);

        // Step 4: price events.
        System.out.println("\n4. 发布股价事件:");
        RealTimeStockMarket market = new RealTimeStockMarket("测试市场");
        StockData apple = new StockData("AAPL", 150.0, 2.5, 1.67, 10000000);
        bus.publishEvent(new StockPriceEvent(market, apple, 147.5));
        StockData tesla = new StockData("TSLA", 210.0, -5.0, -2.33, 8000000);
        bus.publishEvent(new StockPriceEvent(market, tesla, 215.0));

        // Step 5: system alerts, one with extra details and one without.
        System.out.println("\n5. 发布系统警报事件:");
        Map<String, Object> failureDetails = Map.of("retryCount", 3, "timeout", 5000);
        SystemAlertEvent critical = new SystemAlertEvent(
                bus, "CRITICAL", "数据库连接失败", failureDetails);
        bus.publishEvent(critical);
        SystemAlertEvent info = new SystemAlertEvent(bus, "INFO", "系统启动完成");
        bus.publishEvent(info);

        // Step 6: statistics.
        System.out.println("\n6. 事件总线统计:");
        bus.printBusStats();

        // Step 7: give the asynchronous handlers a second to finish.
        try {
            Thread.sleep(1000);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }

        // Step 8: release the executor.
        bus.shutdown();

        // Step 9: bulk publication timing.
        System.out.println("\n9. 事件总线性能测试:");
        eventBusPerformanceTest();
    }

    /**
     * Registers 100 analyzers on a fresh asynchronous bus, publishes 1000 price
     * events and prints the time spent publishing, measured before shutdown.
     */
    private static void eventBusPerformanceTest() {
        EventBus bus = new EventBus("性能测试总线", true);

        List<EventObserver<StockPriceEvent>> analyzers = new ArrayList<>();
        for (int index = 0; index < 100; index++) {
            StockPriceAnalyzer analyzer = new StockPriceAnalyzer("性能测试分析器" + index);
            analyzers.add(analyzer);
            bus.registerObserver(analyzer);
        }

        long begin = System.currentTimeMillis();
        RealTimeStockMarket market = new RealTimeStockMarket("性能测试市场");
        Random rng = new Random();
        for (int round = 0; round < 1000; round++) {
            String ticker = "STOCK" + (round % 10);
            // Price is drawn before change, matching the original draw order.
            double price = 100 + rng.nextDouble() * 100;
            double delta = (rng.nextDouble() - 0.5) * 10;
            StockData quote = new StockData(ticker, price, delta, 0, 1000000);
            bus.publishEvent(new StockPriceEvent(market, quote, 100.0));
        }
        long elapsed = System.currentTimeMillis() - begin;
        System.out.println("发布 1000 个事件,100 个观察者,耗时: " + elapsed + "ms");
        bus.shutdown();
    }
}
第三代:响应式观察者模式
/**
 * Reactive stream: a source that observers subscribe to and through which items,
 * errors and completion are pushed.
 *
 * @param <T> the item type
 */
public interface ReactiveStream<T> {

    /**
     * @param observer the observer to subscribe
     */
    void subscribe(ReactiveObserver<T> observer);

    /**
     * @param observer the observer to unsubscribe
     */
    void unsubscribe(ReactiveObserver<T> observer);

    /**
     * @param item the item to push through the stream
     */
    void onNext(T item);

    /**
     * @param error the error to push through the stream
     */
    void onError(Throwable error);

    /** Signals that the stream has finished. */
    void onComplete();
}
/**
 * Reactive observer: receives items, errors and the completion signal of a stream.
 *
 * @param <T> the item type
 */
public interface ReactiveObserver<T> {

    /**
     * @param item the next item of the stream
     */
    void onNext(T item);

    /**
     * @param error the error reported by the stream
     */
    void onError(Throwable error);

    /** Called when the stream signals completion. */
    void onComplete();
}
/**
 * Reactive subject: fans out items, errors and completion to its subscribers.
 *
 * <p>An exception thrown by one observer is reported on {@code System.err} and does
 * not stop delivery to the remaining observers.
 *
 * @param <T> the item type
 */
public class ReactiveSubject<T> implements ReactiveStream<T> {
    private final List<ReactiveObserver<T>> observers;
    private final String subjectName;
    // volatile so the completion flag has the same cross-thread visibility as the
    // CopyOnWriteArrayList holding the observers.
    private volatile boolean completed;

    /**
     * @param subjectName name used in log output
     */
    public ReactiveSubject(String subjectName) {
        this.observers = new CopyOnWriteArrayList<>();
        this.subjectName = subjectName;
        this.completed = false;
        System.out.println("⚡ 创建响应式主题: " + subjectName);
    }

    /** Subscribes the observer unless it is already subscribed. */
    @Override
    public void subscribe(ReactiveObserver<T> observer) {
        if (!observers.contains(observer)) {
            observers.add(observer);
            System.out.println("✅ " + observer.getClass().getSimpleName() + " 订阅 " + subjectName);
        }
    }

    /** Unsubscribes the observer; silent if it was not subscribed. */
    @Override
    public void unsubscribe(ReactiveObserver<T> observer) {
        if (observers.remove(observer)) {
            System.out.println("❌ " + observer.getClass().getSimpleName() + " 取消订阅 " + subjectName);
        }
    }

    /**
     * Pushes an item to every subscriber.
     *
     * @param item the item to deliver
     * @throws IllegalStateException if {@link #onComplete()} has already been called
     */
    @Override
    public void onNext(T item) {
        if (completed) {
            throw new IllegalStateException("流已结束");
        }
        System.out.println("📥 " + subjectName + " 发布数据: " + item);
        for (ReactiveObserver<T> observer : observers) {
            try {
                observer.onNext(item);
            } catch (Exception e) {
                System.err.println("❌ 观察者处理数据失败: " + e.getMessage());
            }
        }
    }

    /**
     * Pushes an error to every subscriber. The stream is not marked as completed.
     *
     * @param error the error to deliver
     */
    @Override
    public void onError(Throwable error) {
        System.out.println("💥 " + subjectName + " 发布错误: " + error.getMessage());
        for (ReactiveObserver<T> observer : observers) {
            try {
                observer.onError(error);
            } catch (Exception e) {
                System.err.println("❌ 观察者处理错误失败: " + e.getMessage());
            }
        }
    }

    /** Marks the stream as completed and forwards the completion signal to every subscriber. */
    @Override
    public void onComplete() {
        System.out.println("🏁 " + subjectName + " 完成数据流");
        completed = true;
        for (ReactiveObserver<T> observer : observers) {
            try {
                observer.onComplete();
            } catch (Exception e) {
                System.err.println("❌ 观察者处理完成通知失败: " + e.getMessage());
            }
        }
    }

    /**
     * @return the number of current subscribers
     */
    public int getSubscriberCount() {
        return observers.size();
    }
}
// Reactive operators
/**
 * Static operators that derive a new {@link ReactiveStream} from an existing one.
 *
 * <p>Each operator subscribes an anonymous observer to {@code source} and
 * forwards (possibly transformed) signals into a fresh {@link ReactiveSubject},
 * which is returned to the caller.
 */
public class ReactiveOperators {

    /**
     * Returns a stream that only carries the items of {@code source} accepted
     * by {@code predicate}. Error and completion signals are forwarded unchanged.
     *
     * @param source    upstream to subscribe to
     * @param predicate test applied to every item; may be declared on a supertype of {@code T}
     * @return the filtered downstream
     */
    public static <T> ReactiveStream<T> filter(ReactiveStream<T> source, Predicate<? super T> predicate) {
        ReactiveSubject<T> result = new ReactiveSubject<>("filter");
        source.subscribe(new ReactiveObserver<T>() {
            @Override
            public void onNext(T item) {
                if (predicate.test(item)) {
                    result.onNext(item);
                }
            }

            @Override
            public void onError(Throwable error) {
                result.onError(error);
            }

            @Override
            public void onComplete() {
                result.onComplete();
            }
        });
        return result;
    }

    /**
     * Returns a stream carrying {@code mapper.apply(item)} for every item of
     * {@code source}. Error and completion signals are forwarded unchanged.
     *
     * @param source upstream to subscribe to
     * @param mapper transformation; may accept a supertype of {@code T} and
     *               return a subtype of {@code R}
     * @return the mapped downstream
     */
    public static <T, R> ReactiveStream<R> map(ReactiveStream<T> source,
                                               Function<? super T, ? extends R> mapper) {
        ReactiveSubject<R> result = new ReactiveSubject<>("map");
        source.subscribe(new ReactiveObserver<T>() {
            @Override
            public void onNext(T item) {
                result.onNext(mapper.apply(item));
            }

            @Override
            public void onError(Throwable error) {
                result.onError(error);
            }

            @Override
            public void onComplete() {
                result.onComplete();
            }
        });
        return result;
    }

    /**
     * Groups the items of {@code source} into lists of {@code bufferSize}
     * elements. A partially filled batch is emitted before an error or
     * completion signal is forwarded.
     *
     * <p>The pending batch is not synchronized; this assumes the source
     * delivers its signals from one thread at a time.
     *
     * @param source     upstream to subscribe to
     * @param bufferSize number of items per emitted batch; must be positive
     * @return the batched downstream
     * @throws IllegalArgumentException if {@code bufferSize} is not positive
     */
    public static <T> ReactiveStream<List<T>> buffer(ReactiveStream<T> source, int bufferSize) {
        if (bufferSize <= 0) {
            // A non-positive size would silently degrade to one-item batches.
            throw new IllegalArgumentException("bufferSize must be positive: " + bufferSize);
        }
        ReactiveSubject<List<T>> result = new ReactiveSubject<>("buffer");
        source.subscribe(new ReactiveObserver<T>() {
            private final List<T> pending = new ArrayList<>();

            @Override
            public void onNext(T item) {
                pending.add(item);
                if (pending.size() >= bufferSize) {
                    flush();
                }
            }

            @Override
            public void onError(Throwable error) {
                flush();
                result.onError(error);
            }

            @Override
            public void onComplete() {
                flush();
                result.onComplete();
            }

            /**
             * Emits a copy of the pending batch (if any) and clears it, so a
             * later terminal signal cannot re-emit the same items.
             */
            private void flush() {
                if (pending.isEmpty()) {
                    return;
                }
                List<T> batch = new ArrayList<>(pending);
                pending.clear();
                result.onNext(batch);
            }
        });
        return result;
    }
}
// Concrete reactive observer
/**
 * Reactive observer that records every {@code StockData} it receives, logs
 * it, and prints a per-symbol average-price summary when the stream completes.
 */
public class StockPriceProcessor implements ReactiveObserver<StockData> {
    private final String processorId;
    private final List<StockData> processedData = new ArrayList<>();

    /**
     * @param processorId label used in this processor's console output
     */
    public StockPriceProcessor(String processorId) {
        this.processorId = processorId;
        System.out.println("🔧 创建股价处理器: " + processorId);
    }

    /** Stores the item, logs it and runs the simulated processing step. */
    @Override
    public void onNext(StockData stockData) {
        processedData.add(stockData);
        String symbol = stockData.getSymbol();
        System.out.println("🔄 " + processorId + " 处理: " + symbol + " @ $" + stockData.getPrice());
        // Simulated processing work
        processStockData(stockData);
    }

    /** Logs the error message. */
    @Override
    public void onError(Throwable error) {
        System.out.println("❌ " + processorId + " 遇到错误: " + error.getMessage());
    }

    /** Logs the number of processed items and prints the summary. */
    @Override
    public void onComplete() {
        int total = processedData.size();
        System.out.println("✅ " + processorId + " 处理完成,共处理 " + total + " 条数据");
        generateSummary();
    }

    /** Simulates expensive processing with a 10 ms pause; restores the interrupt flag if interrupted. */
    private void processStockData(StockData stockData) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Prints the average price and record count per symbol; does nothing when no data was seen. */
    private void generateSummary() {
        if (processedData.isEmpty()) {
            return;
        }
        Map<String, List<StockData>> grouped = processedData.stream()
                .collect(Collectors.groupingBy(StockData::getSymbol));
        System.out.println("\n📈 " + processorId + " 处理摘要:");
        for (Map.Entry<String, List<StockData>> entry : grouped.entrySet()) {
            List<StockData> records = entry.getValue();
            double mean = records.stream()
                    .mapToDouble(StockData::getPrice)
                    .average()
                    .orElse(0.0);
            System.out.printf(" %s: 平均价格 $%.2f (%d 条记录)%n",
                    entry.getKey(), mean, records.size());
        }
    }

    /** @return a copy of the items processed so far */
    public List<StockData> getProcessedData() {
        return new ArrayList<>(processedData);
    }
}
// Reactive system demo
/**
 * Console demo of the reactive observer classes: builds filter/map/buffer
 * pipelines over a stock data stream, publishes 20 random quotes, completes
 * the stream and finally runs a small chained-operator example on integers.
 */
public class ReactiveObserverDemo {

    public static void main(String[] args) {
        System.out.println("=== 响应式观察者模式演示 ===\n");

        // 1. Create the reactive subject
        ReactiveSubject<StockData> quotes = new ReactiveSubject<>("股价数据流");

        // 2. Create the processors
        StockPriceProcessor realtimeProcessor = new StockPriceProcessor("实时处理器");
        StockPriceProcessor batchProcessor = new StockPriceProcessor("批处理处理器");

        // 3. Apply the reactive operators
        System.out.println("\n3. 应用响应式操作符:");
        // Keep only high-priced stocks
        ReactiveStream<StockData> expensiveQuotes =
                ReactiveOperators.filter(quotes, data -> data.getPrice() > 150.0);
        // Map to the percentage change
        ReactiveStream<Double> changeStream =
                ReactiveOperators.map(quotes, StockData::getChangePercent);
        // Batch in groups of five
        ReactiveStream<List<StockData>> batches = ReactiveOperators.buffer(quotes, 5);

        // 4. Subscribe the consumers
        quotes.subscribe(realtimeProcessor);
        expensiveQuotes.subscribe(new StockPriceProcessor("高价股票处理器"));
        batches.subscribe(new ReactiveObserver<List<StockData>>() {
            @Override
            public void onNext(List<StockData> batch) {
                System.out.println("📦 批处理: " + batch.size() + " 条记录");
                double mean = batch.stream()
                        .mapToDouble(StockData::getPrice)
                        .average()
                        .orElse(0.0);
                System.out.println(" 平均价格: $" + String.format("%.2f", mean));
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("❌ 批处理错误: " + error.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("✅ 批处理完成");
            }
        });

        // 5. Publish the data
        System.out.println("\n5. 发布股价数据:");
        publishRandomQuotes(quotes);

        // 6. Complete the stream
        quotes.onComplete();

        // 7. Chained operator demo
        System.out.println("\n7. 复杂流处理演示:");
        complexStreamProcessing();
    }

    /**
     * Publishes 20 randomly generated quotes, cycling through five symbols and
     * pausing 50 ms after each one. Stops early if the thread is interrupted.
     */
    private static void publishRandomQuotes(ReactiveSubject<StockData> target) {
        Random rng = new Random();
        String[] symbols = {"AAPL", "GOOGL", "MSFT", "TSLA", "AMZN"};
        int published = 0;
        while (published < 20) {
            String symbol = symbols[published % symbols.length];
            double price = 100 + rng.nextDouble() * 100;
            double change = (rng.nextDouble() - 0.5) * 10;
            double changePercent = (change / price) * 100;
            long volume = 1000000 + rng.nextInt(9000000);
            target.onNext(new StockData(symbol, price, change, changePercent, volume));
            published++;
            // Simulated gap between data points
            try {
                Thread.sleep(50);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Chains buffer(3) -> filter(sum > 10) -> map(to text) over the integers
     * 1..10 and prints each resulting line.
     */
    private static void complexStreamProcessing() {
        ReactiveSubject<Integer> numbers = new ReactiveSubject<>("数字流");

        // Build the pipeline step by step
        ReactiveStream<List<Integer>> triples = ReactiveOperators.buffer(numbers, 3);
        ReactiveStream<List<Integer>> largeTriples = ReactiveOperators.filter(
                triples,
                list -> list.stream().mapToInt(Integer::intValue).sum() > 10
        );
        ReactiveStream<String> described = ReactiveOperators.map(
                largeTriples,
                list -> "批次: " + list + ", 总和: " + list.stream().mapToInt(Integer::intValue).sum()
        );

        described.subscribe(new ReactiveObserver<String>() {
            @Override
            public void onNext(String item) {
                System.out.println("📊 " + item);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("❌ 处理错误: " + error.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("✅ 复杂流处理完成");
            }
        });

        // Publish the numbers
        int value = 1;
        while (value <= 10) {
            numbers.onNext(value);
            value++;
        }
        numbers.onComplete();
    }
}
⚡ 性能优化与缓存策略
高性能观察者模式
// High-performance observer registry
/**
 * Registry that maps event classes to observers and dispatches each published
 * event to all observers of its exact runtime class in parallel on a
 * work-stealing pool, collecting simple success/failure and timing metrics.
 *
 * <p>Observer callbacks run on pool threads, so the metric counters are
 * guarded by {@code synchronized} methods to avoid lost updates.
 *
 * @param <T> common supertype of the events handled by this registry
 */
public class HighPerformanceObserverRegistry<T> {
    private final Map<Class<?>, CopyOnWriteArrayList<EventObserver<?>>> observerMap;
    private final ExecutorService executor;
    private final MetricsCollector metrics;

    /** Creates an empty registry backed by {@link Executors#newWorkStealingPool()}. */
    public HighPerformanceObserverRegistry() {
        this.observerMap = new ConcurrentHashMap<>();
        this.executor = Executors.newWorkStealingPool();
        this.metrics = new MetricsCollector();
        System.out.println("⚡ 创建高性能观察者注册表");
    }

    /**
     * Registers {@code observer} for events of class {@code eventType}.
     *
     * @param eventType event class the observer is interested in
     * @param observer  observer to add
     */
    public <E extends T> void register(Class<E> eventType, EventObserver<E> observer) {
        observerMap.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
                .add(observer);
        metrics.recordRegistration(eventType, observer.getObserverId());
    }

    /**
     * Removes {@code observer} from the observers of {@code eventType}.
     * The unregistration is only recorded when the observer was actually present.
     */
    public <E extends T> void unregister(Class<E> eventType, EventObserver<E> observer) {
        List<EventObserver<?>> observers = observerMap.get(eventType);
        if (observers != null && observers.remove(observer)) {
            metrics.recordUnregistration(eventType, observer.getObserverId());
        }
    }

    /**
     * Dispatches {@code event} asynchronously to every observer registered for
     * the event's exact runtime class. Returns without waiting for the
     * observers; an exception thrown by an observer is recorded as a failure
     * and does not affect the others.
     *
     * @param event event to publish
     */
    @SuppressWarnings("unchecked")
    public <E extends T> void publish(E event) {
        Class<?> eventType = event.getClass();
        List<EventObserver<?>> observers = observerMap.get(eventType);
        if (observers == null || observers.isEmpty()) {
            return;
        }
        long startTime = System.nanoTime();
        int observerCount = observers.size();

        // Fan the event out in parallel
        List<CompletableFuture<Void>> futures = observers.stream()
                .map(observer -> CompletableFuture.runAsync(() -> {
                    try {
                        // Observers are stored under the class they registered for,
                        // and lookup uses the event's own class.
                        EventObserver<E> typedObserver = (EventObserver<E>) observer;
                        typedObserver.onEvent(event);
                        metrics.recordSuccess(eventType, observer.getObserverId());
                    } catch (Exception e) {
                        metrics.recordFailure(eventType, observer.getObserverId(), e);
                    }
                }, executor))
                .collect(Collectors.toList());

        // Record the total duration once every observer has finished
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenRun(() -> {
                    long duration = System.nanoTime() - startTime;
                    metrics.recordPublish(eventType, observerCount, duration);
                });
    }

    /** Prints the collected metrics to the console. */
    public void printMetrics() {
        metrics.printReport();
    }

    /**
     * Shuts the executor down, waiting up to five seconds for submitted work
     * before forcing termination.
     */
    public void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    // Performance metrics collector
    /** Holds per-event-type and per-observer counters. */
    private static class MetricsCollector {
        private final Map<Class<?>, EventMetrics> eventMetrics;
        private final Map<String, ObserverMetrics> observerMetrics;

        public MetricsCollector() {
            this.eventMetrics = new ConcurrentHashMap<>();
            this.observerMetrics = new ConcurrentHashMap<>();
        }

        /** Ensures metric entries exist for the event type and the observer id. */
        public void recordRegistration(Class<?> eventType, String observerId) {
            eventMetrics.computeIfAbsent(eventType, k -> new EventMetrics());
            observerMetrics.computeIfAbsent(observerId, k -> new ObserverMetrics());
        }

        public void recordUnregistration(Class<?> eventType, String observerId) {
            // Placeholder: unregistration statistics are not tracked yet
        }

        public void recordPublish(Class<?> eventType, int observerCount, long duration) {
            EventMetrics entry = eventMetrics.get(eventType);
            if (entry != null) {
                entry.recordPublish(observerCount, duration);
            }
        }

        public void recordSuccess(Class<?> eventType, String observerId) {
            ObserverMetrics entry = observerMetrics.get(observerId);
            if (entry != null) {
                entry.recordSuccess();
            }
        }

        public void recordFailure(Class<?> eventType, String observerId, Exception error) {
            ObserverMetrics entry = observerMetrics.get(observerId);
            if (entry != null) {
                entry.recordFailure(error);
            }
        }

        /** Prints one line per event type and one line per observer. */
        public void printReport() {
            System.out.println("\n=== 高性能观察者系统指标 ===");
            System.out.println("\n事件类型统计:");
            eventMetrics.forEach((eventType, entry) -> {
                System.out.printf(" %s: 发布%d次, 平均观察者%.1f, 平均耗时%.2fms%n",
                        eventType.getSimpleName(),
                        entry.getPublishCount(),
                        entry.getAverageObservers(),
                        entry.getAverageDurationMs());
            });
            System.out.println("\n观察者统计:");
            observerMetrics.forEach((observerId, entry) -> {
                System.out.printf(" %s: 成功%d次, 失败%d次, 成功率%.1f%%%n",
                        observerId,
                        entry.getSuccessCount(),
                        entry.getFailureCount(),
                        entry.getSuccessRate());
            });
        }
    }

    /** Publish counters for one event type; all access is synchronized. */
    private static class EventMetrics {
        private int publishCount;
        private long totalObservers;
        private long totalDuration;

        /** @param duration elapsed time of the publish in nanoseconds */
        public synchronized void recordPublish(int observerCount, long duration) {
            publishCount++;
            totalObservers += observerCount;
            totalDuration += duration;
        }

        public synchronized int getPublishCount() {
            return publishCount;
        }

        public synchronized double getAverageObservers() {
            return publishCount > 0 ? (double) totalObservers / publishCount : 0;
        }

        public synchronized double getAverageDurationMs() {
            return publishCount > 0 ? (totalDuration / 1_000_000.0) / publishCount : 0;
        }
    }

    /** Success/failure counters for one observer; all access is synchronized. */
    private static class ObserverMetrics {
        private int successCount;
        private int failureCount;

        public synchronized void recordSuccess() {
            successCount++;
        }

        public synchronized void recordFailure(Exception error) {
            failureCount++;
        }

        public synchronized int getSuccessCount() {
            return successCount;
        }

        public synchronized int getFailureCount() {
            return failureCount;
        }

        /** @return success percentage, or 100.0 when nothing has been recorded */
        public synchronized double getSuccessRate() {
            int total = successCount + failureCount;
            return total > 0 ? (successCount * 100.0) / total : 100.0;
        }
    }
}
// High-performance system demo
/**
 * Console demo for {@code HighPerformanceObserverRegistry}: registers 50
 * analyzers and 10 monitors, publishes 1000 stock events (plus an alert every
 * 100th iteration), waits two seconds, then prints timing and metrics.
 */
public class HighPerformanceObserverDemo {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== 高性能观察者模式演示 ===\n");
        HighPerformanceObserverRegistry<Event> registry = new HighPerformanceObserverRegistry<>();

        // Register a large number of observers
        List<EventObserver<StockPriceEvent>> analyzers = new ArrayList<>();
        int analyzerIndex = 0;
        while (analyzerIndex < 50) {
            StockPriceAnalyzer analyzer = new StockPriceAnalyzer("高性能分析器" + analyzerIndex);
            analyzers.add(analyzer);
            registry.register(StockPriceEvent.class, analyzer);
            analyzerIndex++;
        }
        List<EventObserver<SystemAlertEvent>> monitors = new ArrayList<>();
        int monitorIndex = 0;
        while (monitorIndex < 10) {
            SystemMonitor monitor = new SystemMonitor("高性能监控器" + monitorIndex);
            monitors.add(monitor);
            registry.register(SystemAlertEvent.class, monitor);
            monitorIndex++;
        }

        // Publish a large number of events
        System.out.println("发布事件...");
        RealTimeStockMarket market = new RealTimeStockMarket("高性能市场");
        Random rng = new Random();
        long begin = System.currentTimeMillis();
        for (int n = 0; n < 1000; n++) {
            double price = 100 + rng.nextDouble() * 100;
            double change = (rng.nextDouble() - 0.5) * 10;
            StockData quote = new StockData("STOCK" + (n % 20), price, change, 0, 1000000);
            registry.publish(new StockPriceEvent(market, quote, 100.0));

            // Occasionally publish a system alert as well
            boolean alertDue = n % 100 == 0;
            if (alertDue) {
                registry.publish(new SystemAlertEvent(
                        registry, "INFO", "系统运行正常 - " + n
                ));
            }
        }

        // Give the asynchronous handlers time to finish
        Thread.sleep(2000);
        long elapsed = System.currentTimeMillis() - begin;
        System.out.println("\n总执行时间: " + elapsed + "ms");

        // Print metrics, then release resources
        registry.printMetrics();
        registry.shutdown();
    }
}
🔧 测试策略与验证
观察者模式测试用例
class ObserverPatternTest {
@Test
void testNewsPublisherBasicOperations() {
// 给定
NewsPublisher publisher = new NewsPublisher("测试发布器");
EmailSubscriber subscriber = new EmailSubscriber("测试用户", "test@email.com");
// 当
publisher.registerObserver(subscriber);
publisher.setNews("测试新闻");
// 然后
assertEquals(1, publisher.getSubscriberCount());
assertEquals(1, subscriber.getReceivedCount());
assertTrue(subscriber.getReceivedNews().contains("测试新闻"));
}
@Test
void testMultipleObservers() {
// 给定
NewsPublisher publisher = new NewsPublisher("测试发布器");
EmailSubscriber emailSub = new EmailSubscriber("邮件用户", "email@test.com");
PushSubscriber pushSub = new PushSubscriber("推送用户", "device123");
// 当
publisher.registerObserver(emailSub);
publisher.registerObserver(pushSub);
publisher.setNews("多用户新闻");
// 然后
assertEquals(2, publisher.getSubscriberCount());
assertEquals(1, emailSub.getReceivedCount());
assertEquals(1, pushSub.getReceivedNotifications().size());
}
@Test
void testObserverRemoval() {
// 给定
NewsPublisher publisher = new NewsPublisher("测试发布器");
EmailSubscriber subscriber = new EmailSubscriber("测试用户", "test@email.com");
publisher.registerObserver(subscriber);
publisher.setNews("第一条新闻");
// 当
publisher.removeObserver(subscriber);
publisher.setNews("第二条新闻");
// 然后
assertEquals(0, publisher.getSubscriberCount());
assertEquals(1, subscriber.getReceivedCount()); // 应该只收到第一条
}
@Test
void testEventBusRegistration() {
// 给定
EventBus eventBus = new EventBus("测试总线");
StockPriceAnalyzer analyzer = new StockPriceAnalyzer("测试分析器");
// 当
eventBus.registerObserver(analyzer);
// 然后
// 验证注册成功(主要通过控制台输出验证)
assertDoesNotThrow(() -> eventBus.publishEvent(
new StockPriceEvent(this,
new StockData("TEST", 100.0, 1.0, 1.0, 1000),
99.0)
));
}
@Test
void testReactiveStreamOperations() {
// 给定
ReactiveSubject<Integer> numberStream = new ReactiveSubject<>("测试流");
List<Integer> receivedNumbers = new ArrayList<>();
numberStream.subscribe(new ReactiveObserver<Integer>() {
@Override
public void onNext(Integer item) {
receivedNumbers.add(item);
}
@Override
public void onError(Throwable error) {}
@Override
public void onComplete() {}
});
// 当
numberStream.onNext(1);
numberStream.onNext(2);
numberStream.onNext(3);
numberStream.onComplete();
// 然后
assertEquals(3, receivedNumbers.size());
assertArrayEquals(new Integer[]{1, 2, 3}, receivedNumbers.toArray());
}
@Test
void testHighPerformanceRegistry() throws InterruptedException {
// 给定
HighPerformanceObserverRegistry<Event> registry = new HighPerformanceObserverRegistry<>();
List<String> processedEvents = new ArrayList<>();
EventObserver<StockPriceEvent> observer = new EventObserver<StockPriceEvent>() {
@Override
public void onEvent(StockPriceEvent event) {
processedEvents.add(event.getStockData().getSymbol());
}
@Override
public String getObserverId() {
return "测试观察者";
}
@Override
public Class<StockPriceEvent> getEventType() {
return StockPriceEvent.class;
}
};
registry.register(StockPriceEvent.class, observer);
// 当
RealTimeStockMarket market = new RealTimeStockMarket("测试市场");
for (int i = 0; i < 10; i++) {
StockData stockData = new StockData("TEST" + i, 100.0, 1.0, 1.0, 1000);
StockPriceEvent event = new StockPriceEvent(market, stockData, 99.0);
registry.publish(event);
}
// 等待异步处理
Thread.sleep(100);
// 然后
assertEquals(10, processedEvents.size());
registry.shutdown();
}
@Test
void testStockTradingBotLogic() {
// 给定
TradingBot bot = new TradingBot("测试机器人", Set.of("AAPL"), -2.0, 3.0);
StockData stockData1 = new StockData("AAPL", 100.0, 0.0, 0.0, 1000);
StockData stockData2 = new StockData("AAPL", 98.0, -2.0, -2.0, 1000); // 下跌2%
// 当
bot.update(stockData1); // 第一个数据点
bot.update(stockData2); // 第二个数据点,触发买入条件
// 然后
// 验证交易逻辑(主要通过控制台输出验证)
Map<String, List<StockData>> history = bot.getHistoricalData();
assertEquals(2, history.get("AAPL").size());
}
}
// Mock observer used by the tests
/**
 * Test double for {@code Observer} that records every message it receives
 * and can optionally be configured to throw on each update.
 */
class MockObserver implements Observer {
    private final String name;
    private final List<String> receivedMessages = new ArrayList<>();
    private final boolean shouldThrowError;

    /** Creates a well-behaved mock that never throws. */
    public MockObserver(String name) {
        this(name, false);
    }

    /**
     * @param name             observer name reported by {@link #getObserverName()}
     * @param shouldThrowError when {@code true}, every {@link #update} call throws
     */
    public MockObserver(String name, boolean shouldThrowError) {
        this.name = name;
        this.shouldThrowError = shouldThrowError;
    }

    /**
     * Records the message.
     *
     * @throws RuntimeException if this mock was configured to fail
     */
    @Override
    public void update(String message) {
        if (!shouldThrowError) {
            receivedMessages.add(message);
            return;
        }
        throw new RuntimeException("模拟观察者错误");
    }

    @Override
    public String getObserverName() {
        return name;
    }

    /** @return the live list of messages received so far */
    public List<String> getReceivedMessages() {
        return receivedMessages;
    }

    /** @return how many messages have been recorded */
    public int getMessageCount() {
        return receivedMessages.size();
    }
}
// Performance test
/**
 * Measures how long it takes to register 1000 mock observers on a
 * {@code NewsPublisher} and notify them once.
 */
class ObserverPerformanceTest {

    @Test
    void testLargeScaleObserverPerformance() {
        // Given
        NewsPublisher publisher = new NewsPublisher("性能测试发布器");
        List<MockObserver> mocks = new ArrayList<>();
        int index = 0;
        while (index < 1000) {
            mocks.add(new MockObserver("观察者" + index));
            index++;
        }

        // When: register every observer, then publish one message
        long begin = System.nanoTime();
        mocks.forEach(publisher::registerObserver);
        publisher.setNews("性能测试消息");
        long duration = System.nanoTime() - begin;

        // Then
        System.out.println("1000个观察者通知耗时: " + duration + "ns");
        assertTrue(duration > 0);
        // Every observer must have received the message exactly once
        mocks.forEach(mock -> assertEquals(1, mock.getMessageCount()));
    }
}
🌐 现代架构中的观察者模式
Spring Boot中的观察者应用
// Spring event class
/**
 * Spring application event that carries a {@code StockData} payload.
 */
public class SpringStockEvent extends ApplicationEvent {
    private final StockData payload;

    /**
     * @param source    object passed to {@code ApplicationEvent} as the event source
     * @param stockData stock data carried by this event
     */
    public SpringStockEvent(Object source, StockData stockData) {
        super(source);
        payload = stockData;
    }

    /** @return the stock data carried by this event */
    public StockData getStockData() {
        return payload;
    }
}
// Spring event listener
/**
 * Spring component with two listener methods for {@link SpringStockEvent}:
 * one annotated {@code @Async} that simulates slow processing, and one
 * annotated {@code @Order(1)} that only logs the symbol.
 */
@Component
public class SpringStockEventListener {
    private static final Logger LOG = LoggerFactory.getLogger(SpringStockEventListener.class);

    /** Logs the price update, then simulates 50 ms of processing. */
    @EventListener
    @Async
    public void handleStockPriceUpdate(SpringStockEvent event) {
        StockData quote = event.getStockData();
        LOG.info("📈 Spring处理股价更新: {} - ${}", quote.getSymbol(), quote.getPrice());
        simulateProcessing();
    }

    /** Logs the symbol of the updated stock. */
    @EventListener
    @Order(1)
    public void handleStockPriceUpdateSync(SpringStockEvent event) {
        String symbol = event.getStockData().getSymbol();
        LOG.info("🔄 同步处理股价更新: {}", symbol);
    }

    /** Sleeps 50 ms; restores the interrupt flag if interrupted. */
    private void simulateProcessing() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
// Spring configuration
/**
 * Spring configuration for the observer demo: enables async and scheduled
 * execution and declares the stock market, the event multicaster and the
 * thread pool the multicaster uses.
 */
@Configuration
@EnableAsync
@EnableScheduling
public class SpringObserverConfiguration {

    /** Stock market instance shared by the Spring beans. */
    @Bean
    public RealTimeStockMarket stockMarket() {
        return new RealTimeStockMarket("Spring股票市场");
    }

    /** Event multicaster configured with {@link #taskExecutor()} as its task executor. */
    @Bean
    public ApplicationEventMulticaster applicationEventMulticaster() {
        SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster();
        multicaster.setTaskExecutor(taskExecutor());
        return multicaster;
    }

    /** Thread pool with 5 core / 10 max threads, a queue of 25 and the "SpringEvent-" name prefix. */
    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(5);
        pool.setMaxPoolSize(10);
        pool.setQueueCapacity(25);
        pool.setThreadNamePrefix("SpringEvent-");
        pool.initialize();
        return pool;
    }
}
// Spring service
/**
 * Bridges the stock market to Spring events: registers an observer on the
 * market that republishes every update as a {@link SpringStockEvent}, and
 * triggers simulated market data every five seconds.
 */
@Service
public class SpringStockService {
    private final ApplicationEventPublisher eventPublisher;
    private final RealTimeStockMarket stockMarket;

    public SpringStockService(ApplicationEventPublisher eventPublisher,
                              RealTimeStockMarket stockMarket) {
        this.eventPublisher = eventPublisher;
        this.stockMarket = stockMarket;
        setupMarketObservers();
    }

    /** Registers the Spring event bridge as an observer of the stock market. */
    private void setupMarketObservers() {
        StockObserver bridge = new StockObserver() {
            @Override
            public void update(StockData stockData) {
                // The event source is this observer instance, not the service
                eventPublisher.publishEvent(new SpringStockEvent(this, stockData));
            }

            @Override
            public String getObserverId() {
                return "Spring事件发布器";
            }

            @Override
            public Set<String> getInterestedSymbols() {
                // Only these symbols are of interest
                return Set.of("AAPL", "GOOGL", "MSFT");
            }
        };
        stockMarket.registerObserver(bridge);
    }

    /** Asks the market to generate simulated data at a fixed rate of 5000 ms. */
    @Scheduled(fixedRate = 5000)
    public void simulateMarketData() {
        stockMarket.simulateMarketData();
    }

    /** Forwards the given stock data to the market. */
    public void addStockData(StockData stockData) {
        stockMarket.addStockData(stockData);
    }
}
// REST controller
/**
 * REST endpoints under {@code /api/stocks} for publishing stock data and
 * listing the supported symbols.
 */
@RestController
@RequestMapping("/api/stocks")
public class SpringStockController {
    private final SpringStockService stockService;

    public SpringStockController(SpringStockService stockService) {
        this.stockService = stockService;
    }

    /**
     * Converts the request into {@code StockData} and hands it to the service.
     *
     * @return 200 with a success message, or 400 with the exception message
     *         if conversion or publishing throws
     */
    @PostMapping("/publish")
    public ResponseEntity<String> publishStockData(@RequestBody StockDataRequest request) {
        try {
            stockService.addStockData(toStockData(request));
        } catch (Exception failure) {
            return ResponseEntity.badRequest().body("发布失败: " + failure.getMessage());
        }
        return ResponseEntity.ok("股票数据发布成功");
    }

    /** @return the fixed list of supported stock symbols */
    @GetMapping("/symbols")
    public ResponseEntity<List<String>> getSupportedSymbols() {
        List<String> symbols = List.of("AAPL", "GOOGL", "MSFT", "TSLA", "AMZN");
        return ResponseEntity.ok(symbols);
    }

    /** Copies the request fields into a new {@code StockData}. */
    private StockData toStockData(StockDataRequest request) {
        return new StockData(
                request.getSymbol(),
                request.getPrice(),
                request.getChange(),
                request.getChangePercent(),
                request.getVolume()
        );
    }
}
// Request DTO
/**
 * Mutable request body for publishing stock data: symbol, price, absolute
 * change, percentage change and volume, each with a getter and a setter.
 */
public class StockDataRequest {
    private String symbol;
    private double price;
    private double change;
    private double changePercent;
    private long volume;

    /** @return the stock symbol, or {@code null} if not set */
    public String getSymbol() {
        return this.symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    /** @return the price */
    public double getPrice() {
        return this.price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    /** @return the absolute price change */
    public double getChange() {
        return this.change;
    }

    public void setChange(double change) {
        this.change = change;
    }

    /** @return the percentage price change */
    public double getChangePercent() {
        return this.changePercent;
    }

    public void setChangePercent(double changePercent) {
        this.changePercent = changePercent;
    }

    /** @return the traded volume */
    public long getVolume() {
        return this.volume;
    }

    public void setVolume(long volume) {
        this.volume = volume;
    }
}
// Microservice event publishing
/**
 * Publishes stock updates either locally, as a {@link SpringStockEvent}
 * through the Spring event publisher, or to the Kafka topic
 * {@code stock-price-updates} for other services.
 */
@Service
public class MicroserviceEventService {
    // The send callbacks below log through this field; it was previously undeclared.
    private static final Logger logger = LoggerFactory.getLogger(MicroserviceEventService.class);

    private final ApplicationEventPublisher eventPublisher;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public MicroserviceEventService(ApplicationEventPublisher eventPublisher,
                                    KafkaTemplate<String, Object> kafkaTemplate) {
        this.eventPublisher = eventPublisher;
        this.kafkaTemplate = kafkaTemplate;
    }

    /** Publishes the stock data as an in-process Spring event. */
    public void publishLocalEvent(StockData stockData) {
        SpringStockEvent event = new SpringStockEvent(this, stockData);
        eventPublisher.publishEvent(event);
    }

    /**
     * Sends the stock data to Kafka, keyed by symbol, and logs the outcome
     * of the send in a callback.
     */
    @Async
    public void publishDistributedEvent(StockData stockData) {
        // Publish to Kafka for consumption by other microservices
        kafkaTemplate.send("stock-price-updates", stockData.getSymbol(), stockData)
                .addCallback(
                        result -> logger.info("✅ 分布式事件发布成功: {}", stockData.getSymbol()),
                        failure -> logger.error("❌ 分布式事件发布失败: {}", failure.getMessage())
                );
    }
}
// Kafka consumer (in another microservice)
/**
 * Kafka listener for the {@code stock-price-updates} topic in consumer group
 * {@code trading-service}; logs each update and hands it to the trading logic.
 */
@Component
public class StockPriceKafkaConsumer {
    // Both methods log through this field; it was previously undeclared.
    private static final Logger logger = LoggerFactory.getLogger(StockPriceKafkaConsumer.class);

    /** Receives one stock update from Kafka and processes it. */
    @KafkaListener(topics = "stock-price-updates", groupId = "trading-service")
    public void consumeStockPriceUpdate(StockData stockData) {
        logger.info("📥 交易服务收到股价更新: {}", stockData);
        // Handle the price update, e.g. trigger trading logic
        processStockUpdateForTrading(stockData);
    }

    /** Placeholder for the trading logic; currently only logs the symbol. */
    private void processStockUpdateForTrading(StockData stockData) {
        logger.info("🔧 处理交易逻辑 for {}", stockData.getSymbol());
    }
}
📊 模式对比与选择指南
观察者模式 vs 相关模式(发布-订阅模式、中介者模式)
| 维度 | 观察者模式 | 发布-订阅模式 | 中介者模式 |
|---|---|---|---|
| 目的 | 一对多依赖通知 | 解耦发布者和订阅者 | 集中控制对象交互 |
| 耦合度 | 松耦合 | 完全解耦 | 通过中介者耦合 |
| 通信方式 | 直接通知 | 通过消息代理 | 通过中介者 |
| 使用时机 | 对象状态变化需要通知其他对象 | 需要完全解耦的组件间通信 | 复杂对象交互需要集中管理 |
观察者模式变体对比
| 类型 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 经典观察者 | 简单直观,易于理解 | 主题与观察者直接相互引用,耦合度相对较高 | 简单的事件通知 |
| 事件总线 | 完全解耦,灵活性强 | 复杂度高,需要事件类 | 复杂系统,多组件通信 |
| 响应式流 | 支持流操作,组合性强 | 学习曲线陡峭 | 数据流处理,实时系统 |
| Spring事件 | 与Spring生态集成好 | 依赖Spring框架 | Spring应用内部事件 |
何时使用观察者模式?
- 状态变化通知:一个对象状态改变需要通知其他对象,且不知道具体有多少对象需要通知
- 事件驱动架构:需要构建事件驱动的系统架构
- 解耦需求:需要减少对象之间的耦合度
- 广播通信:需要一对多的通信机制
- 实时数据处理:需要处理实时数据流和事件流
架构决策流程图
🎯 最佳实践与陷阱规避
✅ 最佳实践
- 明确生命周期:清晰定义观察者的注册和注销时机
- 避免内存泄漏:及时移除不再需要的观察者
- 考虑线程安全:在多线程环境中确保观察者列表的线程安全
- 使用合适的数据结构:根据观察者数量选择合适的集合类型
- 提供错误处理:观察者处理事件时的错误不应该影响其他观察者
❌ 常见陷阱
- 内存泄漏:观察者忘记注销导致无法被垃圾回收
- 性能问题:观察者数量过多导致通知性能下降
- 循环依赖:观察者和被观察者之间形成循环依赖
- 事件顺序问题:观察者处理事件的顺序依赖导致问题
- 并发修改:在通知过程中修改观察者列表
🔧 重构技巧
// Before refactoring: tightly coupled direct calls
/**
 * "Before" example: the market holds concrete references to every dependent
 * service and calls each of them directly whenever a price changes.
 */
public class StockMarket {
    private TradingService tradingService;
    private NotificationService notificationService;
    private ReportService reportService;
    // updateStockPrice assigns this field; it was previously undeclared.
    private double latestPrice;

    /**
     * Stores the new price and pushes it to each dependent service in turn.
     *
     * @param symbol stock symbol whose price changed
     * @param price  new price
     */
    public void updateStockPrice(String symbol, double price) {
        // Update the price
        this.latestPrice = price;
        // Call the dependent services directly
        tradingService.checkTradingOpportunity(symbol, price);
        notificationService.sendPriceAlert(symbol, price);
        reportService.recordPriceUpdate(symbol, price);
    }
}
// After refactoring: using the observer pattern
/**
 * "After" example: the market only knows the {@code StockObserver}
 * abstraction and notifies whichever observers are registered.
 */
public class RefactoredStockMarket {
    private final List<StockObserver> observers = new ArrayList<>();
    private double latestPrice;

    /**
     * Stores the new price and notifies all registered observers.
     *
     * @param symbol stock symbol whose price changed
     * @param price  new price
     */
    public void updateStockPrice(String symbol, double price) {
        this.latestPrice = price;
        notifyObservers(symbol, price);
    }

    /** Adds an observer to the notification list. */
    public void registerObserver(StockObserver observer) {
        observers.add(observer);
    }

    /** Removes an observer from the notification list. */
    public void removeObserver(StockObserver observer) {
        observers.remove(observer);
    }

    /**
     * Builds a {@code StockData} for the update and passes it to every observer.
     * Iterates over a snapshot so an observer may register or remove observers
     * from inside its callback without a ConcurrentModificationException.
     */
    private void notifyObservers(String symbol, double price) {
        StockData data = new StockData(symbol, price, 0, 0, 0);
        List<StockObserver> snapshot = new ArrayList<>(observers);
        for (StockObserver observer : snapshot) {
            observer.update(data);
        }
    }
}
// Observer interface and implementations
/**
 * Callback interface for objects that want to be told about stock updates.
 */
public interface StockObserver {
/** Called with the data describing a stock update. */
void update(StockData stockData);
}
/**
 * Stock observer that checks each update for a trading opportunity.
 */
@Component
public class TradingService implements StockObserver {

    /** Extracts symbol and price from the update and runs the trading check. */
    @Override
    public void update(StockData stockData) {
        String symbol = stockData.getSymbol();
        double price = stockData.getPrice();
        checkTradingOpportunity(symbol, price);
    }

    /** Placeholder for the trading logic. */
    private void checkTradingOpportunity(String symbol, double price) {
        // Trading logic
    }
}
/**
 * Stock observer that sends a price alert for each update.
 */
@Component
public class NotificationService implements StockObserver {

    /** Extracts symbol and price from the update and sends the alert. */
    @Override
    public void update(StockData stockData) {
        String symbol = stockData.getSymbol();
        double price = stockData.getPrice();
        sendPriceAlert(symbol, price);
    }

    /** Placeholder for the notification logic. */
    private void sendPriceAlert(String symbol, double price) {
        // Notification logic
    }
}
💡 总结升华
观察者模式不仅仅是一种设计模式,更是一种事件驱动架构的哲学。它教会我们:
- 解耦思维:通过抽象接口解耦观察者和被观察者
- 事件思维:将系统行为建模为一系列事件和事件处理器
- 响应式思维:构建对变化做出响应的系统
- 发布-订阅思维:建立高效的消息分发机制
在现代软件开发中,观察者模式已经成为构建响应式、事件驱动系统的重要工具。从GUI事件处理到微服务通信,从实时数据处理到系统监控,观察者模式都在默默地提供着强大的事件处理能力。
掌握观察者模式,意味着你不仅学会了一种设计模式,更获得了一种构建响应式系统的思维方式。这种思维方式将帮助你在设计复杂系统时,能够识别出事件源和事件处理器,构建出松耦合、高内聚的系统架构。
记住:优秀的架构师能够看到系统中的事件流,观察者模式正是处理这些事件流的利器。选择观察者模式,就是选择构建响应式和可扩展的系统架构。
更多推荐



所有评论(0)