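🎯 Pattern Definition and Core Idea

The Observer pattern is a behavioral design pattern that defines a one-to-many dependency between objects: when one object changes state, all objects that depend on it are notified and updated automatically.

Core Philosophy

"Decouple the observer from the observed, giving a loosely coupled event notification mechanism."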

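[Diagram: the Subject (observable) maintains a collection of observers (Observer 1, Observer 2, Observer 3). Observers can be added to or removed from the collection, and a state change in the Subject triggers a notification to every observer.]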

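🏗️ Basic Architecture and Role Analysis

Structure of the Pattern

Role | Responsibility | Example
Subject (observable) | Maintains the observer list; provides register/unregister methods | Subject
Concrete subject | Implements the subject interface; holds the concrete state | ConcreteSubject
Observer | Defines the update interface | Observer
Concrete observer | Implements the update interface; keeps itself consistent with the subject's state | ConcreteObserver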
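
The four roles can be sketched in a few lines. The following is a minimal illustration, not the article's implementation: the names `TemperatureObserver`, `TemperatureSubject`, `Thermometer`, and `TemperatureDisplay` are hypothetical and chosen only to make each role concrete.

```java
import java.util.ArrayList;
import java.util.List;

// Observer role: declares the update callback (hypothetical name).
interface TemperatureObserver {
    void update(double celsius);
}

// Subject role: declares registration and notification.
interface TemperatureSubject {
    void registerObserver(TemperatureObserver observer);
    void removeObserver(TemperatureObserver observer);
    void notifyObservers();
}

// ConcreteSubject role: owns the state and the observer list.
class Thermometer implements TemperatureSubject {
    private final List<TemperatureObserver> observers = new ArrayList<>();
    private double celsius;

    @Override
    public void registerObserver(TemperatureObserver observer) {
        observers.add(observer);
    }

    @Override
    public void removeObserver(TemperatureObserver observer) {
        observers.remove(observer);
    }

    @Override
    public void notifyObservers() {
        // Iterate over a copy so an observer may unregister during the callback.
        for (TemperatureObserver observer : new ArrayList<>(observers)) {
            observer.update(celsius);
        }
    }

    public void setCelsius(double celsius) {
        this.celsius = celsius;
        notifyObservers(); // every state change is pushed to the observers
    }
}

// ConcreteObserver role: keeps its own state in sync with the subject.
class TemperatureDisplay implements TemperatureObserver {
    private double lastSeen = Double.NaN;

    @Override
    public void update(double celsius) {
        lastSeen = celsius;
    }

    public double getLastSeen() {
        return lastSeen;
    }
}
```

The subject only knows the `TemperatureObserver` interface, so new observer types can be added without touching `Thermometer`.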

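💻 Complete Code Example: News Publishing System

Basic Observer Pattern Implementation

// Note: each public type below belongs in its own .java file;
// these imports are the union of what the listings in this section need.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Observer interface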
public interface Observer {
    void update(String message);
    String getObserverName();
}

// Subject (observable) interface
public interface Subject {
    void registerObserver(Observer observer);
    void removeObserver(Observer observer);
    void notifyObservers();
    void setNews(String news);
    String getNews();
}

// Concrete subject: news publisher
public class NewsPublisher implements Subject {
    private final List<Observer> observers;
    private String latestNews;
    private final String publisherName;

    public NewsPublisher(String publisherName) {
        this.observers = new ArrayList<>();
        this.latestNews = "No news yet";
        this.publisherName = publisherName;
        System.out.println("📰 Created news publisher: " + publisherName);
    }

    @Override
    public void registerObserver(Observer observer) {
        if (!observers.contains(observer)) {
            observers.add(observer);
            System.out.println("✅ " + observer.getObserverName() + " subscribed to " + publisherName);
        }
    }

    @Override
    public void removeObserver(Observer observer) {
        if (observers.remove(observer)) {
            System.out.println("❌ " + observer.getObserverName() + " unsubscribed from " + publisherName);
        }
    }

    @Override
    public void notifyObservers() {
        if (observers.isEmpty()) {
            System.out.println("ℹ️ No subscribers, nothing to notify");
            return;
        }

        System.out.println("🔔 " + publisherName + " is notifying " + observers.size() + " subscriber(s)");
        // Iterate over a snapshot so that an observer can unsubscribe inside update()
        // without triggering a ConcurrentModificationException.
        for (Observer observer : new ArrayList<>(observers)) {
            observer.update(latestNews);
        }
    }

    @Override
    public void setNews(String news) {
        this.latestNews = news;
        System.out.println("📢 " + publisherName + " published news: " + news);
        notifyObservers();
    }

    @Override
    public String getNews() {
        return latestNews;
    }

    // Names of the current subscribers
    public List<String> getSubscriberNames() {
        return observers.stream()
                .map(Observer::getObserverName)
                .collect(Collectors.toList());
    }

    public int getSubscriberCount() {
        return observers.size();
    }

    public String getPublisherName() {
        return publisherName;
    }
}

// Concrete observer: email subscriber
public class EmailSubscriber implements Observer {
    private final String email;
    private final String name;
    private final List<String> receivedNews;

    public EmailSubscriber(String name, String email) {
        this.name = name;
        this.email = email;
        this.receivedNews = new ArrayList<>();
        System.out.println("📧 Created email subscriber: " + name + " (" + email + ")");
    }

    @Override
    public void update(String message) {
        receivedNews.add(message);
        System.out.println("📨 " + name + " received an email notification: " + message);
        // Simulate sending the email
        simulateEmailSending(message);
    }

    @Override
    public String getObserverName() {
        return name + " (email)";
    }

    private void simulateEmailSending(String message) {
        try {
            Thread.sleep(100); // simulated email delivery latency
            System.out.println("✅ Email sent to: " + email);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("❌ Email sending interrupted");
        }
    }

    public List<String> getReceivedNews() {
        return new ArrayList<>(receivedNews);
    }

    public int getReceivedCount() {
        return receivedNews.size();
    }
}

// Concrete observer: mobile push subscriber
public class PushSubscriber implements Observer {
    private final String deviceId;
    private final String username;
    private final List<String> receivedNotifications;

    public PushSubscriber(String username, String deviceId) {
        this.username = username;
        this.deviceId = deviceId;
        this.receivedNotifications = new ArrayList<>();
        System.out.println("📱 Created push subscriber: " + username + " (" + deviceId + ")");
    }

    @Override
    public void update(String message) {
        receivedNotifications.add(message);
        System.out.println("🔔 " + username + " received a push notification: " + message);
        // Simulate sending the push notification
        simulatePushSending(message);
    }

    @Override
    public String getObserverName() {
        return username + " (push)";
    }

    private void simulatePushSending(String message) {
        try {
            Thread.sleep(50); // simulated push delivery latency
            System.out.println("✅ Push sent to device: " + deviceId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("❌ Push sending interrupted");
        }
    }

    public List<String> getReceivedNotifications() {
        return new ArrayList<>(receivedNotifications);
    }
}

// Concrete observer: website subscriber
public class WebSubscriber implements Observer {
    private final String websiteUrl;
    private final String siteName;
    private final List<String> displayedNews;

    public WebSubscriber(String siteName, String websiteUrl) {
        this.siteName = siteName;
        this.websiteUrl = websiteUrl;
        this.displayedNews = new ArrayList<>();
        System.out.println("🌐 Created website subscriber: " + siteName + " (" + websiteUrl + ")");
    }

    @Override
    public void update(String message) {
        displayedNews.add(message);
        System.out.println("🖥️ Website " + siteName + " is updating its news: " + message);
        // Simulate the website update
        simulateWebsiteUpdate(message);
    }

    @Override
    public String getObserverName() {
        return siteName + " (website)";
    }

    private void simulateWebsiteUpdate(String message) {
        try {
            Thread.sleep(80); // simulated website update latency
            System.out.println("✅ Website " + siteName + " has updated its news content");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("❌ Website update interrupted");
        }
    }

    public List<String> getDisplayedNews() {
        return new ArrayList<>(displayedNews);
    }
}

// Concrete observer: SMS subscriber
public class SmsSubscriber implements Observer {
    private final String phoneNumber;
    private final String customerName;
    private final List<String> receivedSms;

    public SmsSubscriber(String customerName, String phoneNumber) {
        this.customerName = customerName;
        this.phoneNumber = phoneNumber;
        this.receivedSms = new ArrayList<>();
        System.out.println("📞 Created SMS subscriber: " + customerName + " (" + phoneNumber + ")");
    }

    @Override
    public void update(String message) {
        receivedSms.add(message);
        System.out.println("💬 " + customerName + " received an SMS: " + message);
        // Simulate sending the SMS
        simulateSmsSending(message);
    }

    @Override
    public String getObserverName() {
        return customerName + " (SMS)";
    }

    private void simulateSmsSending(String message) {
        try {
            Thread.sleep(120); // simulated SMS delivery latency
            System.out.println("✅ SMS sent to: " + phoneNumber);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("❌ SMS sending interrupted");
        }
    }

    public List<String> getReceivedSms() {
        return new ArrayList<>(receivedSms);
    }
}

// News system manager
public class NewsSystemManager {
    private final Map<String, NewsPublisher> publishers;

    public NewsSystemManager() {
        this.publishers = new HashMap<>();
    }

    public NewsPublisher createPublisher(String name) {
        NewsPublisher publisher = new NewsPublisher(name);
        publishers.put(name, publisher);
        return publisher;
    }

    public void publishNewsToAll(String news) {
        System.out.println("\n=== Global news broadcast ===");
        for (NewsPublisher publisher : publishers.values()) {
            publisher.setNews(news);
        }
    }

    public void printSystemStatus() {
        System.out.println("\n=== News system status ===");
        System.out.println("Number of publishers: " + publishers.size());

        for (NewsPublisher publisher : publishers.values()) {
            System.out.println("📰 " + publisher.getPublisherName() +
                             " - subscribers: " + publisher.getSubscriberCount() +
                             " - latest news: " + publisher.getNews());
        }
    }

    public List<String> getAllPublisherNames() {
        return new ArrayList<>(publishers.keySet());
    }
}

// Client demo
public class ObserverPatternDemo {
    public static void main(String[] args) {
        System.out.println("=== Observer Pattern Demo: News Publishing System ===\n");

        NewsSystemManager systemManager = new NewsSystemManager();

        // 1. Create the news publishers
        System.out.println("1. Creating news publishers:");
        NewsPublisher techPublisher = systemManager.createPublisher("Tech News");
        NewsPublisher sportsPublisher = systemManager.createPublisher("Sports News");
        NewsPublisher financePublisher = systemManager.createPublisher("Finance News");

        // 2. Create the subscribers
        System.out.println("\n2. Creating subscribers:");
        Observer emailSubscriber1 = new EmailSubscriber("Zhang San", "zhangsan@email.com");
        Observer emailSubscriber2 = new EmailSubscriber("Li Si", "lisi@email.com");
        Observer pushSubscriber1 = new PushSubscriber("Wang Wu", "device_001");
        Observer webSubscriber1 = new WebSubscriber("News Portal", "https://news.example.com");
        Observer smsSubscriber1 = new SmsSubscriber("Zhao Liu", "13800138000");

        // 3. Subscribe to news
        System.out.println("\n3. Subscribing to news:");
        techPublisher.registerObserver(emailSubscriber1);
        techPublisher.registerObserver(pushSubscriber1);
        techPublisher.registerObserver(webSubscriber1);

        sportsPublisher.registerObserver(emailSubscriber2);
        sportsPublisher.registerObserver(smsSubscriber1);

        financePublisher.registerObserver(emailSubscriber1);
        financePublisher.registerObserver(emailSubscriber2);
        financePublisher.registerObserver(pushSubscriber1);
        financePublisher.registerObserver(webSubscriber1);
        financePublisher.registerObserver(smsSubscriber1);

        // 4. Publish news
        System.out.println("\n4. Publishing news:");
        techPublisher.setNews("Major breakthrough in artificial intelligence!");
        sportsPublisher.setNews("China's women's volleyball team wins the world championship!");
        financePublisher.setNews("Stock market jumps 3.5% today!");

        // 5. Unsubscribe demo
        System.out.println("\n5. Unsubscribe demo:");
        techPublisher.removeObserver(emailSubscriber1);
        techPublisher.setNews("A new tech product launch event is coming soon");

        // 6. System status
        System.out.println("\n6. System status:");
        systemManager.printSystemStatus();

        // 7. Global news broadcast
        System.out.println("\n7. Global news broadcast:");
        systemManager.publishNewsToAll("Urgent notice: system maintenance will take place tonight");

        // 8. Performance test
        System.out.println("\n8. Performance test:");
        performanceTest();
    }

    private static void performanceTest() {
        NewsPublisher testPublisher = new NewsPublisher("Test Publisher");

        // Create a large number of observers
        List<Observer> testObservers = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            testObservers.add(new EmailSubscriber("Test user " + i, "test" + i + "@email.com"));
        }

        // Register all observers
        for (Observer observer : testObservers) {
            testPublisher.registerObserver(observer);
        }

        // Measure notification time. Notification is synchronous and each
        // EmailSubscriber sleeps about 100 ms, so the result is dominated by the
        // simulated delays (roughly 10 s for 100 observers), not by the pattern itself.
        long startTime = System.nanoTime();
        testPublisher.setNews("Performance test news");
        long elapsedMs = (System.nanoTime() - startTime) / 1_000_000;

        System.out.println("Notifying " + testObservers.size() + " observers took: " + elapsedMs + "ms");
    }
}
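
One pitfall with list-based notification is worth spelling out: if the notification loop walks a live `ArrayList` and an observer unsubscribes from inside its own callback, the iterator throws `ConcurrentModificationException`. The sketch below shows one common remedy, `CopyOnWriteArrayList`, whose iterators work on a snapshot. The types `OneShotListener`, `SafePublisher`, and `UnsubscribeAfterFirst` are hypothetical and are not part of the listing above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener type for this sketch.
interface OneShotListener {
    void onNews(String news, SafePublisher publisher);
}

class SafePublisher {
    // Iterators of CopyOnWriteArrayList traverse a snapshot, so the list
    // may be modified while a notification pass is still running.
    private final List<OneShotListener> listeners = new CopyOnWriteArrayList<>();

    void register(OneShotListener listener) {
        listeners.add(listener);
    }

    void remove(OneShotListener listener) {
        listeners.remove(listener);
    }

    int listenerCount() {
        return listeners.size();
    }

    void publish(String news) {
        for (OneShotListener listener : listeners) {
            listener.onNews(news, this);
        }
    }
}

// A listener that unsubscribes itself after the first message it receives.
class UnsubscribeAfterFirst implements OneShotListener {
    int received = 0;

    @Override
    public void onNews(String news, SafePublisher publisher) {
        received++;
        publisher.remove(this); // safe: the publisher iterates over a snapshot
    }
}
```

`CopyOnWriteArrayList` copies the backing array on every write, so it suits subjects whose observer list changes rarely compared with how often notifications are sent.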

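🏢 Enterprise Application: Stock Trading System

A More Complex Observer Implementation

// Note: each public type below belongs in its own .java file;
// these imports are the union of what the listings in this section need.
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

// Stock data class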
public class StockData {
    private final String symbol;
    private double price;
    private double change;
    private double changePercent;
    private long volume;
    private LocalDateTime timestamp;
    
    public StockData(String symbol, double price, double change, double changePercent, long volume) {
        this.symbol = symbol;
        this.price = price;
        this.change = change;
        this.changePercent = changePercent;
        this.volume = volume;
        this.timestamp = LocalDateTime.now();
    }
    
    // Getters
    public String getSymbol() { return symbol; }
    public double getPrice() { return price; }
    public double getChange() { return change; }
    public double getChangePercent() { return changePercent; }
    public long getVolume() { return volume; }
    public LocalDateTime getTimestamp() { return timestamp; }
    
    @Override
    public String toString() {
        return String.format("%s: $%.2f (%.2f, %.2f%%) Vol: %,d", 
                           symbol, price, change, changePercent, volume);
    }
}

// Stock market subject
public interface StockMarket {
    void registerObserver(StockObserver observer);
    void removeObserver(StockObserver observer);
    void notifyObservers(StockData stockData);
    void addStockData(StockData stockData);
    List<String> getSupportedSymbols();
}

// Stock observer interface
public interface StockObserver {
    void update(StockData stockData);
    String getObserverId();
    Set<String> getInterestedSymbols();
    default boolean isInterestedIn(String symbol) {
        return getInterestedSymbols().contains(symbol);
    }
}

// Concrete stock market: real-time data source
public class RealTimeStockMarket implements StockMarket {
    // Thread-safe list, consistent with the ConcurrentHashMap used for latestData
    private final CopyOnWriteArrayList<StockObserver> observers;
    private final Map<String, StockData> latestData;
    private final String marketName;

    public RealTimeStockMarket(String marketName) {
        this.observers = new CopyOnWriteArrayList<>();
        this.latestData = new ConcurrentHashMap<>();
        this.marketName = marketName;
        System.out.println("🏛️ Created stock market: " + marketName);
    }

    @Override
    public void registerObserver(StockObserver observer) {
        // addIfAbsent performs the duplicate check and the insertion atomically
        if (observers.addIfAbsent(observer)) {
            System.out.println("✅ " + observer.getObserverId() + " registered with " + marketName);
        }
    }

    @Override
    public void removeObserver(StockObserver observer) {
        if (observers.remove(observer)) {
            System.out.println("❌ " + observer.getObserverId() + " unregistered from " + marketName);
        }
    }

    @Override
    public void notifyObservers(StockData stockData) {
        if (observers.isEmpty()) {
            return;
        }

        // Only observers interested in this symbol are notified
        List<StockObserver> interestedObservers = observers.stream()
                .filter(observer -> observer.isInterestedIn(stockData.getSymbol()))
                .collect(Collectors.toList());

        if (interestedObservers.isEmpty()) {
            return;
        }

        System.out.println("🔔 " + marketName + " is notifying " + interestedObservers.size() +
                         " observer(s) about " + stockData.getSymbol());

        for (StockObserver observer : interestedObservers) {
            observer.update(stockData);
        }
    }

    @Override
    public void addStockData(StockData stockData) {
        latestData.put(stockData.getSymbol(), stockData);
        System.out.println("📊 " + marketName + " update: " + stockData);
        notifyObservers(stockData);
    }

    @Override
    public List<String> getSupportedSymbols() {
        return new ArrayList<>(latestData.keySet());
    }

    public void simulateMarketData() {
        Random random = new Random();

        // Simulate a few popular stocks
        String[] symbols = {"AAPL", "GOOGL", "MSFT", "TSLA", "AMZN", "META"};

        for (String symbol : symbols) {
            double basePrice = 100 + random.nextDouble() * 900;
            double change = (random.nextDouble() - 0.5) * 10;
            double changePercent = (change / basePrice) * 100;
            long volume = 1000000 + random.nextInt(9000000);

            StockData data = new StockData(symbol, basePrice, change, changePercent, volume);
            addStockData(data);

            // Simulate the interval between data updates
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    public StockData getLatestData(String symbol) {
        return latestData.get(symbol);
    }

    public int getObserverCount() {
        return observers.size();
    }
}

// Concrete observer: trading bot
public class TradingBot implements StockObserver {
    private final String botId;
    private final Set<String> interestedSymbols;
    private final Map<String, List<StockData>> historicalData;
    private final double buyThreshold;
    private final double sellThreshold;

    public TradingBot(String botId, Set<String> symbols, double buyThreshold, double sellThreshold) {
        this.botId = botId;
        this.interestedSymbols = new HashSet<>(symbols);
        this.historicalData = new HashMap<>();
        this.buyThreshold = buyThreshold;
        this.sellThreshold = sellThreshold;
        System.out.println("🤖 Created trading bot: " + botId);
    }

    @Override
    public void update(StockData stockData) {
        // Store the historical data
        historicalData.computeIfAbsent(stockData.getSymbol(), k -> new ArrayList<>())
                     .add(stockData);

        // Analyze the data and make a trading decision
        analyzeAndTrade(stockData);
    }

    @Override
    public String getObserverId() {
        return botId;
    }

    @Override
    public Set<String> getInterestedSymbols() {
        return new HashSet<>(interestedSymbols);
    }

    private void analyzeAndTrade(StockData stockData) {
        List<StockData> history = historicalData.get(stockData.getSymbol());
        if (history.size() < 2) {
            return; // at least two data points are needed for the analysis
        }

        StockData previousData = history.get(history.size() - 2);
        double priceChange = stockData.getPrice() - previousData.getPrice();
        double changePercent = (priceChange / previousData.getPrice()) * 100;

        System.out.println("📈 " + botId + " analyzing " + stockData.getSymbol() +
                         ": change " + String.format("%.2f", changePercent) + "%");

        // A simple trading strategy: buy on a drop, sell on a rise
        if (changePercent <= buyThreshold) {
            System.out.println("💰 " + botId + " decides to buy " + stockData.getSymbol() +
                             " (price down " + String.format("%.2f", Math.abs(changePercent)) + "%)");
            executeBuyOrder(stockData);
        } else if (changePercent >= sellThreshold) {
            System.out.println("💸 " + botId + " decides to sell " + stockData.getSymbol() +
                             " (price up " + String.format("%.2f", changePercent) + "%)");
            executeSellOrder(stockData);
        }
    }

    private void executeBuyOrder(StockData stockData) {
        // Simulate executing a buy order
        try {
            Thread.sleep(50);
            System.out.println("✅ " + botId + " bought " + stockData.getSymbol());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void executeSellOrder(StockData stockData) {
        // Simulate executing a sell order
        try {
            Thread.sleep(50);
            System.out.println("✅ " + botId + " sold " + stockData.getSymbol());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public Map<String, List<StockData>> getHistoricalData() {
        return new HashMap<>(historicalData);
    }
}

// Concrete observer: price alert
public class PriceAlert implements StockObserver {
    private final String alertId;
    private final Set<String> monitoredSymbols;
    private final Map<String, Double> priceThresholds;
    private final List<String> triggeredAlerts;

    public PriceAlert(String alertId, Map<String, Double> priceThresholds) {
        this.alertId = alertId;
        this.monitoredSymbols = new HashSet<>(priceThresholds.keySet());
        this.priceThresholds = new HashMap<>(priceThresholds);
        this.triggeredAlerts = new ArrayList<>();
        System.out.println("🚨 Created price alert: " + alertId);
    }

    @Override
    public void update(StockData stockData) {
        Double threshold = priceThresholds.get(stockData.getSymbol());
        if (threshold != null) {
            checkPriceAlert(stockData, threshold);
        }
    }

    @Override
    public String getObserverId() {
        return alertId;
    }

    @Override
    public Set<String> getInterestedSymbols() {
        return new HashSet<>(monitoredSymbols);
    }

    private void checkPriceAlert(StockData stockData, double threshold) {
        if (stockData.getPrice() >= threshold) {
            String alertMessage = String.format("🚨 %s price alert: %s reached $%.2f (threshold: $%.2f)",
                                             alertId, stockData.getSymbol(),
                                             stockData.getPrice(), threshold);
            triggeredAlerts.add(alertMessage);
            System.out.println(alertMessage);
            sendAlertNotification(alertMessage);
        }
    }

    private void sendAlertNotification(String message) {
        // Simulate sending the alert notification
        try {
            Thread.sleep(30);
            System.out.println("📢 Alert notification sent: " + message);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public List<String> getTriggeredAlerts() {
        return new ArrayList<>(triggeredAlerts);
    }
}

// Concrete observer: data logger
public class DataLogger implements StockObserver {
    private final String loggerId;
    private final Set<String> loggedSymbols;
    private final Map<String, List<StockData>> allData;

    public DataLogger(String loggerId, Set<String> symbols) {
        this.loggerId = loggerId;
        this.loggedSymbols = new HashSet<>(symbols);
        this.allData = new HashMap<>();
        System.out.println("📝 Created data logger: " + loggerId);
    }

    @Override
    public void update(StockData stockData) {
        allData.computeIfAbsent(stockData.getSymbol(), k -> new ArrayList<>())
               .add(stockData);

        System.out.println("💾 " + loggerId + " logged " + stockData.getSymbol() +
                         " price: " + stockData.getPrice());
    }

    @Override
    public String getObserverId() {
        return loggerId;
    }

    @Override
    public Set<String> getInterestedSymbols() {
        return new HashSet<>(loggedSymbols);
    }

    public void generateReport() {
        System.out.println("\n=== Data logger " + loggerId + " report ===");
        for (Map.Entry<String, List<StockData>> entry : allData.entrySet()) {
            String symbol = entry.getKey();
            List<StockData> dataList = entry.getValue();

            if (!dataList.isEmpty()) {
                double firstPrice = dataList.get(0).getPrice();
                double lastPrice = dataList.get(dataList.size() - 1).getPrice();
                double change = lastPrice - firstPrice;
                double changePercent = (change / firstPrice) * 100;

                System.out.printf("📊 %s: start $%.2f, end $%.2f, change %.2f (%.2f%%)%n",
                               symbol, firstPrice, lastPrice, change, changePercent);
            }
        }
    }

    public Map<String, List<StockData>> getAllData() {
        return new HashMap<>(allData);
    }
}

// Stock trading system demo
public class StockTradingDemo {
    public static void main(String[] args) {
        System.out.println("=== Observer Pattern Demo: Stock Trading System ===\n");

        // 1. Create the stock market
        RealTimeStockMarket nasdaq = new RealTimeStockMarket("NASDAQ");

        // 2. Create the trading bots
        Set<String> bot1Symbols = Set.of("AAPL", "GOOGL", "TSLA");
        TradingBot aggressiveBot = new TradingBot("Aggressive trading bot", bot1Symbols, -2.0, 3.0);

        Set<String> bot2Symbols = Set.of("MSFT", "AMZN", "META");
        TradingBot conservativeBot = new TradingBot("Conservative trading bot", bot2Symbols, -1.0, 2.0);

        // 3. Create the price alert
        Map<String, Double> alertThresholds = Map.of(
            "AAPL", 150.0,
            "TSLA", 200.0,
            "AMZN", 130.0
        );
        PriceAlert highPriceAlert = new PriceAlert("High price alert", alertThresholds);

        // 4. Create the data logger
        Set<String> allSymbols = Set.of("AAPL", "GOOGL", "MSFT", "TSLA", "AMZN", "META");
        DataLogger marketLogger = new DataLogger("Market data logger", allSymbols);

        // 5. Register the observers
        nasdaq.registerObserver(aggressiveBot);
        nasdaq.registerObserver(conservativeBot);
        nasdaq.registerObserver(highPriceAlert);
        nasdaq.registerObserver(marketLogger);

        // 6. Simulate market data
        System.out.println("\n6. Simulating the market data stream:");
        nasdaq.simulateMarketData();

        // 7. Generate the report
        System.out.println("\n7. System report:");
        marketLogger.generateReport();

        System.out.println("\nTriggered alerts:");
        highPriceAlert.getTriggeredAlerts().forEach(System.out::println);

        // 8. Performance test
        System.out.println("\n8. Large-scale observer test:");
        largeScaleObserverTest();
    }

    private static void largeScaleObserverTest() {
        RealTimeStockMarket testMarket = new RealTimeStockMarket("Test market");

        // Create a large number of observers
        List<StockObserver> testObservers = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            Set<String> symbols = Set.of("TEST" + (i % 10));
            TradingBot bot = new TradingBot("Test bot " + i, symbols, -1.0, 1.0);
            testObservers.add(bot);
            testMarket.registerObserver(bot);
        }

        // Performance test. Note that the bots sleep while simulating orders,
        // so the measured time includes those simulated delays.
        long startTime = System.nanoTime();

        // Send a large number of data updates
        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            String symbol = "TEST" + (i % 10);
            StockData data = new StockData(symbol,
                100 + random.nextDouble() * 100,
                (random.nextDouble() - 0.5) * 10,
                0, 1000000);
            testMarket.addStockData(data);
        }

        long elapsedMs = (System.nanoTime() - startTime) / 1_000_000;
        System.out.println("Processed 100 data updates with 50 observers in: " + elapsedMs + "ms");
    }
}
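
The market above checks every registered observer against each update's symbol. When there are many observers and symbols, an alternative is to index observers by symbol at registration time, so that publishing touches only the relevant listeners. The following is a minimal sketch under that assumption; `QuoteListener` and `SymbolIndexedMarket` are hypothetical names, and the callback is deliberately simpler than `StockObserver`.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical callback for this sketch.
interface QuoteListener {
    void onQuote(String symbol, double price);
}

class SymbolIndexedMarket {
    // symbol -> listeners interested in that symbol
    private final Map<String, List<QuoteListener>> bySymbol = new ConcurrentHashMap<>();

    void register(QuoteListener listener, Set<String> symbols) {
        for (String symbol : symbols) {
            bySymbol.computeIfAbsent(symbol, k -> new CopyOnWriteArrayList<>())
                    .add(listener);
        }
    }

    void remove(QuoteListener listener) {
        // Removal scans every symbol; it is assumed to be rare compared with publishing.
        for (List<QuoteListener> listeners : bySymbol.values()) {
            listeners.remove(listener);
        }
    }

    // Notifies the listeners of one symbol and returns how many were notified.
    int publish(String symbol, double price) {
        List<QuoteListener> listeners = bySymbol.getOrDefault(symbol, List.of());
        for (QuoteListener listener : listeners) {
            listener.onQuote(symbol, price);
        }
        return listeners.size();
    }
}
```

The trade-off is that an observer's set of symbols is fixed at registration; the filter-on-notify approach in `RealTimeStockMarket` instead re-evaluates `isInterestedIn` on every update, which allows interests to change over time.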

🔄 Architecture Evolution: From Simple to Complex

Generation 1: Basic Observer Pattern

[Class diagram]
«interface» Subject: +registerObserver(Observer), +removeObserver(Observer), +notifyObservers()
ConcreteSubject (implements Subject): -state, +getState(), +setState()
«interface» Observer: +update()
ConcreteObserver (implements Observer): -observerState, +update()
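
In this first-generation structure `update()` takes no arguments and the concrete subject exposes `getState()`, which corresponds to the "pull" style: the subject only signals that something changed, and each observer fetches what it needs. A minimal sketch, assuming an integer state and the hypothetical names `PullObserver`, `Counter`, and `CounterMirror`:

```java
import java.util.ArrayList;
import java.util.List;

// Observer with a parameterless update(), as in the class diagram.
interface PullObserver {
    void update();
}

// Concrete subject: holds the state and exposes it through getState().
class Counter {
    private final List<PullObserver> observers = new ArrayList<>();
    private int state;

    void registerObserver(PullObserver observer) {
        observers.add(observer);
    }

    void removeObserver(PullObserver observer) {
        observers.remove(observer);
    }

    int getState() {
        return state;
    }

    void setState(int state) {
        this.state = state;
        // Signal the change without sending the new value.
        for (PullObserver observer : new ArrayList<>(observers)) {
            observer.update();
        }
    }
}

// Concrete observer: keeps a reference to the subject and pulls the state.
class CounterMirror implements PullObserver {
    private final Counter subject;
    private int observerState;

    CounterMirror(Counter subject) {
        this.subject = subject;
    }

    @Override
    public void update() {
        observerState = subject.getState();
    }

    int getObserverState() {
        return observerState;
    }
}
```

The news and stock examples earlier use the "push" style instead, passing the message or `StockData` directly to `update`. Pull keeps the observer interface stable when the state grows, at the cost of coupling each observer to a concrete subject type.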

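Generation 2: Observer Pattern with Event Objects

// Note: each public type below belongs in its own .java file;
// these imports are the union of what the listings in this section need.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// Base class for event objects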
public abstract class Event {
    private final String eventType;
    private final long timestamp;
    private final Object source;
    
    public Event(String eventType, Object source) {
        this.eventType = eventType;
        this.source = source;
        this.timestamp = System.currentTimeMillis();
    }
    
    // Getters
    public String getEventType() { return eventType; }
    public long getTimestamp() { return timestamp; }
    public Object getSource() { return source; }
    
    @Override
    public String toString() {
        return String.format("Event[type=%s, time=%d, source=%s]", 
                           eventType, timestamp, source.getClass().getSimpleName());
    }
}

// Concrete event classes
public class StockPriceEvent extends Event {
    private final StockData stockData;
    private final double previousPrice;
    
    public StockPriceEvent(Object source, StockData stockData, double previousPrice) {
        super("STOCK_PRICE_UPDATE", source);
        this.stockData = stockData;
        this.previousPrice = previousPrice;
    }
    
    public StockData getStockData() { return stockData; }
    public double getPreviousPrice() { return previousPrice; }
    public double getPriceChange() { return stockData.getPrice() - previousPrice; }
    public double getPriceChangePercent() { 
        return (getPriceChange() / previousPrice) * 100; 
    }
}

public class SystemAlertEvent extends Event {
    private final String alertLevel;
    private final String message;
    private final Map<String, Object> additionalData;
    
    public SystemAlertEvent(Object source, String alertLevel, String message) {
        super("SYSTEM_ALERT", source);
        this.alertLevel = alertLevel;
        this.message = message;
        this.additionalData = new HashMap<>();
    }
    
    public SystemAlertEvent(Object source, String alertLevel, String message, 
                           Map<String, Object> additionalData) {
        this(source, alertLevel, message);
        this.additionalData.putAll(additionalData);
    }
    
    // Getters
    public String getAlertLevel() { return alertLevel; }
    public String getMessage() { return message; }
    public Map<String, Object> getAdditionalData() { return new HashMap<>(additionalData); }
    
    public void addData(String key, Object value) {
        additionalData.put(key, value);
    }
}

// Generic event observer interface
public interface EventObserver<T extends Event> {
    void onEvent(T event);
    String getObserverId();
    Class<T> getEventType();
    default boolean supportsEvent(Event event) {
        return getEventType().isInstance(event);
    }
}

// 事件总线 - 高级观察者模式实现
public class EventBus {
    private final Map<Class<? extends Event>, List<EventObserver<?>>> observers;
    private final String busName;
    private final boolean asyncProcessing;
    private final ExecutorService executor;
    
    public EventBus(String busName) {
        this(busName, false);
    }
    
    public EventBus(String busName, boolean asyncProcessing) {
        this.observers = new ConcurrentHashMap<>();
        this.busName = busName;
        this.asyncProcessing = asyncProcessing;
        this.executor = asyncProcessing ? 
            Executors.newCachedThreadPool() : 
            Executors.newSingleThreadExecutor();
        System.out.println("🚌 创建事件总线: " + busName + 
                         (asyncProcessing ? " (异步模式)" : " (同步模式)"));
    }
    
    // 注册观察者
    public <T extends Event> void registerObserver(EventObserver<T> observer) {
        Class<T> eventType = observer.getEventType();
        observers.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
                .add(observer);
        System.out.println("✅ " + observer.getObserverId() + " 注册到 " + busName + 
                         " 监听 " + eventType.getSimpleName());
    }
    
    // 取消注册
    public <T extends Event> void unregisterObserver(EventObserver<T> observer) {
        Class<T> eventType = observer.getEventType();
        List<EventObserver<?>> eventObservers = observers.get(eventType);
        if (eventObservers != null && eventObservers.remove(observer)) {
            System.out.println("❌ " + observer.getObserverId() + " 从 " + busName + " 取消注册");
        }
    }
    
    // 发布事件
    public void publishEvent(Event event) {
        System.out.println("📤 " + busName + " 发布事件: " + event);
        
        List<EventObserver<?>> allObservers = observers.values().stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
        
        List<EventObserver<?>> supportedObservers = allObservers.stream()
                .filter(observer -> observer.supportsEvent(event))
                .collect(Collectors.toList());
        
        if (supportedObservers.isEmpty()) {
            System.out.println("ℹ️ 没有观察者处理此事件");
            return;
        }
        
        System.out.println("🔔 通知 " + supportedObservers.size() + " 个观察者");
        
        for (EventObserver<?> observer : supportedObservers) {
            if (asyncProcessing) {
                executor.submit(() -> processEvent(observer, event));
            } else {
                processEvent(observer, event);
            }
        }
    }
    
    @SuppressWarnings("unchecked")
    private <T extends Event> void processEvent(EventObserver<?> observer, Event event) {
        try {
            EventObserver<T> typedObserver = (EventObserver<T>) observer;
            typedObserver.onEvent((T) event);
        } catch (Exception e) {
            System.err.println("❌ 观察者 " + observer.getObserverId() + " 处理事件失败: " + e.getMessage());
        }
    }
    
    // 获取统计信息
    public void printBusStats() {
        System.out.println("\n=== 事件总线 " + busName + " 统计 ===");
        System.out.println("支持的事件类型: " + observers.size());
        
        observers.forEach((eventType, obsList) -> {
            System.out.println("  " + eventType.getSimpleName() + ": " + obsList.size() + " 个观察者");
        });
    }
    
    public void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        System.out.println("🛑 事件总线 " + busName + " 已关闭");
    }
}
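
The publishEvent method above finds its targets by filtering every registered observer through supportsEvent. Since the bus already keys its map by event class, an alternative is to look handlers up by the event's class and its superclasses. The following is a minimal sketch of that idea, not the EventBus implementation: TypedDispatchBus, its Consumer-based handlers, and the publish return value are assumptions made for illustration, and interfaces are deliberately not walked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, simplified bus: handlers are plain Consumers keyed by event class.
class TypedDispatchBus {
    private final Map<Class<?>, List<Consumer<Object>>> handlers = new ConcurrentHashMap<>();

    // Wraps the typed handler so it can be stored in an untyped list; the cast is checked by Class.cast.
    <E> void register(Class<E> type, Consumer<? super E> handler) {
        handlers.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>())
                .add(event -> handler.accept(type.cast(event)));
    }

    // Walks the event's class and then its superclasses; returns how many handlers were invoked.
    int publish(Object event) {
        int delivered = 0;
        for (Class<?> c = event.getClass(); c != null; c = c.getSuperclass()) {
            List<Consumer<Object>> list = handlers.get(c);
            if (list == null) {
                continue;
            }
            for (Consumer<Object> handler : list) {
                handler.accept(event);
                delivered++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        TypedDispatchBus bus = new TypedDispatchBus();
        List<String> seen = new ArrayList<>();
        bus.register(Integer.class, i -> seen.add("int:" + i));
        bus.register(Number.class, n -> seen.add("num:" + n));
        System.out.println(bus.publish(42) + " handlers ran: " + seen);
    }
}
```

The lookup cost then depends on the depth of the event's class hierarchy rather than on the total number of registered observers, at the price of losing the per-observer supportsEvent hook.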

// 具体事件观察者实现
public class StockPriceAnalyzer implements EventObserver<StockPriceEvent> {
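    private final String analyzerId;
    // An async bus may call onEvent from several pool threads at once, so the list must be thread-safe
    private final List<StockPriceEvent> processedEvents;
    
    public StockPriceAnalyzer(String analyzerId) {
        this.analyzerId = analyzerId;
        this.processedEvents = Collections.synchronizedList(new ArrayList<>());
        System.out.println("📊 Created stock price analyzer: " + analyzerId);
    }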
    
    @Override
    public void onEvent(StockPriceEvent event) {
        processedEvents.add(event);
        StockData stockData = event.getStockData();
        
        System.out.println("📈 " + analyzerId + " 分析 " + stockData.getSymbol() + 
                         ": 价格 " + stockData.getPrice() + 
                         ", 变化 " + String.format("%.2f", event.getPriceChangePercent()) + "%");
        
        // 进行复杂的分析
        performAdvancedAnalysis(event);
    }
    
    @Override
    public String getObserverId() {
        return analyzerId;
    }
    
    @Override
    public Class<StockPriceEvent> getEventType() {
        return StockPriceEvent.class;
    }
    
    private void performAdvancedAnalysis(StockPriceEvent event) {
        // 模拟复杂分析
        try {
            Thread.sleep(20);
            if (Math.abs(event.getPriceChangePercent()) > 5.0) {
                System.out.println("⚠️ " + analyzerId + " 检测到大幅波动: " + 
                                 event.getStockData().getSymbol());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    public List<StockPriceEvent> getProcessedEvents() {
        return new ArrayList<>(processedEvents);
    }
}

public class SystemMonitor implements EventObserver<SystemAlertEvent> {
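    private final String monitorId;
    // An async bus may call onEvent from several pool threads at once, so the list must be thread-safe
    private final List<SystemAlertEvent> alerts;
    
    public SystemMonitor(String monitorId) {
        this.monitorId = monitorId;
        this.alerts = Collections.synchronizedList(new ArrayList<>());
        System.out.println("🖥️ Created system monitor: " + monitorId);
    }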
    
    @Override
    public void onEvent(SystemAlertEvent event) {
        alerts.add(event);
        
        System.out.println("🚨 " + monitorId + " 收到系统警报: " + 
                         event.getAlertLevel() + " - " + event.getMessage());
        
        // 根据警报级别采取不同措施
        handleAlert(event);
    }
    
    @Override
    public String getObserverId() {
        return monitorId;
    }
    
    @Override
    public Class<SystemAlertEvent> getEventType() {
        return SystemAlertEvent.class;
    }
    
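    private void handleAlert(SystemAlertEvent event) {
        switch (event.getAlertLevel()) {
            case "CRITICAL":
                System.out.println("🔴 Handling critical alert immediately!");
                // e.g. send an urgent notification
                break;
            case "WARNING":
                System.out.println("🟡 Handling warning-level alert");
                // e.g. write a log entry
                break;
            case "INFO":
                System.out.println("🔵 Recording info-level alert");
                break;
            default:
                // Report unknown levels instead of dropping them silently
                System.out.println("⚪ Unrecognized alert level: " + event.getAlertLevel());
                break;
        }
    }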
    
    public List<SystemAlertEvent> getAlerts() {
        return new ArrayList<>(alerts);
    }
}

// 事件总线演示
public class EventBusDemo {
    public static void main(String[] args) {
        System.out.println("=== 事件总线观察者模式演示 ===\n");
        
        // 1. 创建事件总线(异步模式)
        EventBus eventBus = new EventBus("主事件总线", true);
        
        // 2. 创建事件观察者
        StockPriceAnalyzer priceAnalyzer1 = new StockPriceAnalyzer("技术分析器");
        StockPriceAnalyzer priceAnalyzer2 = new StockPriceAnalyzer("基本面分析器");
        SystemMonitor systemMonitor = new SystemMonitor("系统监控中心");
        
        // 3. 注册观察者
        eventBus.registerObserver(priceAnalyzer1);
        eventBus.registerObserver(priceAnalyzer2);
        eventBus.registerObserver(systemMonitor);
        
        // 4. 发布事件
        System.out.println("\n4. 发布股价事件:");
        RealTimeStockMarket market = new RealTimeStockMarket("测试市场");
        
        StockData appleStock = new StockData("AAPL", 150.0, 2.5, 1.67, 10000000);
        StockPriceEvent priceEvent = new StockPriceEvent(market, appleStock, 147.5);
        eventBus.publishEvent(priceEvent);
        
        StockData teslaStock = new StockData("TSLA", 210.0, -5.0, -2.33, 8000000);
        StockPriceEvent priceEvent2 = new StockPriceEvent(market, teslaStock, 215.0);
        eventBus.publishEvent(priceEvent2);
        
        // 5. 发布系统警报事件
        System.out.println("\n5. 发布系统警报事件:");
        SystemAlertEvent criticalAlert = new SystemAlertEvent(
            eventBus, "CRITICAL", "数据库连接失败", 
            Map.of("retryCount", 3, "timeout", 5000)
        );
        eventBus.publishEvent(criticalAlert);
        
        SystemAlertEvent infoAlert = new SystemAlertEvent(
            eventBus, "INFO", "系统启动完成"
        );
        eventBus.publishEvent(infoAlert);
        
        // 6. 总线统计
        System.out.println("\n6. 事件总线统计:");
        eventBus.printBusStats();
        
        // 7. Release resources; shutdown() itself waits up to 5 seconds for pending async tasks
        eventBus.shutdown();
        
        // 8. Performance test
        System.out.println("\n8. Event bus performance test:");
        eventBusPerformanceTest();
    }
    
    private static void eventBusPerformanceTest() {
        EventBus performanceBus = new EventBus("性能测试总线", true);
        
        // 创建大量观察者
        List<EventObserver<StockPriceEvent>> testObservers = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            StockPriceAnalyzer analyzer = new StockPriceAnalyzer("性能测试分析器" + i);
            testObservers.add(analyzer);
            performanceBus.registerObserver(analyzer);
        }
        
        // 性能测试
        long startTime = System.currentTimeMillis();
        
        RealTimeStockMarket market = new RealTimeStockMarket("性能测试市场");
        Random random = new Random();
        
        for (int i = 0; i < 1000; i++) {
            String symbol = "STOCK" + (i % 10);
            StockData stockData = new StockData(symbol, 
                100 + random.nextDouble() * 100,
                (random.nextDouble() - 0.5) * 10,
                0, 1000000);
            
            StockPriceEvent event = new StockPriceEvent(market, stockData, 100.0);
            performanceBus.publishEvent(event);
        }
        
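        long endTime = System.currentTimeMillis();
        // In async mode publishEvent only queues tasks, so this measures submission time, not processing time
        System.out.println("Submitted 1000 events to 100 observers in " + (endTime - startTime) + "ms");
        
        // shutdown() waits at most 5 seconds; tasks still queued after that are cancelled
        performanceBus.shutdown();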
    }
}
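
Because an asynchronous bus returns from publishEvent before its observers have run, a timing taken right after the publish loop covers only task submission. One way to time the full round trip is to count completions with a CountDownLatch. The sketch below uses a bare ExecutorService rather than the EventBus class; AsyncTimingSketch, runAndAwait, and the 10-second timeout are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncTimingSketch {
    // Submits `tasks` trivial handlers and blocks until all of them have run (or 10 seconds pass).
    // Returns the number of handlers that actually executed.
    static int runAndAwait(int tasks, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(tasks);
        AtomicInteger handled = new AtomicInteger();

        long start = System.nanoTime();
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try {
                    handled.incrementAndGet(); // stand-in for observer work
                } finally {
                    done.countDown();          // always signal, even if the handler throws
                }
            });
        }
        long submitted = System.nanoTime();

        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long finished = System.nanoTime();
        pool.shutdown();

        System.out.printf("submit only: %d us, submit + handle: %d us%n",
                (submitted - start) / 1_000, (finished - start) / 1_000);
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled = " + runAndAwait(1_000, 4));
    }
}
```

The gap between the two printed durations is the part that a measurement taken immediately after the publish loop would miss.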

Third generation: the reactive observer pattern

// 响应式流接口
public interface ReactiveStream<T> {
    void subscribe(ReactiveObserver<T> observer);
    void unsubscribe(ReactiveObserver<T> observer);
    void onNext(T item);
    void onError(Throwable error);
    void onComplete();
}

// 响应式观察者接口
public interface ReactiveObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onComplete();
}

// 响应式主题实现
public class ReactiveSubject<T> implements ReactiveStream<T> {
    private final List<ReactiveObserver<T>> observers;
    private final String subjectName;
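    // volatile so that a terminal signal issued on one thread is visible to publishers on other threads
    private volatile boolean completed;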
    
    public ReactiveSubject(String subjectName) {
        this.observers = new CopyOnWriteArrayList<>();
        this.subjectName = subjectName;
        this.completed = false;
        System.out.println("⚡ 创建响应式主题: " + subjectName);
    }
    
    @Override
    public void subscribe(ReactiveObserver<T> observer) {
        if (!observers.contains(observer)) {
            observers.add(observer);
            System.out.println("✅ " + observer.getClass().getSimpleName() + " 订阅 " + subjectName);
        }
    }
    
    @Override
    public void unsubscribe(ReactiveObserver<T> observer) {
        if (observers.remove(observer)) {
            System.out.println("❌ " + observer.getClass().getSimpleName() + " 取消订阅 " + subjectName);
        }
    }
    
    @Override
    public void onNext(T item) {
        if (completed) {
            throw new IllegalStateException("流已结束");
        }
        
        System.out.println("📥 " + subjectName + " 发布数据: " + item);
        
        for (ReactiveObserver<T> observer : observers) {
            try {
                observer.onNext(item);
            } catch (Exception e) {
                System.err.println("❌ 观察者处理数据失败: " + e.getMessage());
            }
        }
    }
    
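    @Override
    public void onError(Throwable error) {
        if (completed) {
            return; // a terminated stream ignores further terminal signals
        }
        // An error is a terminal signal, so no onNext may follow it
        completed = true;
        System.out.println("💥 " + subjectName + " emitted error: " + error.getMessage());
        
        for (ReactiveObserver<T> observer : observers) {
            try {
                observer.onError(error);
            } catch (Exception e) {
                System.err.println("❌ Observer failed while handling the error: " + e.getMessage());
            }
        }
    }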
    
    @Override
    public void onComplete() {
        System.out.println("🏁 " + subjectName + " 完成数据流");
        completed = true;
        
        for (ReactiveObserver<T> observer : observers) {
            try {
                observer.onComplete();
            } catch (Exception e) {
                System.err.println("❌ 观察者处理完成通知失败: " + e.getMessage());
            }
        }
    }
    
    public int getSubscriberCount() {
        return observers.size();
    }
}

// 响应式操作符
public class ReactiveOperators {
    
    // 过滤操作符
    public static <T> ReactiveStream<T> filter(ReactiveStream<T> source, Predicate<T> predicate) {
        ReactiveSubject<T> result = new ReactiveSubject<>("filter");
        
        source.subscribe(new ReactiveObserver<T>() {
            @Override
            public void onNext(T item) {
                if (predicate.test(item)) {
                    result.onNext(item);
                }
            }
            
            @Override
            public void onError(Throwable error) {
                result.onError(error);
            }
            
            @Override
            public void onComplete() {
                result.onComplete();
            }
        });
        
        return result;
    }
    
    // 映射操作符
    public static <T, R> ReactiveStream<R> map(ReactiveStream<T> source, Function<T, R> mapper) {
        ReactiveSubject<R> result = new ReactiveSubject<>("map");
        
        source.subscribe(new ReactiveObserver<T>() {
            @Override
            public void onNext(T item) {
                result.onNext(mapper.apply(item));
            }
            
            @Override
            public void onError(Throwable error) {
                result.onError(error);
            }
            
            @Override
            public void onComplete() {
                result.onComplete();
            }
        });
        
        return result;
    }
    
    // 缓冲操作符
    public static <T> ReactiveStream<List<T>> buffer(ReactiveStream<T> source, int bufferSize) {
        ReactiveSubject<List<T>> result = new ReactiveSubject<>("buffer");
        List<T> buffer = new ArrayList<>();
        
        source.subscribe(new ReactiveObserver<T>() {
            @Override
            public void onNext(T item) {
                buffer.add(item);
                if (buffer.size() >= bufferSize) {
                    result.onNext(new ArrayList<>(buffer));
                    buffer.clear();
                }
            }
            
            @Override
            public void onError(Throwable error) {
                if (!buffer.isEmpty()) {
                    result.onNext(new ArrayList<>(buffer));
                }
                result.onError(error);
            }
            
            @Override
            public void onComplete() {
                if (!buffer.isEmpty()) {
                    result.onNext(new ArrayList<>(buffer));
                }
                result.onComplete();
            }
        });
        
        return result;
    }
}

// 具体响应式观察者
public class StockPriceProcessor implements ReactiveObserver<StockData> {
    private final String processorId;
    private final List<StockData> processedData;
    
    public StockPriceProcessor(String processorId) {
        this.processorId = processorId;
        this.processedData = new ArrayList<>();
        System.out.println("🔧 创建股价处理器: " + processorId);
    }
    
    @Override
    public void onNext(StockData stockData) {
        processedData.add(stockData);
        System.out.println("🔄 " + processorId + " 处理: " + stockData.getSymbol() + 
                         " @ $" + stockData.getPrice());
        
        // 模拟处理逻辑
        processStockData(stockData);
    }
    
    @Override
    public void onError(Throwable error) {
        System.out.println("❌ " + processorId + " 遇到错误: " + error.getMessage());
    }
    
    @Override
    public void onComplete() {
        System.out.println("✅ " + processorId + " 处理完成,共处理 " + 
                         processedData.size() + " 条数据");
        generateSummary();
    }
    
    private void processStockData(StockData stockData) {
        // 模拟复杂处理
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    private void generateSummary() {
        if (processedData.isEmpty()) return;
        
        Map<String, List<StockData>> bySymbol = processedData.stream()
                .collect(Collectors.groupingBy(StockData::getSymbol));
        
        System.out.println("\n📈 " + processorId + " 处理摘要:");
        bySymbol.forEach((symbol, data) -> {
            double avgPrice = data.stream()
                    .mapToDouble(StockData::getPrice)
                    .average()
                    .orElse(0.0);
            System.out.printf("  %s: 平均价格 $%.2f (%d 条记录)%n", 
                            symbol, avgPrice, data.size());
        });
    }
    
    public List<StockData> getProcessedData() {
        return new ArrayList<>(processedData);
    }
}

// 响应式系统演示
public class ReactiveObserverDemo {
    public static void main(String[] args) {
        System.out.println("=== 响应式观察者模式演示 ===\n");
        
        // 1. 创建响应式主题
        ReactiveSubject<StockData> stockStream = new ReactiveSubject<>("股价数据流");
        
        // 2. 创建处理器
        StockPriceProcessor processor1 = new StockPriceProcessor("实时处理器");
        StockPriceProcessor processor2 = new StockPriceProcessor("批处理处理器");
        
        // 3. 应用响应式操作符
        System.out.println("\n3. 应用响应式操作符:");
        
        // 过滤高价股票
        ReactiveStream<StockData> highPriceStream = ReactiveOperators.filter(
            stockStream, data -> data.getPrice() > 150.0
        );
        
        // 映射到价格变化
        ReactiveStream<Double> priceChangeStream = ReactiveOperators.map(
            stockStream, StockData::getChangePercent
        );
        
        // 缓冲处理
        ReactiveStream<List<StockData>> bufferedStream = ReactiveOperators.buffer(
            stockStream, 5
        );
        
        // 4. 订阅处理流
        stockStream.subscribe(processor1);
        highPriceStream.subscribe(new StockPriceProcessor("高价股票处理器"));
        bufferedStream.subscribe(new ReactiveObserver<List<StockData>>() {
            @Override
            public void onNext(List<StockData> batch) {
                System.out.println("📦 批处理: " + batch.size() + " 条记录");
                double avgPrice = batch.stream()
                        .mapToDouble(StockData::getPrice)
                        .average()
                        .orElse(0.0);
                System.out.println("   平均价格: $" + String.format("%.2f", avgPrice));
            }
            
            @Override
            public void onError(Throwable error) {
                System.out.println("❌ 批处理错误: " + error.getMessage());
            }
            
            @Override
            public void onComplete() {
                System.out.println("✅ 批处理完成");
            }
        });
        
        // 5. 发布数据
        System.out.println("\n5. 发布股价数据:");
        Random random = new Random();
        String[] symbols = {"AAPL", "GOOGL", "MSFT", "TSLA", "AMZN"};
        
        for (int i = 0; i < 20; i++) {
            String symbol = symbols[i % symbols.length];
            double price = 100 + random.nextDouble() * 100;
            double change = (random.nextDouble() - 0.5) * 10;
            double changePercent = (change / price) * 100;
            long volume = 1000000 + random.nextInt(9000000);
            
            StockData stockData = new StockData(symbol, price, change, changePercent, volume);
            stockStream.onNext(stockData);
            
            // 模拟数据间隔
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        
        // 6. 完成数据流
        stockStream.onComplete();
        
        // 7. 复杂流处理演示
        System.out.println("\n7. 复杂流处理演示:");
        complexStreamProcessing();
    }
    
    private static void complexStreamProcessing() {
        ReactiveSubject<Integer> numberStream = new ReactiveSubject<>("数字流");
        
        // 创建复杂处理管道
        ReactiveStream<String> resultStream = ReactiveOperators.map(
            ReactiveOperators.filter(
                ReactiveOperators.buffer(numberStream, 3),
                list -> list.stream().mapToInt(Integer::intValue).sum() > 10
            ),
            list -> "批次: " + list + ", 总和: " + list.stream().mapToInt(Integer::intValue).sum()
        );
        
        resultStream.subscribe(new ReactiveObserver<String>() {
            @Override
            public void onNext(String item) {
                System.out.println("📊 " + item);
            }
            
            @Override
            public void onError(Throwable error) {
                System.out.println("❌ 处理错误: " + error.getMessage());
            }
            
            @Override
            public void onComplete() {
                System.out.println("✅ 复杂流处理完成");
            }
        });
        
        // 发布数字
        for (int i = 1; i <= 10; i++) {
            numberStream.onNext(i);
        }
        
        numberStream.onComplete();
    }
}
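
The hand-written ReactiveStream and ReactiveObserver interfaces above mirror the shape of the JDK's own java.util.concurrent.Flow API (Java 9 and later), which adds demand signalling through Flow.Subscription.request. As a rough comparison, the sketch below pushes a few integers through the JDK's SubmissionPublisher; FlowSketch, CollectingSubscriber, and publishAll are names invented for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowSketch {
    // Collects every item and releases a latch when the stream terminates.
    static class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> items = new CopyOnWriteArrayList<>();
        final CountDownLatch terminated = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item
        }

        @Override
        public void onNext(T item) {
            items.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable error) {
            terminated.countDown();
        }

        @Override
        public void onComplete() {
            terminated.countDown();
        }
    }

    // Publishes all values, closes the publisher, and waits for the subscriber to see onComplete.
    static List<Integer> publishAll(List<Integer> values) {
        CollectingSubscriber<Integer> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            values.forEach(publisher::submit);
        } // close() issues onComplete after the submitted items
        try {
            subscriber.terminated.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.items);
    }

    public static void main(String[] args) {
        System.out.println(publishAll(List.of(1, 2, 3)));
    }
}
```

Unlike ReactiveSubject, SubmissionPublisher delivers items asynchronously, which is why the sketch waits on a latch before reading the collected items.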

⚡ Performance optimization and caching strategy

High-performance observer pattern

// 高性能观察者注册表
public class HighPerformanceObserverRegistry<T> {
    private final Map<Class<?>, CopyOnWriteArrayList<EventObserver<?>>> observerMap;
    private final ExecutorService executor;
    private final MetricsCollector metrics;
    
    public HighPerformanceObserverRegistry() {
        this.observerMap = new ConcurrentHashMap<>();
        this.executor = Executors.newWorkStealingPool();
        this.metrics = new MetricsCollector();
        System.out.println("⚡ 创建高性能观察者注册表");
    }
    
    public <E extends T> void register(Class<E> eventType, EventObserver<E> observer) {
        observerMap.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
                  .add(observer);
        metrics.recordRegistration(eventType, observer.getObserverId());
    }
    
    public <E extends T> void unregister(Class<E> eventType, EventObserver<E> observer) {
        List<EventObserver<?>> observers = observerMap.get(eventType);
        // Record the unregistration only if the observer was actually registered
        if (observers != null && observers.remove(observer)) {
            metrics.recordUnregistration(eventType, observer.getObserverId());
        }
    }
    
    @SuppressWarnings("unchecked")
    public <E extends T> void publish(E event) {
        Class<?> eventType = event.getClass();
        List<EventObserver<?>> observers = observerMap.get(eventType);
        
        if (observers == null || observers.isEmpty()) {
            return;
        }
        
        long startTime = System.nanoTime();
        int observerCount = observers.size();
        
        // 并行处理事件
        List<CompletableFuture<Void>> futures = observers.stream()
                .map(observer -> CompletableFuture.runAsync(() -> {
                    try {
                        EventObserver<E> typedObserver = (EventObserver<E>) observer;
                        typedObserver.onEvent(event);
                        metrics.recordSuccess(eventType, observer.getObserverId());
                    } catch (Exception e) {
                        metrics.recordFailure(eventType, observer.getObserverId(), e);
                    }
                }, executor))
                .collect(Collectors.toList());
        
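        // Record publish metrics once every observer has finished; this registers a callback and does not block the caller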
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenRun(() -> {
                    long duration = System.nanoTime() - startTime;
                    metrics.recordPublish(eventType, observerCount, duration);
                });
    }
    
    public void printMetrics() {
        metrics.printReport();
    }
    
    public void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
    
    // 性能指标收集器
    private static class MetricsCollector {
        private final Map<Class<?>, EventMetrics> eventMetrics;
        private final Map<String, ObserverMetrics> observerMetrics;
        
        public MetricsCollector() {
            this.eventMetrics = new ConcurrentHashMap<>();
            this.observerMetrics = new ConcurrentHashMap<>();
        }
        
        public void recordRegistration(Class<?> eventType, String observerId) {
            eventMetrics.computeIfAbsent(eventType, k -> new EventMetrics());
            observerMetrics.computeIfAbsent(observerId, k -> new ObserverMetrics());
        }
        
        public void recordUnregistration(Class<?> eventType, String observerId) {
            // 记录注销统计
        }
        
        public void recordPublish(Class<?> eventType, int observerCount, long duration) {
            EventMetrics metrics = eventMetrics.get(eventType);
            if (metrics != null) {
                metrics.recordPublish(observerCount, duration);
            }
        }
        
        public void recordSuccess(Class<?> eventType, String observerId) {
            ObserverMetrics metrics = observerMetrics.get(observerId);
            if (metrics != null) {
                metrics.recordSuccess();
            }
        }
        
        public void recordFailure(Class<?> eventType, String observerId, Exception error) {
            ObserverMetrics metrics = observerMetrics.get(observerId);
            if (metrics != null) {
                metrics.recordFailure(error);
            }
        }
        
        public void printReport() {
            System.out.println("\n=== 高性能观察者系统指标 ===");
            
            System.out.println("\n事件类型统计:");
            eventMetrics.forEach((eventType, metrics) -> {
                System.out.printf("  %s: 发布%d次, 平均观察者%.1f, 平均耗时%.2fms%n",
                               eventType.getSimpleName(),
                               metrics.publishCount,
                               metrics.getAverageObservers(),
                               metrics.getAverageDurationMs());
            });
            
            System.out.println("\n观察者统计:");
            observerMetrics.forEach((observerId, metrics) -> {
                System.out.printf("  %s: 成功%d次, 失败%d次, 成功率%.1f%%%n",
                               observerId,
                               metrics.successCount,
                               metrics.failureCount,
                               metrics.getSuccessRate());
            });
        }
    }
    
    // Updated from several pool threads at once: mutators are synchronized,
    // and fields are volatile so the direct reads in printReport see current values
    private static class EventMetrics {
        private volatile int publishCount;
        private volatile long totalObservers;
        private volatile long totalDuration;
        
        public synchronized void recordPublish(int observerCount, long duration) {
            publishCount++;
            totalObservers += observerCount;
            totalDuration += duration;
        }
        
        public synchronized double getAverageObservers() {
            return publishCount > 0 ? (double) totalObservers / publishCount : 0;
        }
        
        public synchronized double getAverageDurationMs() {
            return publishCount > 0 ? (totalDuration / 1_000_000.0) / publishCount : 0;
        }
    }
    
    // Same concurrency rules as EventMetrics: one observer may be invoked on several threads in parallel
    private static class ObserverMetrics {
        private volatile int successCount;
        private volatile int failureCount;
        
        public synchronized void recordSuccess() {
            successCount++;
        }
        
        public synchronized void recordFailure(Exception error) {
            failureCount++;
        }
        
        public synchronized double getSuccessRate() {
            int total = successCount + failureCount;
            return total > 0 ? (successCount * 100.0) / total : 100.0;
        }
    }
}
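
The metrics classes above are updated from many pool threads, so their counters need to tolerate concurrent increments. For counters that are written far more often than they are read, the JDK's LongAdder is one common option. The sketch below shows the idea in isolation; ObserverCounters and hammer are hypothetical names and are not part of the registry above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

class ObserverCounters {
    private final LongAdder successes = new LongAdder();
    private final LongAdder failures = new LongAdder();

    void recordSuccess() { successes.increment(); }
    void recordFailure() { failures.increment(); }
    long successCount() { return successes.sum(); }
    long failureCount() { return failures.sum(); }

    // Percentage of successful calls; 100.0 when nothing has been recorded yet.
    double successRate() {
        long ok = successes.sum();
        long total = ok + failures.sum();
        return total > 0 ? ok * 100.0 / total : 100.0;
    }

    // Increments one counter from several threads and returns the final success count.
    static long hammer(int threads, int incrementsPerThread) {
        ObserverCounters counters = new ObserverCounters();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counters.recordSuccess();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counters.successCount();
    }

    public static void main(String[] args) {
        System.out.println("successes = " + hammer(8, 10_000));
    }
}
```

With plain int fields and an unsynchronized ++, the same experiment can lose updates, because increment is a read-modify-write sequence rather than a single atomic step.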

// 高性能系统演示
public class HighPerformanceObserverDemo {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== 高性能观察者模式演示 ===\n");
        
        HighPerformanceObserverRegistry<Event> registry = new HighPerformanceObserverRegistry<>();
        
        // 创建大量观察者
        List<EventObserver<StockPriceEvent>> stockObservers = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            StockPriceAnalyzer analyzer = new StockPriceAnalyzer("高性能分析器" + i);
            stockObservers.add(analyzer);
            registry.register(StockPriceEvent.class, analyzer);
        }
        
        List<EventObserver<SystemAlertEvent>> alertObservers = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            SystemMonitor monitor = new SystemMonitor("高性能监控器" + i);
            alertObservers.add(monitor);
            registry.register(SystemAlertEvent.class, monitor);
        }
        
        // 发布大量事件
        System.out.println("发布事件...");
        RealTimeStockMarket market = new RealTimeStockMarket("高性能市场");
        Random random = new Random();
        
        long startTime = System.currentTimeMillis();
        
        for (int i = 0; i < 1000; i++) {
            String symbol = "STOCK" + (i % 20);
            StockData stockData = new StockData(symbol, 
                100 + random.nextDouble() * 100,
                (random.nextDouble() - 0.5) * 10,
                0, 1000000);
            
            StockPriceEvent event = new StockPriceEvent(market, stockData, 100.0);
            registry.publish(event);
            
            // 偶尔发布系统警报
            if (i % 100 == 0) {
                SystemAlertEvent alert = new SystemAlertEvent(
                    registry, "INFO", "系统运行正常 - " + i
                );
                registry.publish(alert);
            }
        }
        
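        // Crude wait: publish() returns before observers finish, so this fixed delay only gives them a head start
        Thread.sleep(2000);
        long endTime = System.currentTimeMillis();
        
        System.out.println("\nElapsed time (including the 2000ms wait): " + (endTime - startTime) + "ms");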
        
        // 打印性能指标
        registry.printMetrics();
        
        // 清理资源
        registry.shutdown();
    }
}
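
The demo above relies on a fixed Thread.sleep to give observers time to finish. When the caller needs to know that a fan-out has really completed, one option is to keep the CompletableFuture of each notification and join on all of them. The following sketch illustrates that with plain Consumer observers; FanOutSketch and notifyAllAndWait are assumptions made for this example and do not change the registry's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class FanOutSketch {
    // Notifies every observer in parallel, blocks until all have finished,
    // and returns how many of them threw an exception.
    static int notifyAllAndWait(List<Consumer<String>> observers, String event) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger failures = new AtomicInteger();
        try {
            CompletableFuture<?>[] futures = observers.stream()
                    .map(observer -> CompletableFuture
                            .runAsync(() -> observer.accept(event), pool)
                            // Count the failure and recover, so one bad observer does not fail the join
                            .exceptionally(ex -> {
                                failures.incrementAndGet();
                                return null;
                            }))
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(futures).join();
        } finally {
            pool.shutdown();
        }
        return failures.get();
    }

    public static void main(String[] args) {
        List<String> received = new CopyOnWriteArrayList<>();
        List<Consumer<String>> observers = new ArrayList<>();
        observers.add(e -> received.add("first:" + e));
        observers.add(e -> { throw new IllegalStateException("boom"); });
        observers.add(e -> received.add("second:" + e));

        int failed = notifyAllAndWait(observers, "tick");
        System.out.println("failed = " + failed + ", received = " + received.size());
    }
}
```

Blocking with join trades throughput for certainty, so it fits tests and shutdown paths better than a hot publishing loop.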

🔧 Testing strategy and verification

Observer pattern test cases

class ObserverPatternTest {
    
    @Test
    void testNewsPublisherBasicOperations() {
        // 给定
        NewsPublisher publisher = new NewsPublisher("测试发布器");
        EmailSubscriber subscriber = new EmailSubscriber("测试用户", "test@email.com");
        
        // 当
        publisher.registerObserver(subscriber);
        publisher.setNews("测试新闻");
        
        // 然后
        assertEquals(1, publisher.getSubscriberCount());
        assertEquals(1, subscriber.getReceivedCount());
        assertTrue(subscriber.getReceivedNews().contains("测试新闻"));
    }
    
    @Test
    void testMultipleObservers() {
        // 给定
        NewsPublisher publisher = new NewsPublisher("测试发布器");
        EmailSubscriber emailSub = new EmailSubscriber("邮件用户", "email@test.com");
        PushSubscriber pushSub = new PushSubscriber("推送用户", "device123");
        
        // 当
        publisher.registerObserver(emailSub);
        publisher.registerObserver(pushSub);
        publisher.setNews("多用户新闻");
        
        // 然后
        assertEquals(2, publisher.getSubscriberCount());
        assertEquals(1, emailSub.getReceivedCount());
        assertEquals(1, pushSub.getReceivedNotifications().size());
    }
    
    @Test
    void testObserverRemoval() {
        // 给定
        NewsPublisher publisher = new NewsPublisher("测试发布器");
        EmailSubscriber subscriber = new EmailSubscriber("测试用户", "test@email.com");
        
        publisher.registerObserver(subscriber);
        publisher.setNews("第一条新闻");
        
        // 当
        publisher.removeObserver(subscriber);
        publisher.setNews("第二条新闻");
        
        // 然后
        assertEquals(0, publisher.getSubscriberCount());
        assertEquals(1, subscriber.getReceivedCount()); // 应该只收到第一条
    }
    
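    @Test
    void testEventBusRegistration() {
        // Given: a synchronous bus, so delivery has finished when publishEvent returns
        EventBus eventBus = new EventBus("test bus", false);
        StockPriceAnalyzer analyzer = new StockPriceAnalyzer("test analyzer");
        RealTimeStockMarket market = new RealTimeStockMarket("test market");
        
        // When
        eventBus.registerObserver(analyzer);
        eventBus.publishEvent(new StockPriceEvent(market,
                new StockData("TEST", 100.0, 1.0, 1.0, 1000),
                99.0));
        
        // Then: the registered analyzer received exactly one event
        assertEquals(1, analyzer.getProcessedEvents().size());
    }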
    
    @Test
    void testReactiveStreamOperations() {
        // 给定
        ReactiveSubject<Integer> numberStream = new ReactiveSubject<>("测试流");
        List<Integer> receivedNumbers = new ArrayList<>();
        
        numberStream.subscribe(new ReactiveObserver<Integer>() {
            @Override
            public void onNext(Integer item) {
                receivedNumbers.add(item);
            }
            
            @Override
            public void onError(Throwable error) {}
            
            @Override
            public void onComplete() {}
        });
        
        // 当
        numberStream.onNext(1);
        numberStream.onNext(2);
        numberStream.onNext(3);
        numberStream.onComplete();
        
        // Then: all three items arrived, in publication order
        assertEquals(List.of(1, 2, 3), receivedNumbers);
    }
    
    @Test
    void testHighPerformanceRegistry() throws InterruptedException {
        // 给定
        HighPerformanceObserverRegistry<Event> registry = new HighPerformanceObserverRegistry<>();
        List<String> processedEvents = new ArrayList<>();
        
        EventObserver<StockPriceEvent> observer = new EventObserver<StockPriceEvent>() {
            @Override
            public void onEvent(StockPriceEvent event) {
                processedEvents.add(event.getStockData().getSymbol());
            }
            
            @Override
            public String getObserverId() {
                return "测试观察者";
            }
            
            @Override
            public Class<StockPriceEvent> getEventType() {
                return StockPriceEvent.class;
            }
        };
        
        registry.register(StockPriceEvent.class, observer);
        
        // When
        RealTimeStockMarket market = new RealTimeStockMarket("测试市场");
        for (int i = 0; i < 10; i++) {
            StockData stockData = new StockData("TEST" + i, 100.0, 1.0, 1.0, 1000);
            StockPriceEvent event = new StockPriceEvent(market, stockData, 99.0);
            registry.publish(event);
        }
        
        // Wait for asynchronous processing
        Thread.sleep(100);
        
        // Then
        assertEquals(10, processedEvents.size());
        
        registry.shutdown();
    }
    
    @Test
    void testStockTradingBotLogic() {
        // Given
        TradingBot bot = new TradingBot("测试机器人", Set.of("AAPL"), -2.0, 3.0);
        StockData stockData1 = new StockData("AAPL", 100.0, 0.0, 0.0, 1000);
        StockData stockData2 = new StockData("AAPL", 98.0, -2.0, -2.0, 1000); // down 2%
        
        // When
        bot.update(stockData1); // first data point
        bot.update(stockData2); // second data point, triggers the buy condition
        
        // Then
        // Verify the trading logic (mainly checked via console output)
        Map<String, List<StockData>> history = bot.getHistoricalData();
        assertEquals(2, history.get("AAPL").size());
    }
}

// Mock observer used in the tests
class MockObserver implements Observer {
    private String name;
    private List<String> receivedMessages;
    private boolean shouldThrowError;
    
    public MockObserver(String name) {
        this(name, false);
    }
    
    public MockObserver(String name, boolean shouldThrowError) {
        this.name = name;
        this.receivedMessages = new ArrayList<>();
        this.shouldThrowError = shouldThrowError;
    }
    
    @Override
    public void update(String message) {
        if (shouldThrowError) {
            throw new RuntimeException("模拟观察者错误");
        }
        receivedMessages.add(message);
    }
    
    @Override
    public String getObserverName() {
        return name;
    }
    
    public List<String> getReceivedMessages() {
        return receivedMessages;
    }
    
    public int getMessageCount() {
        return receivedMessages.size();
    }
}

// Performance test
class ObserverPerformanceTest {
    
    @Test
    void testLargeScaleObserverPerformance() {
        // Given
        NewsPublisher publisher = new NewsPublisher("性能测试发布器");
        List<MockObserver> observers = new ArrayList<>();
        
        for (int i = 0; i < 1000; i++) {
            observers.add(new MockObserver("观察者" + i));
        }
        
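        // When
        long startTime = System.nanoTime();
        
        // Register all observers
        for (MockObserver observer : observers) {
            publisher.registerObserver(observer);
        }
        
        // Publish a message
        publisher.setNews("性能测试消息");
        
        long endTime = System.nanoTime();
        long duration = endTime - startTime;
        
        // Then
        // The measured interval covers registration and console output as well as notification
        System.out.println("Registering and notifying 1000 observers took: " + duration + "ns");
        assertTrue(duration > 0);
        
        // Verify that every observer received the message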
        for (MockObserver observer : observers) {
            assertEquals(1, observer.getMessageCount());
        }
    }
}
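
The registry test above waits for asynchronous delivery with a fixed Thread.sleep(100), which can be too short on a slow machine and needlessly long on a fast one. A common alternative is to count deliveries down on a CountDownLatch and wait with a timeout. The sketch below is self-contained and uses a plain ExecutorService in place of HighPerformanceObserverRegistry, whose internals are not shown in this section; LatchAwaitDemo and publishAndAwait are hypothetical names introduced only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an asynchronous observer registry.
class LatchAwaitDemo {

    // Handles `count` events on a worker pool and returns once all are processed.
    static List<String> publishAndAwait(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> processed = new CopyOnWriteArrayList<>(); // written by worker threads
        CountDownLatch done = new CountDownLatch(count);
        try {
            for (int i = 0; i < count; i++) {
                String symbol = "TEST" + i;
                pool.submit(() -> {
                    processed.add(symbol); // the "observer" work
                    done.countDown();      // signal one completed delivery
                });
            }
            // Returns as soon as the last delivery finishes; the timeout only
            // bounds the wait when something has genuinely gone wrong.
            if (!done.await(2, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for observers");
            }
            return processed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(publishAndAwait(10).size()); // prints 10
    }
}
```

In a JUnit test the same idea would mean counting down inside onEvent and asserting on the result of await before checking the collected events.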

🌐 The Observer Pattern in Modern Architectures

Observer Applications in Spring Boot

// Spring event class
public class SpringStockEvent extends ApplicationEvent {
    private final StockData stockData;
    
    public SpringStockEvent(Object source, StockData stockData) {
        super(source);
        this.stockData = stockData;
    }
    
    public StockData getStockData() {
        return stockData;
    }
}

// Spring event listener
@Component
public class SpringStockEventListener {
    private static final Logger logger = LoggerFactory.getLogger(SpringStockEventListener.class);
    
    @EventListener
    @Async
    public void handleStockPriceUpdate(SpringStockEvent event) {
        StockData stockData = event.getStockData();
        logger.info("📈 Spring处理股价更新: {} - ${}", 
                   stockData.getSymbol(), stockData.getPrice());
        
        // Simulated processing logic
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    @EventListener
    @Order(1)
    public void handleStockPriceUpdateSync(SpringStockEvent event) {
        logger.info("🔄 同步处理股价更新: {}", event.getStockData().getSymbol());
    }
}

// Spring configuration
@Configuration
@EnableAsync
@EnableScheduling
public class SpringObserverConfiguration {
    
    @Bean
    public RealTimeStockMarket stockMarket() {
        return new RealTimeStockMarket("Spring股票市场");
    }
    
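    // Note: giving the multicaster a task executor dispatches every listener
    // asynchronously, including handleStockPriceUpdateSync, and @Order then only
    // fixes submission order, not completion order. Omit this bean to keep plain
    // @EventListener methods synchronous and let @Async cover the asynchronous one.
    @Bean
    public ApplicationEventMulticaster applicationEventMulticaster() {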
        SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
        eventMulticaster.setTaskExecutor(taskExecutor());
        return eventMulticaster;
    }
    
    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("SpringEvent-");
        executor.initialize();
        return executor;
    }
}

// Spring service
@Service
public class SpringStockService {
    private final ApplicationEventPublisher eventPublisher;
    private final RealTimeStockMarket stockMarket;
    
    public SpringStockService(ApplicationEventPublisher eventPublisher, 
                             RealTimeStockMarket stockMarket) {
        this.eventPublisher = eventPublisher;
        this.stockMarket = stockMarket;
        setupMarketObservers();
    }
    
    private void setupMarketObservers() {
        // Register the Spring event publisher as an observer of the stock market
        stockMarket.registerObserver(new StockObserver() {
            @Override
            public void update(StockData stockData) {
                SpringStockEvent event = new SpringStockEvent(this, stockData);
                eventPublisher.publishEvent(event);
            }
            
            @Override
            public String getObserverId() {
                return "Spring事件发布器";
            }
            
            @Override
            public Set<String> getInterestedSymbols() {
                return Set.of("AAPL", "GOOGL", "MSFT"); // only these symbols are of interest
            }
        });
    }
    
    @Scheduled(fixedRate = 5000)
    public void simulateMarketData() {
        stockMarket.simulateMarketData();
    }
    
    public void addStockData(StockData stockData) {
        stockMarket.addStockData(stockData);
    }
}

// REST controller
@RestController
@RequestMapping("/api/stocks")
public class SpringStockController {
    private final SpringStockService stockService;
    
    public SpringStockController(SpringStockService stockService) {
        this.stockService = stockService;
    }
    
    @PostMapping("/publish")
    public ResponseEntity<String> publishStockData(@RequestBody StockDataRequest request) {
        try {
            StockData stockData = new StockData(
                request.getSymbol(),
                request.getPrice(),
                request.getChange(),
                request.getChangePercent(),
                request.getVolume()
            );
            
            stockService.addStockData(stockData);
            return ResponseEntity.ok("股票数据发布成功");
        } catch (Exception e) {
            return ResponseEntity.badRequest().body("发布失败: " + e.getMessage());
        }
    }
    
    @GetMapping("/symbols")
    public ResponseEntity<List<String>> getSupportedSymbols() {
        // Return the supported stock symbols
        return ResponseEntity.ok(List.of("AAPL", "GOOGL", "MSFT", "TSLA", "AMZN"));
    }
}

// Request DTO
public class StockDataRequest {
    private String symbol;
    private double price;
    private double change;
    private double changePercent;
    private long volume;
    
    // Getters and Setters
    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }
    public double getChange() { return change; }
    public void setChange(double change) { this.change = change; }
    public double getChangePercent() { return changePercent; }
    public void setChangePercent(double changePercent) { this.changePercent = changePercent; }
    public long getVolume() { return volume; }
    public void setVolume(long volume) { this.volume = volume; }
}

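// Microservice event publishing
@Service
public class MicroserviceEventService {
    private static final Logger logger = LoggerFactory.getLogger(MicroserviceEventService.class);
    private final ApplicationEventPublisher eventPublisher;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    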
    public MicroserviceEventService(ApplicationEventPublisher eventPublisher,
                                   KafkaTemplate<String, Object> kafkaTemplate) {
        this.eventPublisher = eventPublisher;
        this.kafkaTemplate = kafkaTemplate;
    }
    
    public void publishLocalEvent(StockData stockData) {
        SpringStockEvent event = new SpringStockEvent(this, stockData);
        eventPublisher.publishEvent(event);
    }
    
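    @Async
    public void publishDistributedEvent(StockData stockData) {
        // Publish to Kafka for other microservices to consume.
        // addCallback applies to Spring for Apache Kafka 2.x, where send() returns a
        // ListenableFuture; from 3.0 onward send() returns a CompletableFuture, so
        // whenComplete((result, ex) -> ...) is the equivalent there.
        kafkaTemplate.send("stock-price-updates", stockData.getSymbol(), stockData)
                .addCallback(
                    result -> logger.info("✅ 分布式事件发布成功: {}", stockData.getSymbol()),
                    failure -> logger.error("❌ 分布式事件发布失败: {}", failure.getMessage())
                );
    }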
}

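// Kafka consumer (in another microservice)
@Component
public class StockPriceKafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(StockPriceKafkaConsumer.class);
    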
    @KafkaListener(topics = "stock-price-updates", groupId = "trading-service")
    public void consumeStockPriceUpdate(StockData stockData) {
        logger.info("📥 交易服务收到股价更新: {}", stockData);
        
        // Handle the price update, trigger trading logic, and so on
        processStockUpdateForTrading(stockData);
    }
    
    private void processStockUpdateForTrading(StockData stockData) {
        // Trading logic
        logger.info("🔧 处理交易逻辑 for {}", stockData.getSymbol());
    }
}

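📊 Pattern Comparison and Selection Guide

Observer Pattern vs. Related Patterns

Dimension | Observer pattern | Publish-subscribe pattern | Mediator pattern
Purpose | One-to-many dependency notification | Decouple publishers from subscribers | Centralize control of object interactions
Coupling | Loose | Fully decoupled | Coupled through the mediator
Communication | Direct notification | Through a message broker | Through the mediator
When to use | A state change in one object must be announced to other objects | Components need to communicate with no knowledge of each other | Complex object interactions need central management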
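
To make the "Communication" row concrete, here is a minimal sketch of the publish-subscribe side, assuming a tiny in-memory broker keyed by topic name. TopicBroker and its methods are hypothetical and exist only for this illustration; a real system would typically use something like the Kafka setup shown earlier. The point is that the publisher calls the broker and never holds a reference to any subscriber, whereas NewsPublisher keeps its observers in its own list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory broker: publishers and subscribers share only a topic name.
class TopicBroker {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, k -> new ArrayList<>()).add(handler);
    }

    // Delivers the message to every handler on the topic and returns how many received it.
    int publish(String topic, String message) {
        List<Consumer<String>> handlers = subscribers.getOrDefault(topic, List.of());
        handlers.forEach(h -> h.accept(message));
        return handlers.size();
    }
}

class TopicBrokerDemo {
    public static void main(String[] args) {
        TopicBroker broker = new TopicBroker();
        List<String> tradingInbox = new ArrayList<>();
        broker.subscribe("stock-price-updates", tradingInbox::add);

        int delivered = broker.publish("stock-price-updates", "AAPL 100.0");
        int ignored = broker.publish("news", "no subscribers yet");
        System.out.println(delivered + " " + ignored + " " + tradingInbox);
        // prints: 1 0 [AAPL 100.0]
    }
}
```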

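Comparison of Observer Pattern Variants

Type | Advantages | Disadvantages | Suitable scenarios
Classic observer | Simple and intuitive, easy to understand | The most coupled of the four: observers know the subject | Simple event notification
Event bus | Fully decoupled, very flexible | More complex, requires event classes | Complex systems with many communicating components
Reactive streams | Supports stream operators, composes well | Steep learning curve | Data stream processing, real-time systems
Spring events | Integrates well with the Spring ecosystem | Depends on the Spring framework | Events inside a Spring application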

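When Should You Use the Observer Pattern?

  1. State change notification: a change in one object's state must be announced to other objects, and the number of interested objects is not known in advance
  2. Event-driven architecture: the system is to be built around events
  3. Decoupling: the coupling between objects needs to be reduced
  4. Broadcast communication: a one-to-many communication mechanism is needed
  5. Real-time data processing: real-time data streams and event streams must be handled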

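Architecture Decision Flowchart

[Flowchart. Start: notification between objects is needed. Decision points: "Is full decoupling required?", "Is stream processing required?", "Is the Spring framework in use?". Outcomes: classic observer pattern, event bus pattern, reactive streams pattern, Spring event mechanism. End: implementation complete.]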

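🎯 Best Practices and Pitfalls

✅ Best Practices

  1. Make the lifecycle explicit: define clearly when observers are registered and when they are unregistered
  2. Avoid memory leaks: remove observers as soon as they are no longer needed
  3. Consider thread safety: in a multithreaded environment, make sure the observer list is thread-safe
  4. Use a suitable data structure: choose the collection type according to the number of observers
  5. Handle errors: an error in one observer's event handling should not affect the other observers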
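
Practices 1, 3, and 5 can be combined in a few lines. The following is a minimal sketch, not the article's NewsPublisher: SafeSubject and Subscription are hypothetical names, and using CopyOnWriteArrayList is an assumption that suits observer lists that are read far more often than they change.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Handle returned by subscribe(); closing it unregisters the observer.
interface Subscription extends AutoCloseable {
    @Override
    void close(); // narrowed so callers need not handle checked exceptions
}

// Hypothetical subject illustrating an explicit lifecycle, thread safety, and error isolation.
class SafeSubject<T> {
    // Iteration works on a snapshot, so observers can be added or removed
    // from any thread while a notification is in progress.
    private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

    Subscription subscribe(Consumer<T> observer) {
        observers.add(observer);
        return () -> observers.remove(observer);
    }

    // Notifies every observer; a failing observer is counted, not propagated.
    int publish(T value) {
        int failures = 0;
        for (Consumer<T> observer : observers) {
            try {
                observer.accept(value);
            } catch (RuntimeException e) {
                failures++; // a real system would also log the exception
            }
        }
        return failures;
    }
}

class SafeSubjectDemo {
    public static void main(String[] args) {
        SafeSubject<String> subject = new SafeSubject<>();
        List<String> received = new CopyOnWriteArrayList<>();

        subject.subscribe(msg -> { throw new IllegalStateException("boom"); });
        Subscription sub = subject.subscribe(received::add);

        int failures = subject.publish("AAPL 100.0");
        sub.close();                      // explicit unregistration
        subject.publish("AAPL 101.0");    // no longer delivered to `received`

        System.out.println(failures + " failure(s), received " + received);
        // prints: 1 failure(s), received [AAPL 100.0]
    }
}
```

Returning a handle from subscribe keeps the unregistration point next to the registration point, which makes a forgotten removal easier to spot in review.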

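❌ Common Pitfalls

  1. Memory leaks: an observer that is never unregistered stays reachable from the subject and cannot be garbage collected
  2. Performance problems: too many observers slow down notification
  3. Circular dependencies: observers and the subject end up depending on each other in a cycle
  4. Event ordering: observers that rely on being notified in a particular order cause subtle bugs
  5. Concurrent modification: the observer list is modified while a notification is in progress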
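
Pitfall 5 does not require multiple threads. The sketch below, which uses hypothetical Ticker and Listener types rather than the article's classes, shows a one-shot listener that removes itself during notification: iterating the live ArrayList fails with ConcurrentModificationException, while iterating a snapshot does not.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical subject with an unsafe and a safe notification loop.
class Ticker {
    interface Listener { void onTick(Ticker source, int value); }

    private final List<Listener> listeners = new ArrayList<>();

    void add(Listener l) { listeners.add(l); }
    void remove(Listener l) { listeners.remove(l); }

    // Unsafe: iterates the live list, so a removal invalidates the iterator.
    void notifyUnsafe(int value) {
        for (Listener l : listeners) {
            l.onTick(this, value);
        }
    }

    // Safe: iterates a snapshot; removals take effect from the next notification.
    void notifySafe(int value) {
        for (Listener l : new ArrayList<>(listeners)) {
            l.onTick(this, value);
        }
    }
}

class SelfRemovalDemo {
    // One self-removing listener followed by two listeners that count deliveries.
    static Ticker build(int[] counter) {
        Ticker t = new Ticker();
        t.add(new Ticker.Listener() {
            @Override
            public void onTick(Ticker source, int value) {
                source.remove(this); // one-shot listener
            }
        });
        t.add((source, value) -> counter[0]++);
        t.add((source, value) -> counter[0]++);
        return t;
    }

    public static void main(String[] args) {
        int[] unsafeCount = {0};
        try {
            build(unsafeCount).notifyUnsafe(1);
        } catch (ConcurrentModificationException e) {
            System.out.println("unsafe: failed after " + unsafeCount[0] + " deliveries");
        }

        int[] safeCount = {0};
        build(safeCount).notifySafe(1);
        System.out.println("safe: " + safeCount[0] + " deliveries");
    }
}
```

With only two listeners the unsafe loop would not throw at all; it would silently skip the second listener, which is arguably worse because nothing signals the missed notification.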

🔧 Refactoring Techniques

// Before refactoring: tightly coupled direct calls
public class StockMarket {
    private TradingService tradingService;
    private NotificationService notificationService;
    private ReportService reportService;
    private double latestPrice;
    
    public void updateStockPrice(String symbol, double price) {
        // Update the price
        this.latestPrice = price;
        
        // Call the dependent services directly
        tradingService.checkTradingOpportunity(symbol, price);
        notificationService.sendPriceAlert(symbol, price);
        reportService.recordPriceUpdate(symbol, price);
    }
}

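// After refactoring: using the observer pattern
public class RefactoredStockMarket {
    // ArrayList keeps the example short; it is not safe if observers are added
    // or removed during notification or from other threads
    private final List<StockObserver> observers = new ArrayList<>();
    private double latestPrice;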
    
    public void updateStockPrice(String symbol, double price) {
        this.latestPrice = price;
        notifyObservers(symbol, price);
    }
    
    public void registerObserver(StockObserver observer) {
        observers.add(observer);
    }
    
    public void removeObserver(StockObserver observer) {
        observers.remove(observer);
    }
    
    private void notifyObservers(String symbol, double price) {
        StockData data = new StockData(symbol, price, 0, 0, 0);
        for (StockObserver observer : observers) {
            observer.update(data);
        }
    }
}

// Observer interface and implementations (the interface is reduced here to the single update method)
public interface StockObserver {
    void update(StockData stockData);
}

@Component
public class TradingService implements StockObserver {
    @Override
    public void update(StockData stockData) {
        checkTradingOpportunity(stockData.getSymbol(), stockData.getPrice());
    }
    
    private void checkTradingOpportunity(String symbol, double price) {
        // Trading logic
    }
}

@Component 
public class NotificationService implements StockObserver {
    @Override
    public void update(StockData stockData) {
        sendPriceAlert(stockData.getSymbol(), stockData.getPrice());
    }
    
    private void sendPriceAlert(String symbol, double price) {
        // Notification logic
    }
}

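💡 Summary

The observer pattern is more than a design pattern; it is a way of thinking about event-driven architecture. It teaches:

  • Decoupled thinking: decouple observers from the subject through abstract interfaces
  • Event thinking: model system behavior as a series of events and event handlers
  • Reactive thinking: build systems that respond to change
  • Publish-subscribe thinking: establish an efficient mechanism for distributing messages

In modern software development, the observer pattern has become an important tool for building reactive, event-driven systems. It underlies GUI event handling, microservice communication, real-time data processing, and system monitoring.

Mastering the observer pattern means learning not only a design pattern but also a way of thinking about reactive systems. That way of thinking helps you identify event sources and event handlers when designing a complex system, and build an architecture that is loosely coupled and highly cohesive.

Remember: a good architect can see the event flows in a system, and the observer pattern is a sharp tool for handling them. Choosing it is a choice for a reactive, extensible architecture.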
