AI System Monitoring and Observability in Practice: An End-to-End Solution from the Three Pillars to Intelligent Operations

Introduction: New Monitoring Challenges in the AI Era

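In the wave of digital transformation, AI systems have become a key part of enterprise competitiveness. Compared with traditional software, however, AI systems bring monitoring challenges of their own: model performance drift, changing data quality, heavy compute consumption, and more. According to a Gartner report, more than 60% of AI projects run into observability problems in production, and data drift and model decay are the biggest of these challenges.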

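Observability is no longer just system monitoring in the traditional sense. It is a multidimensional insight system that must cover everything from infrastructure to business metrics, and from data quality to model performance. This article explains how to build a complete monitoring and observability platform for AI systems around the three pillars of metrics, logs, and tracing. It also provides complete implementations in Python, Java, and Vue.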

1. The Three Pillars of Observability: Designing a Full-Stack Monitoring System

1.1 Overview of the Observability Architecture

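Observability for a modern AI system must cover the entire path from data input to service output. The figure below shows the complete monitoring architecture: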

[Figure: end-to-end monitoring architecture of an AI system, starting from the data collection layer. The original Mermaid diagram failed to render and could not be recovered.]

1.2 How the Three Pillars Work Together

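| Pillar | Core role | Data characteristics | Typical tools | AI system focus |
|---|---|---|---|---|
| Metrics | Quantify system state; trend analysis | Numeric time series | Prometheus, Grafana | Model performance, data quality, resource utilization |
| Logs | Record discrete events; problem diagnosis | Text, structured or unstructured | ELK, Loki | Model inference process, abnormal behavior, audit trails |
| Tracing | Analyze request paths; performance profiling | Request context, call relationships | Jaeger, SkyWalking | End-to-end latency, inter-component dependencies, bottleneck localization |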
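
The table is easier to follow when you view a single request through all three signals. The following minimal sketch uses only the standard library. In-memory lists stand in for Prometheus, Loki and Jaeger, and every name here (`handle_inference`, `METRICS`, `LOGS`, `SPANS`) is hypothetical. The metric is aggregated without the trace ID so that label cardinality stays low. The log line and the span share the same `trace_id`, and that shared key is what lets an operator move from one signal to the other.

```python
import json
import time
import uuid
from collections import Counter

# Hypothetical in-memory stand-ins for the three backends; in production these
# would be Prometheus, Loki/ELK and Jaeger/SkyWalking respectively.
METRICS = Counter()   # (metric name, model) -> count
LOGS = []             # structured JSON log lines
SPANS = []            # finished trace spans


def handle_inference(model_name, features):
    """Serve one (fake) prediction and emit all three signals for it."""
    trace_id = uuid.uuid4().hex          # correlation key shared by logs and traces
    start = time.perf_counter()
    prediction = sum(features) > 0       # placeholder for a real model call
    duration_ms = (time.perf_counter() - start) * 1000

    # Metric: aggregated and low-cardinality, so trace_id is deliberately not a label.
    METRICS[("inference_requests_total", model_name)] += 1

    # Trace: one span per request, identified by trace_id.
    SPANS.append({"trace_id": trace_id, "name": "model.predict",
                  "duration_ms": duration_ms})

    # Log: a discrete event carrying the same trace_id, so a log line can be
    # joined to its span when diagnosing a slow or failed request.
    LOGS.append(json.dumps({"level": "INFO", "event": "prediction_served",
                            "model": model_name, "prediction": prediction,
                            "trace_id": trace_id}))
    return trace_id


trace_id = handle_inference("recommendation", [0.3, -0.1])
print(METRICS[("inference_requests_total", "recommendation")])  # 1
```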

2. Metrics Monitoring: Multidimensional Metric Collection and Visualization

2.1 Implementing Business Metrics Monitoring

Business metrics reflect the business value of an AI system and are a key dimension for measuring how well the AI is performing.

2.1.1 Collecting business metrics in Python
# business_metrics_collector.py
import time
from datetime import datetime

from prometheus_client import Counter, Gauge, start_http_server


class BusinessMetricsCollector:
    def __init__(self):
        # Business metric definitions
        self.user_requests = Counter('ai_system_user_requests_total',
                                     'Total user requests', ['service', 'endpoint'])
        self.user_clicks = Counter('ai_system_user_clicks_total',
                                   'Total user clicks', ['service', 'item_type'])
        self.conversion_rate = Gauge('ai_system_conversion_rate',
                                     'Conversion rate percentage', ['service'])
        self.user_retention = Gauge('ai_system_user_retention_rate',
                                    'User retention rate percentage', ['period'])

        # Raw data used to derive the gauges, keyed by (user_id, service).
        # Tuple keys avoid the ambiguity of f"{user_id}_{service}" strings
        # (IDs containing "_", or one service name being a substring of another).
        self.request_data = {}
        self.click_data = {}

    def record_request(self, service, endpoint, user_id, timestamp=None):
        """Record one user request."""
        if timestamp is None:
            timestamp = datetime.now()

        # Increment the request counter
        self.user_requests.labels(service=service, endpoint=endpoint).inc()

        # Keep per-user request details (used for conversion rate, CTR, etc.)
        key = (user_id, service)
        if key not in self.request_data:
            self.request_data[key] = {
                'first_request': timestamp,
                'last_request': timestamp,
                'request_count': 1,
                'endpoints': {endpoint: 1}
            }
        else:
            data = self.request_data[key]
            data['last_request'] = timestamp
            data['request_count'] += 1
            data['endpoints'][endpoint] = data['endpoints'].get(endpoint, 0) + 1

        # Recompute the active-user ("retention") gauge
        self._calculate_user_retention()

    def record_click(self, service, item_type, user_id, item_id, timestamp=None):
        """Record one user click."""
        if timestamp is None:
            timestamp = datetime.now()

        # Increment the click counter
        self.user_clicks.labels(service=service, item_type=item_type).inc()

        # Keep click details
        key = (user_id, service)
        if key not in self.click_data:
            self.click_data[key] = {
                'clicks': [],
                'clicked_items': set()
            }

        self.click_data[key]['clicks'].append({
            'timestamp': timestamp,
            'item_type': item_type,
            'item_id': item_id
        })
        self.click_data[key]['clicked_items'].add(item_id)

        # Recompute the conversion rate
        self._calculate_conversion_rate(service)

    def _calculate_conversion_rate(self, service):
        """Conversion rate for one service: clicks / requests * 100."""
        total_requests = sum(data['request_count']
                             for (_, svc), data in self.request_data.items()
                             if svc == service)
        total_clicks = sum(len(data['clicks'])
                           for (_, svc), data in self.click_data.items()
                           if svc == service)

        if total_requests > 0:
            conversion_rate = (total_clicks / total_requests) * 100
            self.conversion_rate.labels(service=service).set(conversion_rate)

    def _calculate_user_retention(self):
        """Share of known users whose last request falls within each period.

        Note: this is an active-user ratio over all users seen so far,
        not cohort-based N-day retention.
        """
        now = datetime.now()
        periods = {
            'daily': 1,
            'weekly': 7,
            'monthly': 30
        }

        # Total distinct users across all services
        total_users = len({user_id for (user_id, _) in self.request_data})
        if total_users == 0:
            return

        for period_name, days in periods.items():
            # Distinct users active within the period
            active_users = {
                user_id
                for (user_id, _), data in self.request_data.items()
                if (now - data['last_request']).days <= days
            }
            retention_rate = (len(active_users) / total_users) * 100
            self.user_retention.labels(period=period_name).set(retention_rate)

    def calculate_ctr(self, service, item_type=None):
        """Click-through rate (CTR) = clicks / impressions * 100."""
        # Impressions are approximated from the request data
        impressions = 0
        for (_, svc), data in self.request_data.items():
            if svc != service:
                continue
            if item_type:
                # Assumption: an endpoint whose path contains item_type shows items of that type
                for endpoint, count in data['endpoints'].items():
                    if item_type in endpoint:
                        impressions += count
            else:
                # Assumption: every request produced one impression
                impressions += data['request_count']

        # Click count
        clicks = 0
        for (_, svc), data in self.click_data.items():
            if svc != service:
                continue
            if item_type:
                clicks += sum(1 for c in data['clicks'] if c['item_type'] == item_type)
            else:
                clicks += len(data['clicks'])

        if impressions > 0:
            return (clicks / impressions) * 100
        return 0.0

    def start_metrics_server(self, port=8000):
        """Start the HTTP endpoint that Prometheus scrapes."""
        start_http_server(port)
        print(f"Business metrics server started on port {port}")


# Usage example
if __name__ == "__main__":
    collector = BusinessMetricsCollector()

    # Simulate a request and a click
    collector.record_request('recommendation', '/api/recommend', 'user123')
    collector.record_click('recommendation', 'product', 'user123', 'prod456')

    # Start the metrics server
    collector.start_metrics_server(8001)

    # Keep running
    while True:
        time.sleep(60)
        # Periodically refresh the derived gauges
        collector._calculate_conversion_rate('recommendation')
        collector._calculate_user_retention()
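
The `ai_system_user_retention_rate` gauge above measures the share of known users whose last request falls inside a window. That is an active-user ratio. Retention is more often reported per cohort ("of the users first seen on day D, how many came back on day D+N"). If that is the number you need, here is a minimal standard-library sketch. The function name `n_day_retention` and the `first_seen`/`activity` data shapes are hypothetical, and the collector does not record per-day activity in this form today.

```python
from datetime import date, timedelta


def n_day_retention(first_seen, activity, cohort_day, n):
    """Hypothetical helper: day-N retention (percent) for one cohort.

    first_seen: dict user_id -> date of the user's first request
    activity:   dict user_id -> set of dates with at least one request
    """
    # Cohort = users whose first request happened on cohort_day
    cohort = [u for u, d in first_seen.items() if d == cohort_day]
    if not cohort:
        return 0.0
    target = cohort_day + timedelta(days=n)
    # Retained = cohort members active again exactly n days later
    retained = sum(1 for u in cohort if target in activity.get(u, set()))
    return 100.0 * retained / len(cohort)


d0 = date(2024, 1, 1)
first_seen = {"u1": d0, "u2": d0, "u3": d0 + timedelta(days=1)}
activity = {"u1": {d0, d0 + timedelta(days=1)}, "u2": {d0},
            "u3": {d0 + timedelta(days=1)}}
print(n_day_retention(first_seen, activity, d0, 1))  # 50.0
```

Only u1 and u2 belong to the day-0 cohort, and only u1 returns on day 1, so the result is 50%. The active-user ratio for the same data would count u3 as well.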

2.1.2 Collecting business metrics in Java
// BusinessMetricsService.java
package com.ai.monitoring.metrics;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Service
public class BusinessMetricsService {
    
    private final MeterRegistry meterRegistry;
    
    // Business metrics
    private Counter totalUserRequests;
    private Counter totalUserClicks;
    private Gauge conversionRate;
    private Gauge userRetentionRate;
    
    // Business data, keyed by List.of(userId, service). A composite key avoids
    // collisions such as "a_b" + "_" + "c" vs "a" + "_" + "b_c".
    private final ConcurrentHashMap<List<String>, UserRequestData> userRequestData = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<List<String>, UserClickData> userClickData = new ConcurrentHashMap<>();
    
    // Internal data structures. userId and service are stored explicitly so the
    // aggregations never have to parse them back out of a key string.
    private static class UserRequestData {
        final String userId;
        final String service;
        volatile LocalDateTime firstRequest;
        volatile LocalDateTime lastRequest;
        final AtomicInteger requestCount = new AtomicInteger(0);
        final ConcurrentHashMap<String, AtomicInteger> endpoints = new ConcurrentHashMap<>();
        
        UserRequestData(String userId, String service) {
            this.userId = userId;
            this.service = service;
        }
    }
    
    private static class UserClickData {
        final String service;
        final ConcurrentHashMap<String, AtomicInteger> clicksByType = new ConcurrentHashMap<>();
        
        UserClickData(String service) {
            this.service = service;
        }
    }
    
    public BusinessMetricsService(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        
        // Register the meters (the data maps are already initialized above)
        initializeMetrics();
    }
    
    private void initializeMetrics() {
        // User request counter
        totalUserRequests = Counter.builder("ai.system.user.requests.total")
                .description("Total user requests to AI system")
                .tag("system", "ai-recommendation")
                .register(meterRegistry);
        
        // User click counter
        totalUserClicks = Counter.builder("ai.system.user.clicks.total")
                .description("Total user clicks in AI system")
                .tag("system", "ai-recommendation")
                .register(meterRegistry);
        
        // Conversion rate gauge. Gauges are evaluated lazily on every scrape;
        // Micrometer holds `this` only weakly, which is fine for a singleton bean.
        conversionRate = Gauge.builder("ai.system.conversion.rate", 
                this, 
                service -> service.calculateConversionRate())
                .description("Conversion rate percentage")
                .tag("system", "ai-recommendation")
                .register(meterRegistry);
        
        // User retention gauge (daily window)
        userRetentionRate = Gauge.builder("ai.system.user.retention.rate",
                this,
                service -> service.calculateUserRetention("daily"))
                .description("User retention rate")
                .tag("period", "daily")
                .register(meterRegistry);
    }
    
    public void recordUserRequest(String userId, String service, String endpoint) {
        // Count the request
        totalUserRequests.increment();
        
        // Update the per-user request data
        UserRequestData data = userRequestData.computeIfAbsent(List.of(userId, service),
                k -> new UserRequestData(userId, service));
        
        LocalDateTime now = LocalDateTime.now();
        if (data.firstRequest == null) {
            data.firstRequest = now;
        }
        data.lastRequest = now;
        data.requestCount.incrementAndGet();
        
        // Count calls per endpoint
        data.endpoints.computeIfAbsent(endpoint, k -> new AtomicInteger(0))
                .incrementAndGet();
        
        // Hook for extra retention logic (the gauge itself is computed on scrape)
        updateUserRetentionMetrics();
    }
    
    public void recordUserClick(String userId, String service, String itemType, String itemId) {
        // Count the click
        totalUserClicks.increment();
        
        // Update the per-user click data
        UserClickData data = userClickData.computeIfAbsent(List.of(userId, service),
                k -> new UserClickData(service));
        
        // Count clicks per item type
        data.clicksByType.computeIfAbsent(itemType, k -> new AtomicInteger(0))
                .incrementAndGet();
        
        // Hook for extra conversion logic (the gauge itself is computed on scrape)
        updateConversionMetrics();
    }
    
    private double calculateConversionRate() {
        long totalRequests = userRequestData.values().stream()
                .mapToLong(data -> data.requestCount.get())
                .sum();
        
        long totalClicks = userClickData.values().stream()
                .flatMap(data -> data.clicksByType.values().stream())
                .mapToLong(AtomicInteger::get)
                .sum();
        
        if (totalRequests > 0) {
            return (double) totalClicks / totalRequests * 100;
        }
        return 0.0;
    }
    
    private double calculateUserRetention(String period) {
        LocalDateTime now = LocalDateTime.now();
        long daysThreshold = getDaysThreshold(period);
        
        // Distinct users whose last request falls within the period
        long activeUsers = userRequestData.values().stream()
                .filter(data -> {
                    LocalDateTime lastRequest = data.lastRequest;
                    return lastRequest != null
                            && Duration.between(lastRequest, now).toDays() <= daysThreshold;
                })
                .map(data -> data.userId)
                .distinct()
                .count();
        
        // Distinct users seen so far
        long totalUsers = userRequestData.values().stream()
                .map(data -> data.userId)
                .distinct()
                .count();
        
        if (totalUsers > 0) {
            return (double) activeUsers / totalUsers * 100;
        }
        return 0.0;
    }
    
    private long getDaysThreshold(String period) {
        switch (period.toLowerCase()) {
            case "daily": return 1;
            case "weekly": return 7;
            case "monthly": return 30;
            default: return 1;
        }
    }
    
    private void updateConversionMetrics() {
        // The conversion gauge updates itself on scrape; add extra logic here if needed
    }
    
    private void updateUserRetentionMetrics() {
        // The retention gauge updates itself on scrape; add extra logic here if needed
    }
    
    public double calculateCTR(String service, String itemType) {
        // Impressions: endpoint calls of this service (optionally filtered by item type)
        long impressions = userRequestData.values().stream()
                .filter(data -> data.service.equals(service))
                .flatMap(data -> data.endpoints.entrySet().stream())
                .filter(entry -> itemType == null || entry.getKey().contains(itemType))
                .mapToLong(entry -> entry.getValue().get())
                .sum();
        
        // Clicks of this service (optionally filtered by item type)
        long clicks = userClickData.values().stream()
                .filter(data -> data.service.equals(service))
                .flatMap(data -> data.clicksByType.entrySet().stream())
                .filter(entry -> itemType == null || entry.getKey().equals(itemType))
                .mapToLong(entry -> entry.getValue().get())
                .sum();
        
        if (impressions > 0) {
            return (double) clicks / impressions * 100;
        }
        return 0.0;
    }
}

2.2 Implementing Model Metrics Monitoring

Model metrics are a monitoring dimension specific to AI systems. They track changes in model performance and data quality.
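
The data quality side of the monitor below depends on the Population Stability Index (PSI). It compares how a feature is distributed across bins in the reference data and in production: PSI = Σᵢ (aᵢ − eᵢ)·ln(aᵢ / eᵢ), where eᵢ is the reference share of bin i and aᵢ is the current share. The small standard-library sketch below works on counts that have already been binned. It uses the same formula and the same 0.0001 floor for empty bins as `_calculate_feature_psi`, so you can check the arithmetic by hand. The function name `psi` is illustrative. A commonly cited rule of thumb reads PSI < 0.1 as stable, 0.1 to 0.25 as a moderate shift, and > 0.25 as a significant shift. Treat these cut-offs as heuristics and tune them per feature.

```python
import math


def psi(expected_counts, actual_counts, eps=1e-4):
    """Population Stability Index over pre-binned counts (illustrative helper).

    PSI = sum_i (a_i - e_i) * ln(a_i / e_i), where e_i / a_i are the reference /
    current shares of bin i. Empty bins are floored at eps to avoid log(0).
    """
    e_total, a_total = sum(expected_counts), sum(actual_counts)
    score = 0.0
    for e, a in zip(expected_counts, actual_counts):
        e_share = max(e / e_total, eps)
        a_share = max(a / a_total, eps)
        score += (a_share - e_share) * math.log(a_share / e_share)
    return score


reference = [100, 300, 400, 200]   # bin counts in the training data
current = [50, 250, 400, 300]      # bin counts in production
print(round(psi(reference, current), 4))  # 0.0843
```

Bin 3 contributes nothing because its share is unchanged. Most of the 0.0843 comes from the last bin, which grew from 20% to 30%, so this feature would still count as stable under the rule of thumb.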

2.2.1 Monitoring model performance with Evidently AI
# model_metrics_monitor.py
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List

import numpy as np
import pandas as pd
from prometheus_client import Gauge

# Evidently "Report + metric preset" API as found in the 0.4.x releases;
# newer Evidently versions reorganized these modules, so pin the version.
from evidently import ColumnMapping
from evidently.metric_preset import (
    ClassificationPreset,
    DataDriftPreset,
    DataQualityPreset,
)
from evidently.report import Report

@dataclass
class ModelMetrics:
    accuracy: float
    precision: float
    recall: float
    f1_score: float
    roc_auc: float
    psi_score: float
    data_drift_detected: bool
    concept_drift_detected: bool
    drift_features: List[str]
    
class ModelMetricsMonitor:
    def __init__(self, reference_data: pd.DataFrame, prometheus_enabled: bool = True):
        """
        初始化模型指标监控器
        
        Args:
            reference_data: 参考数据集(训练数据或黄金标准数据)
            prometheus_enabled: 是否启用Prometheus指标导出
        """
        self.reference_data = reference_data
        self.current_data = None
        self.column_mapping = None
        
        # 初始化Prometheus指标(如果启用)
        if prometheus_enabled:
            self._init_prometheus_metrics()
        
        # 存储历史指标
        self.history = {
            'timestamps': [],
            'accuracy': [],
            'precision': [],
            'recall': [],
            'f1': [],
            'roc_auc': [],
            'psi': [],
            'drift_detected': []
        }
    
    def _init_prometheus_metrics(self):
        """初始化Prometheus指标"""
        # 模型性能指标
        self.accuracy_gauge = Gauge(
            'ai_model_accuracy',
            'Model accuracy score',
            ['model_name', 'model_version']
        )
        
        self.precision_gauge = Gauge(
            'ai_model_precision',
            'Model precision score',
            ['model_name', 'model_version', 'class']
        )
        
        self.recall_gauge = Gauge(
            'ai_model_recall',
            'Model recall score',
            ['model_name', 'model_version', 'class']
        )
        
        self.f1_gauge = Gauge(
            'ai_model_f1_score',
            'Model F1 score',
            ['model_name', 'model_version', 'class']
        )
        
        self.roc_auc_gauge = Gauge(
            'ai_model_roc_auc',
            'Model ROC AUC score',
            ['model_name', 'model_version']
        )
        
        # Data quality metrics
        self.psi_gauge = Gauge(
            'ai_model_psi_score',
            'Population Stability Index score',
            ['model_name', 'model_version', 'feature']
        )
        
        self.data_drift_gauge = Gauge(
            'ai_model_data_drift_detected',
            'Data drift detection flag',
            ['model_name', 'model_version']
        )
        
        self.concept_drift_gauge = Gauge(
            'ai_model_concept_drift_detected',
            'Concept drift detection flag',
            ['model_name', 'model_version']
        )
        
        # Feature drift metrics
        self.feature_drift_gauge = Gauge(
            'ai_model_feature_drift',
            'Feature drift score',
            ['model_name', 'model_version', 'feature']
        )
    
    def set_column_mapping(self, 
                          target_col: str,
                          prediction_col: str,
                          numerical_features: List[str],
                          categorical_features: List[str]):
        """设置列映射"""
        self.column_mapping = ColumnMapping(
            target=target_col,
            prediction=prediction_col,
            numerical_features=numerical_features,
            categorical_features=categorical_features,
            task='classification'
        )
    
    def analyze_current_data(self, 
                            current_data: pd.DataFrame,
                            model_name: str = "default",
                            model_version: str = "1.0") -> ModelMetrics:
        """
        分析当前数据并计算模型指标
        
        Args:
            current_data: 当前数据集
            model_name: 模型名称
            model_version: 模型版本
            
        Returns:
            ModelMetrics: 模型指标对象
        """
        self.current_data = current_data
        
        if self.column_mapping is None:
            raise ValueError("Column mapping must be set before analysis")
        
        # Build the data drift and data quality report
        data_drift_report = Report(metrics=[
            DataDriftPreset(),
            DataQualityPreset()
        ])
        
        data_drift_report.run(
            reference_data=self.reference_data,
            current_data=self.current_data,
            column_mapping=self.column_mapping
        )
        
        # Build the classification performance report
        classification_report = Report(metrics=[
            ClassificationPreset()
        ])
        
        classification_report.run(
            reference_data=self.reference_data,
            current_data=self.current_data,
            column_mapping=self.column_mapping
        )
        
        # Convert the reports to plain dicts
        drift_results = data_drift_report.as_dict()
        classification_results = classification_report.as_dict()
        
        # Compute PSI (Population Stability Index) per feature
        psi_scores = self._calculate_psi_scores()
        
        # Extract the key metrics (this also pushes them to Prometheus)
        metrics = self._extract_metrics(
            drift_results, 
            classification_results, 
            psi_scores,
            model_name,
            model_version
        )
        
        # Append to the history
        self._record_history(metrics)
        
        return metrics
    
    def _calculate_psi_scores(self) -> Dict[str, float]:
        """计算PSI分数"""
        psi_scores = {}
        
        # 对数值特征计算PSI
        if self.column_mapping.numerical_features:
            for feature in self.column_mapping.numerical_features:
                psi = self._calculate_feature_psi(
                    self.reference_data[feature],
                    self.current_data[feature]
                )
                psi_scores[feature] = psi
        
        # 对分类特征计算PSI
        if self.column_mapping.categorical_features:
            for feature in self.column_mapping.categorical_features:
                psi = self._calculate_categorical_psi(
                    self.reference_data[feature],
                    self.current_data[feature]
                )
                psi_scores[feature] = psi
        
        return psi_scores
    
    def _calculate_feature_psi(self, reference_series, current_series, buckets=10):
        """计算数值特征的PSI"""
        # 合并数据确定分箱边界
        combined = pd.concat([reference_series, current_series])
        
        # 创建分箱
        percentiles = np.linspace(0, 100, buckets + 1)
        bins = np.percentile(combined.dropna(), percentiles)
        bins[0] = -np.inf
        bins[-1] = np.inf
        
        # 计算分布
        ref_dist, _ = np.histogram(reference_series.dropna(), bins=bins)
        curr_dist, _ = np.histogram(current_series.dropna(), bins=bins)
        
        # 避免零除
        ref_dist = ref_dist / len(reference_series)
        curr_dist = curr_dist / len(current_series)
        ref_dist = np.where(ref_dist == 0, 0.0001, ref_dist)
        curr_dist = np.where(curr_dist == 0, 0.0001, curr_dist)
        
        # 计算PSI
        psi = np.sum((curr_dist - ref_dist) * np.log(curr_dist / ref_dist))
        
        return psi
    
    def _calculate_categorical_psi(self, reference_series, current_series):
        """计算分类特征的PSI"""
        # 获取所有类别
        all_categories = set(reference_series.unique()).union(set(current_series.unique()))
        
        # 计算分布
        ref_dist = reference_series.value_counts(normalize=True).reindex(all_categories, fill_value=0.0001)
        curr_dist = current_series.value_counts(normalize=True).reindex(all_categories, fill_value=0.0001)
        
        # 计算PSI
        psi = np.sum((curr_dist - ref_dist) * np.log(curr_dist / ref_dist))
        
        return psi
    
    def _extract_metrics(self, drift_results, classification_results, psi_scores, model_name, model_version):
        """从报告结果中提取指标"""
        # 从分类报告中提取性能指标
        classification_metrics = classification_results['metrics']
        
        accuracy = None
        precision = None
        recall = None
        f1 = None
        roc_auc = None
        
        for metric in classification_metrics:
            if metric['metric'] == 'ClassificationQualityMetric':
                accuracy = metric['result']['accuracy']
                precision = metric['result']['precision']
                recall = metric['result']['recall']
                f1 = metric['result']['f1']
            elif metric['metric'] == 'RocCurve' and 'roc_auc' in metric['result']:
                roc_auc = metric['result']['roc_auc']
        
        # 从漂移报告中提取漂移信息
        drift_detected = False
        concept_drift_detected = False
        drift_features = []
        
        for metric in drift_results['metrics']:
            if metric['metric'] == 'DatasetDriftMetric':
                drift_detected = metric['result']['drift_detected']
            elif metric['metric'] == 'DataDriftTable':
                for feature_result in metric['result']['drift_by_columns'].values():
                    if feature_result['drift_detected']:
                        drift_features.append(feature_result['column_name'])
        
        # 计算平均PSI
        avg_psi = np.mean(list(psi_scores.values())) if psi_scores else 0.0
        
        # 更新Prometheus指标
        self._update_prometheus_metrics(
            accuracy, precision, recall, f1, roc_auc,
            psi_scores, drift_detected, concept_drift_detected,
            model_name, model_version
        )
        
        return ModelMetrics(
            accuracy=accuracy or 0.0,
            precision=precision or 0.0,
            recall=recall or 0.0,
            f1_score=f1 or 0.0,
            roc_auc=roc_auc or 0.0,
            psi_score=avg_psi,
            data_drift_detected=drift_detected,
            concept_drift_detected=concept_drift_detected,
            drift_features=drift_features
        )
    
    def _update_prometheus_metrics(self, accuracy, precision, recall, f1, roc_auc,
                                  psi_scores, drift_detected, concept_drift_detected,
                                  model_name, model_version):
        """更新Prometheus指标"""
        # 更新性能指标
        if accuracy is not None:
            self.accuracy_gauge.labels(
                model_name=model_name,
                model_version=model_version
            ).set(accuracy)
        
        # 更新PSI指标
        for feature, psi in psi_scores.items():
            self.psi_gauge.labels(
                model_name=model_name,
                model_version=model_version,
                feature=feature
            ).set(psi)
        
        # 更新漂移检测指标
        self.data_drift_gauge.labels(
            model_name=model_name,
            model_version=model_version
        ).set(1 if drift_detected else 0)
    
    def _record_history(self, metrics: ModelMetrics):
        """记录指标历史"""
        timestamp = datetime.now()
        
        self.history['timestamps'].append(timestamp)
        self.history['accuracy'].append(metrics.accuracy)
        self.history['precision'].append(metrics.precision)
        self.history['recall'].append(metrics.recall)
        self.history['f1'].append(metrics.f1_score)
        self.history['roc_auc'].append(metrics.roc_auc)
        self.history['psi'].append(metrics.psi_score)
        self.history['drift_detected'].append(metrics.data_drift_detected)
        
        # Cap the history length
        max_history = 1000
        for key in self.history:
            if len(self.history[key]) > max_history:
                self.history[key] = self.history[key][-max_history:]
    
    def detect_anomalies(self, window_size: int = 30, threshold_std: float = 2.0) -> Dict:
        """Flag the latest accuracy/PSI values that deviate from the recent baseline.
        
        The baseline is the window_size points before the latest one, so the
        point under test does not dilute its own z-score.
        """
        anomalies = {}
        
        # Need window_size baseline points plus the latest point
        if len(self.history['accuracy']) < window_size + 1:
            return anomalies
        
        baseline_accuracy = self.history['accuracy'][-(window_size + 1):-1]
        baseline_psi = self.history['psi'][-(window_size + 1):-1]
        latest_accuracy = self.history['accuracy'][-1]
        latest_psi = self.history['psi'][-1]
        
        # Baseline statistics
        accuracy_mean = np.mean(baseline_accuracy)
        accuracy_std = np.std(baseline_accuracy)
        psi_mean = np.mean(baseline_psi)
        psi_std = np.std(baseline_psi)
        
        # Accuracy: two-sided test; PSI: only an increase is a concern
        accuracy_anomaly = abs(latest_accuracy - accuracy_mean) > threshold_std * accuracy_std
        psi_anomaly = latest_psi > psi_mean + threshold_std * psi_std
        
        anomalies = {
            'accuracy_anomaly': bool(accuracy_anomaly),
            'psi_anomaly': bool(psi_anomaly),
            'accuracy_zscore': (latest_accuracy - accuracy_mean) / accuracy_std if accuracy_std > 0 else 0,
            'psi_zscore': (latest_psi - psi_mean) / psi_std if psi_std > 0 else 0
        }
        
        return anomalies

# Usage example
if __name__ == "__main__":
    # Create synthetic data
    np.random.seed(42)
    n_samples = 1000
    
    reference_data = pd.DataFrame({
        'feature1': np.random.normal(0, 1, n_samples),
        'feature2': np.random.choice(['A', 'B', 'C'], n_samples),
        'target': np.random.choice([0, 1], n_samples),
        'prediction': np.random.choice([0, 1], n_samples)
    })
    
    current_data = pd.DataFrame({
        'feature1': np.random.normal(0.2, 1.2, n_samples),  # introduce drift
        'feature2': np.random.choice(['A', 'B', 'C'], n_samples, p=[0.4, 0.3, 0.3]),
        'target': np.random.choice([0, 1], n_samples),
        'prediction': np.random.choice([0, 1], n_samples)
    })
    
    # Initialize the monitor
    monitor = ModelMetricsMonitor(reference_data)
    monitor.set_column_mapping(
        target_col='target',
        prediction_col='prediction',
        numerical_features=['feature1'],
        categorical_features=['feature2']
    )
    
    # Analyze the current data
    metrics = monitor.analyze_current_data(
        current_data,
        model_name="recommendation_model",
        model_version="2.0"
    )
    
    print(f"Accuracy: {metrics.accuracy:.3f}")
    print(f"PSI Score: {metrics.psi_score:.3f}")
    print(f"Data Drift Detected: {metrics.data_drift_detected}")
    print(f"Drift Features: {metrics.drift_features}")

2.2.2 Custom model metrics implementation in Java
// ModelMetricsCollector.java
package com.ai.monitoring.metrics;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Counter;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

@Component
public class ModelMetricsCollector {
    
    private final MeterRegistry meterRegistry;
    
    // Model performance gauges, one per model/version key
    private final Map<String, Gauge> accuracyGauges = new ConcurrentHashMap<>();
    private final Map<String, Gauge> precisionGauges = new ConcurrentHashMap<>();
    private final Map<String, Gauge> recallGauges = new ConcurrentHashMap<>();
    private final Map<String, Gauge> f1Gauges = new ConcurrentHashMap<>();
    private final Map<String, Gauge> rocAucGauges = new ConcurrentHashMap<>();
    
    // Data quality gauges
    private final Map<String, Gauge> psiGauges = new ConcurrentHashMap<>();
    private final Map<String, Gauge> driftGauges = new ConcurrentHashMap<>();
    
    // Per-model data store
    private final Map<String, ModelMetricsData> modelMetricsData = new ConcurrentHashMap<>();
    
    public ModelMetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    
    /**
     * Record one prediction result for a model version.
     */
    public void recordPrediction(String modelId, String modelVersion,
                                double[] probabilities, int actual, int predicted) {
        String key = modelId + "_" + modelVersion;
        
        ModelMetricsData data = modelMetricsData.computeIfAbsent(key,
                k -> new ModelMetricsData(modelId, modelVersion));
        
        data.recordPrediction(probabilities, actual, predicted);
        
        // Update the performance metrics
        updatePerformanceMetrics(data);
        
        // Check for data drift
        checkDataDrift(data);
    }
    
    /**
     * Record feature values (used for the PSI calculation).
     */
    public void recordFeatures(String modelId, String modelVersion,
                              Map<String, Double> features, boolean isReference) {
        String key = modelId + "_" + modelVersion;
        
        ModelMetricsData data = modelMetricsData.computeIfAbsent(key,
                k -> new ModelMetricsData(modelId, modelVersion));
        
        if (isReference) {
            data.addReferenceFeatures(features);
        } else {
            data.addCurrentFeatures(features);
            
            // Compute PSI
            Map<String, Double> psiScores = calculatePSI(data);
            
            // Update the PSI gauges
            updatePSIMetrics(modelId, modelVersion, psiScores);
        }
    }
    
    /**
     * Register the performance gauges so that Prometheus can scrape them.
     */
    private void updatePerformanceMetrics(ModelMetricsData data) {
        String modelId = data.getModelId();
        String version = data.getModelVersion();
        
        // Snapshot of the current performance metrics (for this call only)
        Map<String, Double> metrics = data.calculatePerformanceMetrics();
        
        // Register each gauge once per model/version. The suppliers read from the
        // long-lived ModelMetricsData instead of capturing the local `metrics`
        // snapshot: computeIfAbsent registers a gauge only once, so a captured
        // snapshot would freeze every gauge at the values of the first prediction.
        // Each scrape therefore recomputes the metrics once per gauge.
        String accuracyKey = modelId + "_" + version;
        accuracyGauges.computeIfAbsent(accuracyKey, k ->
                Gauge.builder("ai.model.accuracy", 
                        () -> data.calculatePerformanceMetrics().getOrDefault("accuracy", 0.0))
                     .tag("model", modelId)
                     .tag("version", version)
                     .register(meterRegistry)
        );
        
        // Precision gauge
        Gauge precisionGauge = precisionGauges.computeIfAbsent(accuracyKey, k ->
                Gauge.builder("ai.model.precision",
                        () -> data.calculatePerformanceMetrics().getOrDefault("precision", 0.0))
                     .tag("model", modelId)
                     .tag("version", version)
                     .tag("class", "binary")
                     .register(meterRegistry)
        );
        
        // Recall gauge
        Gauge recallGauge = recallGauges.computeIfAbsent(accuracyKey, k ->
                Gauge.builder("ai.model.recall",
                        () -> data.calculatePerformanceMetrics().getOrDefault("recall", 0.0))
                     .tag("model", modelId)
                     .tag("version", version)
                     .tag("class", "binary")
                     .register(meterRegistry)
        );
        
        // F1 score gauge
        Gauge f1Gauge = f1Gauges.computeIfAbsent(accuracyKey, k ->
                Gauge.builder("ai.model.f1",
                        () -> data.calculatePerformanceMetrics().getOrDefault("f1", 0.0))
                     .tag("model", modelId)
                     .tag("version", version)
                     .tag("class", "binary")
                     .register(meterRegistry)
        );
        
        // ROC AUC gauge
        Gauge rocAucGauge = rocAucGauges.computeIfAbsent(accuracyKey, k ->
                Gauge.builder("ai.model.roc_auc",
                        () -> data.calculatePerformanceMetrics().getOrDefault("roc_auc", 0.0))
                     .tag("model", modelId)
                     .tag("version", version)
                     .register(meterRegistry)
        );
    }
    
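    /**
     * Compute the PSI (Population Stability Index) of every feature that has
     * both reference and current samples
     */
    private Map<String, Double> calculatePSI(ModelMetricsData data) {
        Map<String, List<Double>> referenceFeatures = data.getReferenceFeatures();
        Map<String, List<Double>> currentFeatures = data.getCurrentFeatures();
        
        Map<String, Double> psiScores = new HashMap<>();
        
        for (String feature : referenceFeatures.keySet()) {
            List<Double> refList = referenceFeatures.get(feature);
            List<Double> currList = currentFeatures.get(feature);
            if (refList == null || currList == null) {
                continue;
            }
            
            // Work on copies so writers on other threads cannot modify the lists mid-iteration
            // (for synchronizedList values, the copy constructor's toArray() holds the list lock)
            List<Double> refValues = new ArrayList<>(refList);
            List<Double> currValues = new ArrayList<>(currList);
            
            if (!refValues.isEmpty() && !currValues.isEmpty()) {
                double psi = calculateFeaturePSI(refValues, currValues);
                psiScores.put(feature, psi);
            }
        }
        
        return psiScores;
    }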
    
    /**
     * Compute the PSI of a single feature over equal-width bins:
     * PSI = sum_i (curr_i - ref_i) * ln(curr_i / ref_i)
     */
    private double calculateFeaturePSI(List<Double> refValues, List<Double> currValues) {
        // Pool both samples to determine the bin edges
        List<Double> allValues = new ArrayList<>(refValues);
        allValues.addAll(currValues);
        
        // 10 equal-width bins spanning [min, max]
        int numBins = 10;
        double min = Collections.min(allValues);
        double max = Collections.max(allValues);
        if (max == min) {
            return 0.0; // all values identical: both distributions fall into one bin
        }
        double binWidth = (max - min) / numBins;
        
        // Reference distribution (bin counts); the maximum value is clamped into the last bin
        double[] refDist = new double[numBins];
        for (Double value : refValues) {
            int binIndex = (int) Math.min((value - min) / binWidth, numBins - 1);
            refDist[binIndex]++;
        }
        
        // Current distribution (bin counts)
        double[] currDist = new double[numBins];
        for (Double value : currValues) {
            int binIndex = (int) Math.min((value - min) / binWidth, numBins - 1);
            currDist[binIndex]++;
        }
        
        // Normalize counts to proportions
        double refTotal = refValues.size();
        double currTotal = currValues.size();
        
        double psi = 0.0;
        for (int i = 0; i < numBins; i++) {
            double refProb = refDist[i] / refTotal;
            double currProb = currDist[i] / currTotal;
            
            // Replace empty bins with a small epsilon to avoid log(0) and division by zero
            refProb = refProb == 0 ? 0.0001 : refProb;
            currProb = currProb == 0 ? 0.0001 : currProb;
            
            psi += (currProb - refProb) * Math.log(currProb / refProb);
        }
        
        return psi;
    }
    
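    // Latest PSI per model/version/feature; read by the registered gauges at scrape time
    private final Map<String, Double> latestPsiValues = new ConcurrentHashMap<>();
    
    /**
     * Publish PSI values to Prometheus
     */
    private void updatePSIMetrics(String modelId, String modelVersion,
                                 Map<String, Double> psiScores) {
        for (Map.Entry<String, Double> entry : psiScores.entrySet()) {
            String feature = entry.getKey();
            String psiKey = modelId + "_" + modelVersion + "_" + feature;
            
            // Store the new value; the gauge (registered once) reads it on every scrape
            latestPsiValues.put(psiKey, entry.getValue());
            psiGauges.computeIfAbsent(psiKey, k ->
                    Gauge.builder("ai.model.psi",
                            () -> latestPsiValues.getOrDefault(psiKey, 0.0))
                         .tag("model", modelId)
                         .tag("version", modelVersion)
                         .tag("feature", feature)
                         .register(meterRegistry)
            );
        }
    }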
    
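    // Latest drift flag per model/version (1.0 = drift detected); read by the gauges
    private final Map<String, Double> driftFlags = new ConcurrentHashMap<>();
    
    /**
     * Check for data drift
     */
    private void checkDataDrift(ModelMetricsData data) {
        Map<String, Double> psiScores = calculatePSI(data);
        
        // Drift is flagged when any feature's PSI exceeds the threshold
        double psiThreshold = 0.2; // common rule of thumb for a significant shift
        boolean driftDetected = psiScores.values().stream()
                .anyMatch(psi -> psi > psiThreshold);
        
        // Store the flag; the gauge (registered once) reads it on every scrape
        String key = data.getModelId() + "_" + data.getModelVersion();
        driftFlags.put(key, driftDetected ? 1.0 : 0.0);
        driftGauges.computeIfAbsent(key, k ->
                Gauge.builder("ai.model.data_drift",
                        () -> driftFlags.getOrDefault(key, 0.0))
                     .tag("model", data.getModelId())
                     .tag("version", data.getModelVersion())
                     .register(meterRegistry)
        );
    }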
    
    /**
     * Per-model metrics data: a sliding window of predictions plus feature samples
     */
    private static class ModelMetricsData {
        private final String modelId;
        private final String modelVersion;
        
        // Prediction results (sliding window)
        private final List<PredictionResult> predictions = Collections.synchronizedList(new ArrayList<>());
        
        // Feature samples
        private final Map<String, List<Double>> referenceFeatures = new ConcurrentHashMap<>();
        private final Map<String, List<Double>> currentFeatures = new ConcurrentHashMap<>();
        
        public ModelMetricsData(String modelId, String modelVersion) {
            this.modelId = modelId;
            this.modelVersion = modelVersion;
        }
        
        public void recordPrediction(double[] probabilities, int actual, int predicted) {
            // Add and trim under one lock so the window keeps at most 10,000 records
            synchronized (predictions) {
                predictions.add(new PredictionResult(probabilities, actual, predicted));
                if (predictions.size() > 10000) {
                    predictions.remove(0);
                }
            }
        }
        
        public void addReferenceFeatures(Map<String, Double> features) {
            for (Map.Entry<String, Double> entry : features.entrySet()) {
                // synchronizedList: appended here while calculatePSI copies it from other threads
                referenceFeatures.computeIfAbsent(entry.getKey(),
                                k -> Collections.synchronizedList(new ArrayList<>()))
                        .add(entry.getValue());
            }
        }
        
        public void addCurrentFeatures(Map<String, Double> features) {
            for (Map.Entry<String, Double> entry : features.entrySet()) {
                currentFeatures.computeIfAbsent(entry.getKey(),
                                k -> Collections.synchronizedList(new ArrayList<>()))
                        .add(entry.getValue());
            }
        }
        
        public Map<String, Double> calculatePerformanceMetrics() {
            // Snapshot under the list's lock; streaming a synchronizedList directly is not thread-safe
            List<PredictionResult> snapshot;
            synchronized (predictions) {
                snapshot = new ArrayList<>(predictions);
            }
            if (snapshot.isEmpty()) {
                return Collections.emptyMap();
            }
            
            Map<String, Double> metrics = new HashMap<>();
            
            // Accuracy
            long correct = snapshot.stream()
                    .filter(p -> p.actual == p.predicted)
                    .count();
            double accuracy = (double) correct / snapshot.size();
            metrics.put("accuracy", accuracy);
            
            // Precision, recall and F1 (binary classification, positive class = 1)
            long truePositive = snapshot.stream()
                    .filter(p -> p.actual == 1 && p.predicted == 1)
                    .count();
            long falsePositive = snapshot.stream()
                    .filter(p -> p.actual == 0 && p.predicted == 1)
                    .count();
            long falseNegative = snapshot.stream()
                    .filter(p -> p.actual == 1 && p.predicted == 0)
                    .count();
            
            double precision = (truePositive + falsePositive) > 0 ? 
                    (double) truePositive / (truePositive + falsePositive) : 0.0;
            double recall = (truePositive + falseNegative) > 0 ?
                    (double) truePositive / (truePositive + falseNegative) : 0.0;
            double f1 = (precision + recall) > 0 ?
                    2 * precision * recall / (precision + recall) : 0.0;
            
            metrics.put("precision", precision);
            metrics.put("recall", recall);
            metrics.put("f1", f1);
            
            // ROC AUC (simplified estimate; a production system should use a more
            // precise, tie-aware AUC computation)
            double rocAuc = calculateROCAUC(snapshot);
            metrics.put("roc_auc", rocAuc);
            
            return metrics;
        }
        
        private double calculateROCAUC(List<PredictionResult> snapshot) {
            // Simplified AUC: tied scores are processed one at a time, not averaged
            if (snapshot.size() < 2) {
                return 0.5; // default value
            }
            
            int totalP = (int) snapshot.stream().filter(p -> p.actual == 1).count();
            int totalN = snapshot.size() - totalP;
            if (totalP == 0 || totalN == 0) {
                return 0.5; // AUC is undefined when only one class is present
            }
            
            // Sort by the predicted probability of the positive class (index 1), descending
            List<PredictionResult> sorted = new ArrayList<>(snapshot);
            sorted.sort((a, b) -> Double.compare(b.probabilities[1], a.probabilities[1]));
            
            // Accumulate the area under the ROC curve with the trapezoidal rule
            double auc = 0.0;
            double prevFPR = 0.0;
            double prevTPR = 0.0;
            int tp = 0, fp = 0;
            
            for (PredictionResult p : sorted) {
                if (p.actual == 1) {
                    tp++;
                } else {
                    fp++;
                }
                
                double tpr = (double) tp / totalP;
                double fpr = (double) fp / totalN;
                
                auc += (fpr - prevFPR) * (tpr + prevTPR) / 2;
                prevFPR = fpr;
                prevTPR = tpr;
            }
            
            return auc;
        }
        
        public Map<String, List<Double>> getReferenceFeatures() {
            return referenceFeatures;
        }
        
        public Map<String, List<Double>> getCurrentFeatures() {
            return currentFeatures;
        }
        
        public String getModelId() {
            return modelId;
        }
        
        public String getModelVersion() {
            return modelVersion;
        }
    }
    
    /**
     * A single prediction record
     */
    private static class PredictionResult {
        double[] probabilities;
        int actual;
        int predicted;
        
        PredictionResult(double[] probabilities, int actual, int predicted) {
            this.probabilities = probabilities;
            this.actual = actual;
            this.predicted = predicted;
        }
    }
}
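To check the PSI arithmetic used by calculateFeaturePSI outside the JVM, here is a minimal Python sketch of the same scheme. It uses equal-width bins over the pooled range of both samples, replaces empty bins with an epsilon of 0.0001, and computes PSI = Σ (cur_i − ref_i) · ln(cur_i / ref_i). The function names `population_stability_index` and `drift_level` are hypothetical. The 0.1 / 0.2 bands in `drift_level` are an assumed, commonly used rule of thumb rather than a fixed standard; only the 0.2 cut-off comes from checkDataDrift.

```python
import math
from typing import List, Sequence


def population_stability_index(reference: Sequence[float], current: Sequence[float],
                               num_bins: int = 10, eps: float = 1e-4) -> float:
    """PSI over equal-width bins spanning the pooled range of both samples.

    PSI = sum_i (cur_i - ref_i) * ln(cur_i / ref_i), where ref_i and cur_i are the
    fractions of each sample in bin i; empty bins are replaced by `eps`.
    """
    if not reference or not current:
        raise ValueError("both samples must be non-empty")
    lo = min(min(reference), min(current))
    hi = max(max(reference), max(current))
    if hi == lo:
        return 0.0  # all values identical: both samples fall into a single bin
    width = (hi - lo) / num_bins

    def proportions(values: Sequence[float]) -> List[float]:
        counts = [0] * num_bins
        for v in values:
            # Floor to a bin index; the maximum value is clamped into the last bin
            counts[min(int((v - lo) / width), num_bins - 1)] += 1
        return [c / len(values) for c in counts]

    psi = 0.0
    for ref_p, cur_p in zip(proportions(reference), proportions(current)):
        ref_p = ref_p or eps  # avoid log(0) and division by zero
        cur_p = cur_p or eps
        psi += (cur_p - ref_p) * math.log(cur_p / ref_p)
    return psi


def drift_level(psi: float) -> str:
    """Rule-of-thumb interpretation (assumed bands; only 0.2 matches checkDataDrift)."""
    if psi < 0.1:
        return "stable"
    if psi <= 0.2:
        return "moderate"
    return "significant"


if __name__ == "__main__":
    baseline = [float(x) for x in range(50)]
    shifted = [float(x) for x in range(50, 100)]
    score = population_stability_index(baseline, shifted)
    print(f"PSI={score:.2f} -> {drift_level(score)}")
```

Because both samples share the same bin edges, PSI is symmetric in its two arguments. The epsilon caps how much a single empty bin can contribute, but it still dominates when the two samples barely overlap.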

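calculateROCAUC is explicitly a simplified estimate. When several predictions share the same probability, its trapezoid walk depends on the order in which the tied items are visited. One tie-aware alternative is the rank (Mann-Whitney U) formulation, sketched here in Python under the assumption that labels are 0/1 and scores are positive-class probabilities. It computes AUC = (R_pos − n_pos(n_pos + 1)/2) / (n_pos · n_neg), where R_pos is the sum of the positive samples' ranks and tied scores share their average rank. The function name `roc_auc` is illustrative.

```python
from typing import List, Sequence


def roc_auc(labels: Sequence[int], scores: Sequence[float]) -> float:
    """ROC AUC from the Mann-Whitney U statistic; tied scores share their average rank."""
    if len(labels) != len(scores):
        raise ValueError("labels and scores must have the same length")
    n = len(labels)
    n_pos = sum(1 for y in labels if y == 1)
    n_neg = n - n_pos
    if n_pos == 0 or n_neg == 0:
        return 0.5  # AUC is undefined with a single class; return the neutral value

    # Assign 1-based ranks in ascending score order, averaging over runs of ties
    order = sorted(range(n), key=lambda i: scores[i])
    ranks: List[float] = [0.0] * n
    start = 0
    while start < n:
        end = start
        while end + 1 < n and scores[order[end + 1]] == scores[order[start]]:
            end += 1
        average_rank = (start + end) / 2 + 1
        for k in range(start, end + 1):
            ranks[order[k]] = average_rank
        start = end + 1

    # U counts positive/negative pairs ordered correctly (a tie counts as one half)
    rank_sum_pos = sum(r for r, y in zip(ranks, labels) if y == 1)
    u = rank_sum_pos - n_pos * (n_pos + 1) / 2
    return u / (n_pos * n_neg)
```

The rank formulation needs one sort (O(n log n)), like the trapezoid version, but it gives the same result regardless of how ties are ordered.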
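2.3 System Metrics Monitoring

System metrics track the runtime health of the infrastructure and the applications running on it, and form the foundation of AI system stability.

2.3.1 Python System Metrics Collection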
# system_metrics_collector.py
import psutil
import time
import threading
from datetime import datetime
from typing import Dict, List, Optional
from prometheus_client import Gauge, Counter, Histogram, start_http_server

try:
    import GPUtil  # optional dependency, only needed for NVIDIA GPU metrics
except ImportError:
    GPUtil = None

class SystemMetricsCollector:
    def __init__(self, collect_interval: int = 30):
        """
        Initialize the system metrics collector.
        
        Args:
            collect_interval: collection interval in seconds
        """
        self.collect_interval = collect_interval
        self.running = False
        self.collector_thread = None
        
        # Register the Prometheus metrics
        self._init_prometheus_metrics()
        
    def _init_prometheus_metrics(self):
        """Register the Prometheus metrics."""
        # CPU metrics
        self.cpu_usage = Gauge('system_cpu_usage_percent', 
                              'CPU usage percentage', ['cpu', 'type'])
        self.cpu_freq = Gauge('system_cpu_frequency_mhz',
                             'CPU frequency in MHz', ['cpu'])
        
        # Memory metrics
        self.memory_usage = Gauge('system_memory_usage_percent',
                                 'Memory usage percentage', ['type'])
        self.memory_available = Gauge('system_memory_available_bytes',
                                     'Available memory in bytes', [])
        self.memory_used = Gauge('system_memory_used_bytes',
                                'Used memory in bytes', ['type'])
        
        # Disk metrics
        self.disk_usage = Gauge('system_disk_usage_percent',
                               'Disk usage percentage', ['device', 'mountpoint'])
        self.disk_io_read = Counter('system_disk_read_bytes_total',
                                   'Total disk read bytes', ['device'])
        self.disk_io_write = Counter('system_disk_write_bytes_total',
                                    'Total disk write bytes', ['device'])
        
        # Network metrics
        self.network_bytes_sent = Counter('system_network_bytes_sent_total',
                                         'Total network bytes sent', ['interface'])
        self.network_bytes_recv = Counter('system_network_bytes_recv_total',
                                         'Total network bytes received', ['interface'])
        self.network_packets_sent = Counter('system_network_packets_sent_total',
                                           'Total network packets sent', ['interface'])
        self.network_packets_recv = Counter('system_network_packets_recv_total',
                                           'Total network packets received', ['interface'])
        
        # GPU metrics (if available)
        self.gpu_available = Gauge('system_gpu_available',
                                  'Number of available GPUs', [])
        self.gpu_usage = Gauge('system_gpu_usage_percent',
                              'GPU usage percentage', ['gpu_id', 'gpu_name'])
        self.gpu_memory_usage = Gauge('system_gpu_memory_usage_percent',
                                     'GPU memory usage percentage', ['gpu_id', 'gpu_name'])
        self.gpu_temperature = Gauge('system_gpu_temperature_celsius',
                                    'GPU temperature in Celsius', ['gpu_id', 'gpu_name'])
        
        # Process metrics
        self.process_cpu = Gauge('system_process_cpu_percent',
                                'Process CPU usage percentage', ['pid', 'name'])
        self.process_memory = Gauge('system_process_memory_percent',
                                   'Process memory usage percentage', ['pid', 'name'])
        self.process_threads = Gauge('system_process_threads',
                                    'Number of process threads', ['pid', 'name'])
        
        # System load average
        self.system_load = Gauge('system_load_average',
                                'System load average', ['period'])
        
        # API latency
        self.api_latency = Histogram('system_api_latency_seconds',
                                    'API latency in seconds', 
                                    ['service', 'endpoint', 'method'])
        
        # API error counts (used to derive the error rate)
        self.api_errors = Counter('system_api_errors_total',
                                 'Total API errors', 
                                 ['service', 'endpoint', 'method', 'error_code'])
        
    def start_collecting(self):
        """Start collecting system metrics in a background thread."""
        if self.running:
            return
        
        self.running = True
        self.collector_thread = threading.Thread(target=self._collect_loop, daemon=True)
        self.collector_thread.start()
        print(f"System metrics collector started with interval {self.collect_interval}s")
    
    def stop_collecting(self):
        """Stop collecting system metrics."""
        self.running = False
        if self.collector_thread:
            self.collector_thread.join(timeout=5)
    
    def _collect_loop(self):
        """Collection loop run by the background thread."""
        # Previous cumulative IO readings, used to compute per-cycle increments
        last_disk_io = {}
        last_net_io = {}
        
        while self.running:
            try:
                # CPU metrics
                self._collect_cpu_metrics()
                
                # Memory metrics
                self._collect_memory_metrics()
                
                # Disk metrics
                last_disk_io = self._collect_disk_metrics(last_disk_io)
                
                # Network metrics
                last_net_io = self._collect_network_metrics(last_net_io)
                
                # GPU metrics
                self._collect_gpu_metrics()
                
                # Process metrics
                self._collect_process_metrics()
                
                # System load
                self._collect_load_metrics()
                
                # Wait for the next collection cycle
                time.sleep(self.collect_interval)
                
            except Exception as e:
                print(f"Error collecting system metrics: {e}")
                time.sleep(5)
    
    def _collect_cpu_metrics(self):
        """Collect CPU metrics."""
        # Per-core utilization since the previous call (the very first call returns 0.0)
        cpu_percent = psutil.cpu_percent(percpu=True)
        for i, percent in enumerate(cpu_percent):
            self.cpu_usage.labels(cpu=f'cpu{i}', type='percent').set(percent)
        
        # CPU frequency
        try:
            cpu_freq = psutil.cpu_freq(percpu=True)
            if cpu_freq:
                for i, freq in enumerate(cpu_freq):
                    if freq:
                        self.cpu_freq.labels(cpu=f'cpu{i}').set(freq.current)
        except Exception:
            pass  # not supported on some platforms
    
    def _collect_memory_metrics(self):
        """Collect memory metrics."""
        mem = psutil.virtual_memory()
        swap = psutil.swap_memory()
        
        # Physical memory
        self.memory_usage.labels(type='physical').set(mem.percent)
        self.memory_available.set(mem.available)
        self.memory_used.labels(type='physical').set(mem.used)
        
        # Swap (reported only when swap is configured)
        if swap.total > 0:
            swap_percent = (swap.used / swap.total) * 100
            self.memory_usage.labels(type='swap').set(swap_percent)
            self.memory_used.labels(type='swap').set(swap.used)
    
    def _collect_disk_metrics(self, last_io: Dict) -> Dict:
        """Collect disk usage and IO metrics; returns the IO readings for the next cycle."""
        current_io = {}
        
        for partition in psutil.disk_partitions(all=False):
            try:
                # Skip read-only and NFS filesystems. Mount options are comma-separated,
                # so compare whole options ("errors=remount-ro" does not mean read-only)
                if 'ro' in partition.opts.split(',') or 'nfs' in partition.fstype:
                    continue
                    
                usage = psutil.disk_usage(partition.mountpoint)
                
                # Disk usage percentage
                self.disk_usage.labels(
                    device=partition.device,
                    mountpoint=partition.mountpoint
                ).set(usage.percent)
                
            except Exception as e:
                print(f"Error collecting disk metrics for {partition.mountpoint}: {e}")
        
        # Disk IO (the per-disk mapping can be empty, e.g. on diskless machines)
        disk_io = psutil.disk_io_counters(perdisk=True) or {}
        for device, io in disk_io.items():
            current_io[device] = (io.read_bytes, io.write_bytes)
            
            if device in last_io:
                last_read, last_write = last_io[device]
                read_diff = io.read_bytes - last_read
                write_diff = io.write_bytes - last_write
                
                # Prometheus counters only go up and reject negative increments,
                # so deltas from counters that were reset are skipped
                if read_diff > 0:
                    self.disk_io_read.labels(device=device).inc(read_diff)
                if write_diff > 0:
                    self.disk_io_write.labels(device=device).inc(write_diff)
        
        return current_io
    
    def _collect_network_metrics(self, last_io: Dict) -> Dict:
        """Collect per-interface network IO; returns the readings for the next cycle."""
        current_io = {}
        net_io = psutil.net_io_counters(pernic=True)
        
        for interface, io in net_io.items():
            current_io[interface] = (io.bytes_sent, io.bytes_recv, 
                                    io.packets_sent, io.packets_recv)
            
            if interface in last_io:
                last_sent, last_recv, last_packets_sent, last_packets_recv = last_io[interface]
                
                sent_diff = io.bytes_sent - last_sent
                recv_diff = io.bytes_recv - last_recv
                packets_sent_diff = io.packets_sent - last_packets_sent
                packets_recv_diff = io.packets_recv - last_packets_recv
                
                # Add the increments; Prometheus counters reject negative amounts,
                # so deltas from interface counters that were reset are skipped
                for counter, diff in ((self.network_bytes_sent, sent_diff),
                                      (self.network_bytes_recv, recv_diff),
                                      (self.network_packets_sent, packets_sent_diff),
                                      (self.network_packets_recv, packets_recv_diff)):
                    if diff > 0:
                        counter.labels(interface=interface).inc(diff)
        
        return current_io
    
    def _collect_gpu_metrics(self):
        """Collect GPU metrics (NVIDIA GPUs via GPUtil)."""
        if GPUtil is None:
            self.gpu_available.set(0)
            return
        try:
            gpus = GPUtil.getGPUs()
            self.gpu_available.set(len(gpus))
            
            for gpu in gpus:
                gpu_id = str(gpu.id)
                gpu_name = gpu.name.replace(' ', '_')
                
                self.gpu_usage.labels(gpu_id=gpu_id, gpu_name=gpu_name).set(gpu.load * 100)
                self.gpu_memory_usage.labels(gpu_id=gpu_id, gpu_name=gpu_name).set(gpu.memoryUtil * 100)
                self.gpu_temperature.labels(gpu_id=gpu_id, gpu_name=gpu_name).set(gpu.temperature)
                
        except Exception:
            # No GPU or driver available (GPUtil relies on nvidia-smi)
            self.gpu_available.set(0)
    
    def _collect_process_metrics(self):
        """Collect metrics for the current process and a few key processes."""
        current_pid = psutil.Process().pid
        
        for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
            try:
                info = proc.info
                pid = str(info['pid'])
                name = info['name'] or 'unknown'
                
                # Only the current process and selected key processes
                if int(pid) == current_pid or name in ['python', 'java', 'nginx', 'redis']:
                    if info['cpu_percent'] is not None:
                        self.process_cpu.labels(pid=pid, name=name).set(info['cpu_percent'])
                    
                    if info['memory_percent'] is not None:
                        self.process_memory.labels(pid=pid, name=name).set(info['memory_percent'])
                    
                    # Thread count
                    try:
                        threads = proc.num_threads()
                        self.process_threads.labels(pid=pid, name=name).set(threads)
                    except psutil.Error:
                        pass
                        
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
    
    def _collect_load_metrics(self):
        """Collect the 1-, 5- and 15-minute load averages."""
        try:
            load_avg = psutil.getloadavg()
            periods = ['1min', '5min', '15min']
            
            for period, load in zip(periods, load_avg):
                self.system_load.labels(period=period).set(load)
        except Exception:
            pass  # getloadavg() may be unavailable on older psutil versions
    
    def record_api_latency(self, service: str, endpoint: str, method: str, latency: float):
        """Record the latency of one API call, in seconds."""
        self.api_latency.labels(
            service=service,
            endpoint=endpoint,
            method=method
        ).observe(latency)
    
    def record_api_error(self, service: str, endpoint: str, method: str, error_code: int):
        """Record an API error."""
        self.api_errors.labels(
            service=service,
            endpoint=endpoint,
            method=method,
            error_code=str(error_code)
        ).inc()
    
    def start_metrics_server(self, port: int = 8000):
        """Start the HTTP server that exposes /metrics to Prometheus."""
        start_http_server(port)
        print(f"System metrics server started on port {port}")

# Usage example
if __name__ == "__main__":
    collector = SystemMetricsCollector(collect_interval=10)
    
    # Start collecting
    collector.start_collecting()
    
    # Start the metrics HTTP server
    collector.start_metrics_server(8002)
    
    # Simulate API calls
    import random
    
    try:
        while True:
            # Simulated API latency
            latency = random.uniform(0.1, 2.0)
            collector.record_api_latency(
                service='recommendation',
                endpoint='/api/predict',
                method='POST',
                latency=latency
            )
            
            # Occasionally simulate an error
            if random.random() < 0.05:  # 5% error rate
                collector.record_api_error(
                    service='recommendation',
                    endpoint='/api/predict',
                    method='POST',
                    error_code=500
                )
            
            time.sleep(5)
            
    except KeyboardInterrupt:
        collector.stop_collecting()
        print("System metrics collector stopped")
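Each cycle, the collector above adds the difference between two consecutive psutil readings to a Prometheus Counter, and a Counter rejects negative increments. Below is a dependency-free sketch of that delta step. It makes one assumption of its own: a backwards jump is treated as a counter restart, in the same way Prometheus treats counter resets. The helper names `counter_increment` and `counter_increments` are hypothetical.

```python
from typing import Dict, Tuple

Sample = Tuple[int, ...]  # e.g. (read_bytes, write_bytes) or (bytes_sent, bytes_recv, ...)


def counter_increment(previous: int, current: int) -> int:
    """Non-negative increment between two readings of a cumulative counter.

    A reading below the previous one is treated as a counter restart: as with
    Prometheus' reset handling, the current value is counted as the increment.
    """
    return current if current < previous else current - previous


def counter_increments(last: Dict[str, Sample], current: Dict[str, Sample]) -> Dict[str, Sample]:
    """Per-device increments; a device seen for the first time only sets a baseline."""
    result: Dict[str, Sample] = {}
    for device, values in current.items():
        previous = last.get(device)
        if previous is None:
            continue
        result[device] = tuple(counter_increment(p, c) for p, c in zip(previous, values))
    return result


if __name__ == "__main__":
    last = {"sda": (1000, 500)}
    now = {"sda": (1600, 400), "sdb": (10, 20)}
    # sda: reads grew by 600; writes went backwards (restart), counted as 400; sdb is new
    print(counter_increments(last, now))
```

Each returned value is never negative, so it can be passed directly to `Counter.labels(...).inc(...)`. Skipping negative deltas instead is the more conservative choice: it undercounts slightly after a reset but never over-reports.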

2.3.2 Java System Metrics Collection (Micrometer)
// SystemMetricsService.java
package com.ai.monitoring.metrics;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
import io.micrometer.core.instrument.binder.system.UptimeMetrics;
import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmHeapPressureMetrics;
import io.micrometer.core.instrument.binder.logging.LogbackMetrics;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

@Service
public class SystemMetricsService {
    
    private final MeterRegistry meterRegistry;
    
    // Custom metrics
    private Counter totalRequests;
    private Counter failedRequests;
    private Timer apiLatencyTimer;
    private Gauge systemLoadGauge;
    private Gauge diskUsageGauge;
    private Gauge networkUsageGauge;
    
    // System monitoring state (previous network byte counts)
    private final AtomicLong lastNetworkRxBytes = new AtomicLong(0);
    private final AtomicLong lastNetworkTxBytes = new AtomicLong(0);
    
    public SystemMetricsService(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    
    @PostConstruct
    public void init() {
        // Register Micrometer's standard binders
        registerStandardBinders();
        
        // Initialize the custom metrics
        initCustomMetrics();
        
        // Start the background system-monitoring thread
        startSystemMonitoring();
    }
    
    private void registerStandardBinders() {
        // JVM metrics
        new JvmMemoryMetrics().bindTo(meterRegistry);
        new JvmGcMetrics().bindTo(meterRegistry);
        new JvmThreadMetrics().bindTo(meterRegistry);
        new ClassLoaderMetrics().bindTo(meterRegistry);
        new JvmHeapPressureMetrics().bindTo(meterRegistry);
        
        // System metrics
        new ProcessorMetrics().bindTo(meterRegistry);
        new UptimeMetrics().bindTo(meterRegistry);
        
        // Logging metrics (Logback event counts per level)
        new LogbackMetrics().bindTo(meterRegistry);
    }
    
    private void initCustomMetrics() {
        // Counter of all API requests
        totalRequests = Counter.builder("ai.system.api.requests.total")
                .description("Total API requests")
                .tag("system", "ai-platform")
                .register(meterRegistry);
        
        // Counter of failed API requests
        failedRequests = Counter.builder("ai.system.api.requests.failed")
                .description("Total failed API requests")
                .tag("system", "ai-platform")
                .register(meterRegistry);
        
        // API latency timer; durations are recorded with an explicit TimeUnit and
        // exported in the registry's base time unit (seconds for Prometheus)
        apiLatencyTimer = Timer.builder("ai.system.api.latency")
                .description("API latency")
                .tag("system", "ai-platform")
                .publishPercentiles(0.5, 0.95, 0.99) // client-side p50/p95/p99, not aggregatable across instances
                .register(meterRegistry);
        
        // System load average; the MXBean returns a negative value where it is unavailable
        systemLoadGauge = Gauge.builder("ai.system.load.average",
                        ManagementFactory.getOperatingSystemMXBean(),
                        OperatingSystemMXBean::getSystemLoadAverage)
                .description("System load average")
                .register(meterRegistry);
        
        // Disk usage percentage of the root mount, computed by getDiskUsage()
        diskUsageGauge = Gauge.builder("ai.system.disk.usage.percent",
                        this,
                        service -> service.getDiskUsage())
                .description("Disk usage percentage")
                .tag("mount", "/")
                .register(meterRegistry);
        
        // Network usage percentage, computed by getNetworkUsage()
        networkUsageGauge = Gauge.builder("ai.system.network.usage.percent",
                        this,
                        service -> service.getNetworkUsage())
                .description("Network usage percentage")
                .register(meterRegistry);
    }
    
    private void startSystemMonitoring() {
        Thread monitoringThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Refresh the system metrics
                    updateSystemMetrics();
                    
                    // Refresh every 30 seconds
                    Thread.sleep(30000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    // Log the error and keep the thread running
                    e.printStackTrace();
                }
            }
        });
        
        monitoringThread.setDaemon(true);
        monitoringThread.setName("System-Monitoring-Thread");
        monitoringThread.start();
    }
    
    private void updateSystemMetrics() {
        // Hook for custom collection logic, e.g. disk space, network IO or
        // monitoring of specific processes.
        
        // These gauges use Runtime as their state object and read live values at
        // scrape time. Registering an existing meter ID again returns the existing
        // gauge, so calling this every cycle is harmless. A boxed snapshot value as
        // the state object would freeze the first reading, and because gauges hold
        // their state object weakly, it could turn into NaN after garbage collection.
        Runtime runtime = Runtime.getRuntime();
        
        // Number of available CPU cores
        Gauge.builder("ai.system.cpu.cores", runtime, Runtime::availableProcessors)
                .description("Number of available CPU cores")
                .register(meterRegistry);
        
        // JVM memory usage (Runtime.totalMemory / freeMemory)
        Gauge.builder("ai.system.jvm.memory.usage.percent", runtime,
                        r -> (double) (r.totalMemory() - r.freeMemory()) / r.totalMemory() * 100)
                .description("JVM memory usage percentage")
                .register(meterRegistry);
        
        Gauge.builder("ai.system.jvm.memory.used", runtime,
                        r -> r.totalMemory() - r.freeMemory())
                .description("JVM memory used in bytes")
                .register(meterRegistry);
        
        Gauge.builder("ai.system.jvm.memory.total", runtime, Runtime::totalMemory)
                .description("JVM total memory in bytes")
                .register(meterRegistry);
    }
    
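    public double getDiskUsage() {
        try {
            // Usage of the file system that contains the root directory, in percent.
            // Change the path to monitor another mount point (e.g. a model store).
            java.io.File root = new java.io.File("/");
            long total = root.getTotalSpace();
            if (total <= 0) {
                return -1.0; // path not available
            }
            long used = total - root.getFreeSpace();
            return (double) used / total * 100;
        } catch (Exception e) {
            return -1.0;
        }
    }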
    
    public double getNetworkUsage() {
        try {
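            // Placeholder: the JDK does not expose network interface byte counters,
            // so read OS counters (e.g. /proc/net/dev on Linux) or use a monitoring library.
            // Simplified version: returns a fixed sample value
            return 45.2; // sample value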
        } catch (Exception e) {
            return -1.0;
        }
    }
    
    /**
     * Record an API request
     */
    public void recordApiRequest(String service, String endpoint, String method) {
        totalRequests.increment();
        
        Counter.builder("ai.system.api.requests.by_service")
                .tag("service", service)
                .tag("endpoint", endpoint)
                .tag("method", method)
                .register(meterRegistry)
                .increment();
    }
    
    /**
     * Record API latency
     */
    public void recordApiLatency(String service, String endpoint, String method, long latencyMs) {
        apiLatencyTimer.record(latencyMs, TimeUnit.MILLISECONDS);
        
        Timer.builder("ai.system.api.latency.by_service")
                .tag("service", service)
                .tag("endpoint", endpoint)
                .tag("method", method)
                .register(meterRegistry)
                .record(latencyMs, TimeUnit.MILLISECONDS);
    }
    
    /**
     * Record an API error
     */
    public void recordApiError(String service, String endpoint, String method, String errorCode) {
        failedRequests.increment();
        
        Counter.builder("ai.system.api.errors")
                .tag("service", service)
                .tag("endpoint", endpoint)
                .tag("method", method)
                .tag("error_code", errorCode)
                .register(meterRegistry)
                .increment();
    }
    
    /**
     * Return a snapshot of the current system metrics
     */
    public SystemMetricsSnapshot getCurrentMetrics() {
        RuntimeMXBean runtimeBean = ManagementFactory.getRuntimeMXBean();
        OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
        
        SystemMetricsSnapshot snapshot = new SystemMetricsSnapshot();
        snapshot.setTimestamp(System.currentTimeMillis());
        snapshot.setJvmUptime(runtimeBean.getUptime());
        snapshot.setAvailableProcessors(osBean.getAvailableProcessors());
        
        if (osBean instanceof com.sun.management.OperatingSystemMXBean) {
            com.sun.management.OperatingSystemMXBean sunOsBean = 
                (com.sun.management.OperatingSystemMXBean) osBean;
            snapshot.setSystemLoadAverage(sunOsBean.getSystemLoadAverage());
            snapshot.setProcessCpuLoad(sunOsBean.getProcessCpuLoad());
            snapshot.setSystemCpuLoad(sunOsBean.getSystemCpuLoad());
        }
        
        // Memory information (JVM heap)
        Runtime runtime = Runtime.getRuntime();
        snapshot.setTotalMemory(runtime.totalMemory());
        snapshot.setFreeMemory(runtime.freeMemory());
        snapshot.setUsedMemory(runtime.totalMemory() - runtime.freeMemory());
        
        return snapshot;
    }
    
    /**
     * Point-in-time snapshot of the system metrics
     */
    public static class SystemMetricsSnapshot {
        private long timestamp;
        private long jvmUptime;
        private int availableProcessors;
        private double systemLoadAverage = -1;
        private double processCpuLoad = -1;
        private double systemCpuLoad = -1;
        private long totalMemory;
        private long usedMemory;
        private long freeMemory;
        
        // Getters and Setters
        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
        
        public long getJvmUptime() { return jvmUptime; }
        public void setJvmUptime(long jvmUptime) { this.jvmUptime = jvmUptime; }
        
        public int getAvailableProcessors() { return availableProcessors; }
        public void setAvailableProcessors(int availableProcessors) { 
            this.availableProcessors = availableProcessors; 
        }
        
        public double getSystemLoadAverage() { return systemLoadAverage; }
        public void setSystemLoadAverage(double systemLoadAverage) { 
            this.systemLoadAverage = systemLoadAverage; 
        }
        
        public double getProcessCpuLoad() { return processCpuLoad; }
        public void setProcessCpuLoad(double processCpuLoad) { 
            this.processCpuLoad = processCpuLoad; 
        }
        
        public double getSystemCpuLoad() { return systemCpuLoad; }
        public void setSystemCpuLoad(double systemCpuLoad) { 
            this.systemCpuLoad = systemCpuLoad; 
        }
        
        public long getTotalMemory() { return totalMemory; }
        public void setTotalMemory(long totalMemory) { this.totalMemory = totalMemory; }
        
        public long getUsedMemory() { return usedMemory; }
        public void setUsedMemory(long usedMemory) { this.usedMemory = usedMemory; }
        
        public long getFreeMemory() { return freeMemory; }
        public void setFreeMemory(long freeMemory) { this.freeMemory = freeMemory; }
        
        public double getMemoryUsagePercent() {
            if (totalMemory > 0) {
                return (double) usedMemory / totalMemory * 100;
            }
            return 0.0;
        }
    }
}
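`getNetworkUsage()` above returns only a fixed sample value. One common way to fill it in is to read cumulative interface byte counters twice and turn the difference into a rate. The counters can come, for example, from `/proc/net/dev` on Linux or from `psutil.net_io_counters()`. This is an assumption, not the author's method, and it is sketched in Python, the language used for the collectors elsewhere in this article. A utilization percentage also needs the link capacity, which the hypothetical `link_bps` parameter stands in for. `CounterSample` and `network_utilization` are likewise illustrative names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CounterSample:
    """One reading of cumulative interface byte counters (hypothetical structure)."""
    timestamp_s: float  # monotonic time of the reading, in seconds
    bytes_sent: int     # total bytes transmitted since the counters started
    bytes_recv: int     # total bytes received since the counters started


def network_utilization(prev: CounterSample, curr: CounterSample, link_bps: float) -> float:
    """Return link utilization in percent between two samples, or -1.0 if unknown.

    -1.0 mirrors the "unavailable" convention of the Java collector above.
    """
    interval = curr.timestamp_s - prev.timestamp_s
    if interval <= 0 or link_bps <= 0:
        return -1.0
    sent = curr.bytes_sent - prev.bytes_sent
    recv = curr.bytes_recv - prev.bytes_recv
    if sent < 0 or recv < 0:
        # Counters went backwards (interface reset or counter wrap): skip this interval
        return -1.0
    # A full-duplex link carries link_bps in each direction, so report the busier one
    busiest_bps = max(sent, recv) * 8 / interval
    return min(busiest_bps * 100 / link_bps, 100.0)


# Example: 12.5 MB sent and 6.25 MB received in 10 s on a 100 Mbit/s link
before = CounterSample(timestamp_s=0.0, bytes_sent=0, bytes_recv=0)
after = CounterSample(timestamp_s=10.0, bytes_sent=12_500_000, bytes_recv=6_250_000)
print(network_utilization(before, after, link_bps=100_000_000))  # 10.0
```

Taking the busier direction instead of the sum keeps a full-duplex link from reading above 100%. Counter resets are reported as unavailable rather than as a negative rate.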

2.4 Vue Monitoring Dashboard

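Visualization is the step that turns monitoring data into insight. The following dashboard is built with Vue 3 (Composition API), Element Plus, and ECharts: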

<!-- MetricsDashboard.vue -->
<template>
  <div class="metrics-dashboard">
    <!-- Top filter bar -->
    <div class="dashboard-header">
      <h1>AI系统监控大盘</h1>
      
      <div class="filter-controls">
        <el-select v-model="selectedService" placeholder="选择服务" clearable>
          <el-option
            v-for="service in availableServices"
            :key="service"
            :label="service"
            :value="service"
          />
        </el-select>
        
        <el-date-picker
          v-model="timeRange"
          type="datetimerange"
          range-separator="至"
          start-placeholder="开始时间"
          end-placeholder="结束时间"
          :shortcuts="timeShortcuts"
          @change="handleTimeRangeChange"
        />
        
        <el-select v-model="refreshInterval" placeholder="刷新间隔">
          <el-option label="关闭自动刷新" :value="0" />
          <el-option label="30秒" :value="30000" />
          <el-option label="1分钟" :value="60000" />
          <el-option label="5分钟" :value="300000" />
        </el-select>
        
        <el-button type="primary" @click="refreshData">刷新</el-button>
      </div>
    </div>
    
    <!-- Overview metric cards -->
    <div class="metrics-overview">
      <el-row :gutter="20">
        <el-col :span="6">
          <metric-card
            title="系统健康度"
            :value="systemHealth"
            :trend="systemHealthTrend"
            unit="%"
            color="#67C23A"
            :loading="loading.overview"
          />
        </el-col>
        
        <el-col :span="6">
          <metric-card
            title="API成功率"
            :value="apiSuccessRate"
            :trend="apiSuccessRateTrend"
            unit="%"
            color="#409EFF"
            :loading="loading.overview"
          />
        </el-col>
        
        <el-col :span="6">
          <metric-card
            title="模型准确率"
            :value="modelAccuracy"
            :trend="modelAccuracyTrend"
            unit="%"
            color="#E6A23C"
            :loading="loading.overview"
          />
        </el-col>
        
        <el-col :span="6">
          <metric-card
            title="平均响应时间"
            :value="avgResponseTime"
            :trend="responseTimeTrend"
            unit="ms"
            color="#F56C6C"
            :loading="loading.overview"
          />
        </el-col>
      </el-row>
    </div>
    
    <!-- Chart area -->
    <div class="charts-container">
      <el-row :gutter="20">
        <!-- System resource usage -->
        <el-col :span="12">
          <el-card class="chart-card">
            <template #header>
              <div class="chart-header">
                <span>系统资源使用率</span>
                <div class="chart-controls">
                  <el-radio-group v-model="resourceViewType" size="small">
                    <el-radio-button label="cpu">CPU</el-radio-button>
                    <el-radio-button label="memory">内存</el-radio-button>
                    <el-radio-button label="disk">磁盘</el-radio-button>
                    <el-radio-button label="network">网络</el-radio-button>
                  </el-radio-group>
                </div>
              </div>
            </template>
            
            <div class="chart-content">
              <div ref="resourceChart" style="width: 100%; height: 300px;"></div>
            </div>
          </el-card>
        </el-col>
        
        <!-- Business metric trends -->
        <el-col :span="12">
          <el-card class="chart-card">
            <template #header>
              <div class="chart-header">
                <span>业务指标趋势</span>
                <div class="chart-controls">
                  <el-select v-model="selectedBusinessMetric" size="small" style="width: 150px;">
                    <el-option label="CTR(点击率)" value="ctr" />
                    <el-option label="转化率" value="conversion_rate" />
                    <el-option label="用户留存率" value="user_retention" />
                    <el-option label="活跃用户数" value="active_users" />
                  </el-select>
                </div>
              </div>
            </template>
            
            <div class="chart-content">
              <div ref="businessChart" style="width: 100%; height: 300px;"></div>
            </div>
          </el-card>
        </el-col>
      </el-row>
      
      <el-row :gutter="20" style="margin-top: 20px;">
        <!-- Model performance -->
        <el-col :span="12">
          <el-card class="chart-card">
            <template #header>
              <div class="chart-header">
                <span>模型性能监控</span>
                <div class="chart-controls">
                  <el-select v-model="selectedModel" size="small" style="width: 200px;">
                    <el-option
                      v-for="model in availableModels"
                      :key="model.id"
                      :label="model.name"
                      :value="model.id"
                    />
                  </el-select>
                </div>
              </div>
            </template>
            
            <div class="chart-content">
              <div class="model-metrics-grid">
                <div class="metric-item" v-for="metric in modelMetrics" :key="metric.name">
                  <div class="metric-label">{{ metric.label }}</div>
                  <div class="metric-value" :class="getMetricClass(metric)">
                    {{ metric.value.toFixed(2) }}{{ metric.unit }}
                  </div>
                  <div class="metric-trend">
                    <span :class="getTrendClass(metric.trend)">
                      <i :class="getTrendIcon(metric.trend)"></i>
                      {{ metric.trend.toFixed(1) }}%
                    </span>
                  </div>
                </div>
              </div>
              
              <div ref="modelPerformanceChart" style="width: 100%; height: 200px; margin-top: 20px;"></div>
            </div>
          </el-card>
        </el-col>
        
        <!-- Real-time alarm panel -->
        <el-col :span="12">
          <el-card class="chart-card">
            <template #header>
              <div class="chart-header">
                <span>实时告警</span>
                <span class="alarm-count" :class="getAlarmCountClass">
                  {{ activeAlarms.length }} 个活跃告警
                </span>
              </div>
            </template>
            
            <div class="chart-content">
              <el-table
                :data="activeAlarms"
                style="width: 100%"
                height="250"
                v-loading="loading.alarms"
              >
                <el-table-column prop="timestamp" label="时间" width="150">
                  <template #default="scope">
                    {{ formatTime(scope.row.timestamp) }}
                  </template>
                </el-table-column>
                
                <el-table-column prop="severity" label="级别" width="80">
                  <template #default="scope">
                    <el-tag
                      :type="getSeverityType(scope.row.severity)"
                      size="small"
                    >
                      {{ scope.row.severity }}
                    </el-tag>
                  </template>
                </el-table-column>
                
                <el-table-column prop="service" label="服务" width="120" />
                
                <el-table-column prop="description" label="描述" />
                
                <el-table-column label="操作" width="100">
                  <template #default="scope">
                    <el-button
                      link
                      type="primary"
                      size="small"
                      @click="handleAcknowledgeAlarm(scope.row)"
                    >
                      确认
                    </el-button>
                  </template>
                </el-table-column>
              </el-table>
              
              <div class="alarm-summary">
                <div class="summary-item">
                  <span class="summary-label">紧急:</span>
                  <span class="summary-value critical">{{ criticalAlarmsCount }}</span>
                </div>
                <div class="summary-item">
                  <span class="summary-label">严重:</span>
                  <span class="summary-value major">{{ majorAlarmsCount }}</span>
                </div>
                <div class="summary-item">
                  <span class="summary-label">警告:</span>
                  <span class="summary-value minor">{{ minorAlarmsCount }}</span>
                </div>
                <div class="summary-item">
                  <span class="summary-label">提示:</span>
                  <span class="summary-value info">{{ infoAlarmsCount }}</span>
                </div>
              </div>
            </div>
          </el-card>
        </el-col>
      </el-row>
    </div>
    
    <!-- Metric drill-down dialog -->
    <el-dialog
      v-model="metricDetailVisible"
      :title="selectedMetricDetail ? selectedMetricDetail.title : ''"
      width="80%"
    >
      <div v-if="selectedMetricDetail">
        <div class="metric-detail-header">
          <div class="detail-stats">
            <div class="stat-item">
              <span class="stat-label">当前值:</span>
              <span class="stat-value">{{ selectedMetricDetail.currentValue }}</span>
            </div>
            <div class="stat-item">
              <span class="stat-label">平均值:</span>
              <span class="stat-value">{{ selectedMetricDetail.avgValue }}</span>
            </div>
            <div class="stat-item">
              <span class="stat-label">最大值:</span>
              <span class="stat-value">{{ selectedMetricDetail.maxValue }}</span>
            </div>
            <div class="stat-item">
              <span class="stat-label">最小值:</span>
              <span class="stat-value">{{ selectedMetricDetail.minValue }}</span>
            </div>
          </div>
        </div>
        
        <div ref="detailChart" style="width: 100%; height: 400px;"></div>
        
        <el-table
          :data="metricDetailData"
          style="width: 100%; margin-top: 20px;"
          height="250"
        >
          <el-table-column prop="timestamp" label="时间" width="180" />
          <el-table-column prop="value" label="值" width="120" />
          <el-table-column prop="change" label="变化" width="120">
            <template #default="scope">
              <span :class="scope.row.change >= 0 ? 'positive' : 'negative'">
                {{ scope.row.change >= 0 ? '+' : '' }}{{ scope.row.change.toFixed(2) }}%
              </span>
            </template>
          </el-table-column>
          <el-table-column prop="status" label="状态" width="100">
            <template #default="scope">
              <el-tag
                :type="getStatusType(scope.row.status)"
                size="small"
              >
                {{ scope.row.status }}
              </el-tag>
            </template>
          </el-table-column>
          <el-table-column prop="note" label="备注" />
        </el-table>
      </div>
    </el-dialog>
  </div>
</template>

<script>
import * as echarts from 'echarts'
import { onMounted, onUnmounted, ref, watch, computed } from 'vue'
import { ElMessage, ElMessageBox } from 'element-plus'
import MetricCard from './components/MetricCard.vue'

export default {
  name: 'MetricsDashboard',
  components: {
    MetricCard
  },
  setup() {
    // Reactive state
    const selectedService = ref('')
    const timeRange = ref([new Date(Date.now() - 3600000), new Date()]) // default: last hour
    const refreshInterval = ref(30000) // default: refresh every 30 seconds
    const resourceViewType = ref('cpu')
    const selectedBusinessMetric = ref('ctr')
    const selectedModel = ref('')
    
    // Template refs to the chart containers
    const resourceChart = ref(null)
    const businessChart = ref(null)
    const modelPerformanceChart = ref(null)
    const detailChart = ref(null)
    
    // ECharts instances
    let resourceChartInstance = null
    let businessChartInstance = null
    let modelPerformanceChartInstance = null
    let detailChartInstance = null
    
    // Loading flags
    const loading = ref({
      overview: false,
      charts: false,
      alarms: false
    })
    
    // Overview metrics (mock initial values)
    const systemHealth = ref(98.5)
    const systemHealthTrend = ref(0.5)
    const apiSuccessRate = ref(99.2)
    const apiSuccessRateTrend = ref(-0.2)
    const modelAccuracy = ref(95.7)
    const modelAccuracyTrend = ref(0.3)
    const avgResponseTime = ref(245)
    const responseTimeTrend = ref(-5.2)
    
    // Available services
    const availableServices = ref([
      'recommendation-service',
      'user-profile-service',
      'model-serving-service',
      'data-processing-service'
    ])
    
    // Available models
    const availableModels = ref([
      { id: 'model_1', name: '推荐模型 v2.1' },
      { id: 'model_2', name: '用户分类模型 v1.5' },
      { id: 'model_3', name: '点击预测模型 v3.0' }
    ])
    
    // Model metrics (mock values)
    const modelMetrics = ref([
      { name: 'accuracy', label: '准确率', value: 95.7, unit: '%', trend: 0.3, threshold: 90 },
      { name: 'precision', label: '精确率', value: 94.2, unit: '%', trend: 0.1, threshold: 92 },
      { name: 'recall', label: '召回率', value: 93.8, unit: '%', trend: -0.2, threshold: 91 },
      { name: 'f1', label: 'F1分数', value: 94.0, unit: '%', trend: 0.0, threshold: 92 },
      { name: 'psi', label: 'PSI', value: 0.15, unit: '', trend: 0.02, threshold: 0.25 },
      { name: 'drift', label: '漂移检测', value: 0, unit: '', trend: 0, threshold: 1 }
    ])
    
    // Active alarms (mock data)
    const activeAlarms = ref([
      { 
        id: 'alarm_001', 
        timestamp: Date.now() - 300000, // 5 minutes ago
        severity: 'critical',
        service: 'recommendation-service',
        description: '模型准确率下降超过10%',
        acknowledged: false
      },
      { 
        id: 'alarm_002', 
        timestamp: Date.now() - 600000, // 10 minutes ago
        severity: 'major',
        service: 'model-serving-service',
        description: 'API响应时间超过1秒',
        acknowledged: false
      },
      { 
        id: 'alarm_003', 
        timestamp: Date.now() - 900000, // 15 minutes ago
        severity: 'minor',
        service: 'data-processing-service',
        description: '数据处理延迟增加',
        acknowledged: false
      }
    ])
    
    // Metric detail dialog state
    const metricDetailVisible = ref(false)
    const selectedMetricDetail = ref(null)
    const metricDetailData = ref([])
    
    // Date-picker shortcuts
    const timeShortcuts = ref([
      {
        text: '最近1小时',
        value: () => {
          const end = new Date()
          const start = new Date()
          start.setTime(start.getTime() - 3600000)
          return [start, end]
        }
      },
      {
        text: '最近6小时',
        value: () => {
          const end = new Date()
          const start = new Date()
          start.setTime(start.getTime() - 3600000 * 6)
          return [start, end]
        }
      },
      {
        text: '最近24小时',
        value: () => {
          const end = new Date()
          const start = new Date()
          start.setTime(start.getTime() - 3600000 * 24)
          return [start, end]
        }
      },
      {
        text: '最近7天',
        value: () => {
          const end = new Date()
          const start = new Date()
          start.setTime(start.getTime() - 3600000 * 24 * 7)
          return [start, end]
        }
      }
    ])
    
    // Computed properties: alarm counts per severity
    const criticalAlarmsCount = computed(() => {
      return activeAlarms.value.filter(a => a.severity === 'critical').length
    })
    
    const majorAlarmsCount = computed(() => {
      return activeAlarms.value.filter(a => a.severity === 'major').length
    })
    
    const minorAlarmsCount = computed(() => {
      return activeAlarms.value.filter(a => a.severity === 'minor').length
    })
    
    const infoAlarmsCount = computed(() => {
      return activeAlarms.value.filter(a => a.severity === 'info').length
    })
    
    const getAlarmCountClass = computed(() => {
      if (criticalAlarmsCount.value > 0) return 'alarm-critical'
      if (majorAlarmsCount.value > 0) return 'alarm-major'
      if (minorAlarmsCount.value > 0) return 'alarm-minor'
      return 'alarm-normal'
    })
    
    // Methods
    const refreshData = async () => {
      loading.value.overview = true
      loading.value.charts = true
      
      try {
        // Fetch the overview data
        await fetchOverviewData()
        
        // Fetch the chart data
        await fetchChartData()
        
        // Fetch the alarm data
        await fetchAlarmData()
        
        ElMessage.success('数据刷新成功')
      } catch (error) {
        ElMessage.error('数据刷新失败: ' + error.message)
      } finally {
        loading.value.overview = false
        loading.value.charts = false
      }
    }
    
    const fetchOverviewData = async () => {
      // Simulated API call
      return new Promise(resolve => {
        setTimeout(() => {
          // Update the overview data (mock random fluctuation)
          systemHealth.value = 98.5 + Math.random() * 0.5
          systemHealthTrend.value = (Math.random() - 0.5) * 2
          
          apiSuccessRate.value = 99.2 + Math.random() * 0.3
          apiSuccessRateTrend.value = (Math.random() - 0.5) * 1
          
          modelAccuracy.value = 95.7 + Math.random() * 0.6
          modelAccuracyTrend.value = (Math.random() - 0.5) * 0.5
          
          avgResponseTime.value = 245 + Math.random() * 10
          responseTimeTrend.value = (Math.random() - 0.5) * 3
          
          resolve()
        }, 500)
      })
    }
    
    const fetchChartData = async () => {
      // Initialize (or redraw) the charts
      initResourceChart()
      initBusinessChart()
      initModelPerformanceChart()
    }
    
    const fetchAlarmData = async () => {
      loading.value.alarms = true
      
      try {
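        // Simulated API call for the alarm data; a real implementation would
        // call the backend API (axios would also need to be imported), e.g.:
        // const response = await axios.get('/api/alarms/active')
        // activeAlarms.value = response.data
        
        // Mock data is used here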
        await new Promise(resolve => setTimeout(resolve, 300))
      } finally {
        loading.value.alarms = false
      }
    }
    
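    const initResourceChart = () => {
      if (!resourceChart.value) return
      
      // Create the ECharts instance and its resize listener only once. This function
      // runs on every refresh, so disposing and re-creating the chart while adding
      // a new anonymous resize listener each time would make the listeners pile up.
      if (!resourceChartInstance || resourceChartInstance.isDisposed()) {
        resourceChartInstance = echarts.init(resourceChart.value)
        window.addEventListener('resize', () => {
          if (resourceChartInstance && !resourceChartInstance.isDisposed()) {
            resourceChartInstance.resize()
          }
        })
      }
      
      // Pick the chart option that matches the selected resource view
      let option = {}
      
      if (resourceViewType.value === 'cpu') {
        option = getCPUChartOption()
      } else if (resourceViewType.value === 'memory') {
        option = getMemoryChartOption()
      } else if (resourceViewType.value === 'disk') {
        option = getDiskChartOption()
      } else if (resourceViewType.value === 'network') {
        option = getNetworkChartOption()
      }
      
      // notMerge = true: replace the previous option instead of merging into it,
      // so switching views leaves no stale series behind
      resourceChartInstance.setOption(option, true)
    }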
    
    const getCPUChartOption = () => {
      // Generate mock data: one point per minute over the last 30 minutes
      const now = new Date()
      const data = []
      
      for (let i = 30; i >= 0; i--) {
        const time = new Date(now.getTime() - i * 60000) // one point per minute
        data.push({
          time: time.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' }),
          value: Number((20 + Math.random() * 60).toFixed(1)) // 20-80%, one decimal for a readable tooltip
        })
      }
      
      return {
        tooltip: {
          trigger: 'axis',
          formatter: '{b0}: {c0}%'
        },
        grid: {
          left: '3%',
          right: '4%',
          bottom: '3%',
          containLabel: true
        },
        xAxis: {
          type: 'category',
          boundaryGap: false,
          data: data.map(d => d.time)
        },
        yAxis: {
          type: 'value',
          min: 0,
          max: 100,
          axisLabel: {
            formatter: '{value}%'
          }
        },
        series: [
          {
            name: 'CPU使用率',
            type: 'line',
            smooth: true,
            data: data.map(d => d.value),
            areaStyle: {
              color: new echarts.graphic.LinearGradient(0, 0, 0, 1, [
                { offset: 0, color: 'rgba(64, 158, 255, 0.3)' },
                { offset: 1, color: 'rgba(64, 158, 255, 0.1)' }
              ])
            },
            lineStyle: {
              color: '#409EFF'
            },
            itemStyle: {
              color: '#409EFF'
            }
          }
        ]
      }
    }
    
    const getMemoryChartOption = () => {
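      // Memory chart configuration (mock data: one point per minute over the last 30 minutes)
      const now = new Date()
      return {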
        tooltip: {
          trigger: 'axis',
          formatter: '{b0}: {c0}%'
        },
        grid: {
          left: '3%',
          right: '4%',
          bottom: '3%',
          containLabel: true
        },
        xAxis: {
          type: 'category',
          boundaryGap: false,
          data: Array.from({ length: 31 }, (_, i) => {
            const d = new Date(now.getTime() - (30 - i) * 60000)
            return d.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' })
          })
        },
        yAxis: {
          type: 'value',
          min: 0,
          max: 100,
          axisLabel: {
            formatter: '{value}%'
          }
        },
        series: [
          {
            name: '内存使用率',
            type: 'line',
            smooth: true,
            data: Array.from({ length: 31 }, () => Number((40 + Math.random() * 40).toFixed(1))), // 40-80%
            areaStyle: {
              color: new echarts.graphic.LinearGradient(0, 0, 0, 1, [
                { offset: 0, color: 'rgba(103, 194, 58, 0.3)' },
                { offset: 1, color: 'rgba(103, 194, 58, 0.1)' }
              ])
            },
            lineStyle: {
              color: '#67C23A'
            },
            itemStyle: {
              color: '#67C23A'
            }
          }
        ]
      }
    }
    
    const initBusinessChart = () => {
      if (!businessChart.value) return
      
      // Create the ECharts instance and its resize listener only once. This function
      // runs on every refresh, so disposing and re-creating the chart while adding
      // a new anonymous resize listener each time would make the listeners pile up.
      if (!businessChartInstance || businessChartInstance.isDisposed()) {
        businessChartInstance = echarts.init(businessChart.value)
        window.addEventListener('resize', () => {
          if (businessChartInstance && !businessChartInstance.isDisposed()) {
            businessChartInstance.resize()
          }
        })
      }
      
      // Generate mock business data
      const now = new Date()
      const data = []
      
      for (let i = 30; i >= 0; i--) {
        const time = new Date(now.getTime() - i * 3600000) // one point per hour
        data.push({
          time: time.toLocaleString([], { month: 'short', day: 'numeric', hour: '2-digit' }),
          ctr: 2 + Math.random() * 3, // 2-5%
          conversion: 10 + Math.random() * 15, // 10-25%
          retention: 70 + Math.random() * 20, // 70-90%
          activeUsers: Math.round(1000 + Math.random() * 500) // 1000-1500 users
        })
      }
      
      // Map the dropdown value to its series name and show only that series by default;
      // the other series can still be toggled from the legend
      const selectedMetric = selectedBusinessMetric.value
      const metricNames = {
        ctr: 'CTR',
        conversion_rate: '转化率',
        user_retention: '用户留存率',
        active_users: '活跃用户数'
      }
      const legendSelected = Object.fromEntries(
        Object.entries(metricNames).map(([key, name]) => [name, key === selectedMetric])
      )
      
      const option = {
        tooltip: {
          trigger: 'axis',
          formatter: function(params) {
            if (!params.length) return ''
            const time = params[0].axisValue
            let result = time + '<br/>'
            
            params.forEach(param => {
              // Active users is a count; the other series are percentages
              const isCount = param.seriesName === '活跃用户数'
              const value = isCount ? param.value : Number(param.value).toFixed(2)
              const unit = isCount ? '' : '%'
              
              result += `${param.marker} ${param.seriesName}: ${value}${unit}<br/>`
            })
            
            return result
          }
        },
        legend: {
          data: ['CTR', '转化率', '用户留存率', '活跃用户数'],
          selected: legendSelected
        },
        grid: {
          left: '3%',
          right: '4%',
          bottom: '3%',
          containLabel: true
        },
        xAxis: {
          type: 'category',
          boundaryGap: false,
          data: data.map(d => d.time)
        },
        yAxis: [
          {
            type: 'value',
            name: '百分比 (%)',
            min: 0,
            max: 100,
            position: 'left',
            axisLabel: {
              formatter: '{value}%'
            }
          },
          {
            type: 'value',
            name: '用户数',
            min: 0,
            position: 'right',
            axisLabel: {
              formatter: '{value}'
            }
          }
        ],
        series: [
          {
            name: 'CTR',
            type: 'line',
            smooth: true,
            data: data.map(d => d.ctr),
            yAxisIndex: 0,
            lineStyle: {
              color: '#409EFF'
            },
            itemStyle: {
              color: '#409EFF'
            }
          },
          {
            name: '转化率',
            type: 'line',
            smooth: true,
            data: data.map(d => d.conversion),
            yAxisIndex: 0,
            lineStyle: {
              color: '#67C23A'
            },
            itemStyle: {
              color: '#67C23A'
            }
          },
          {
            name: '用户留存率',
            type: 'line',
            smooth: true,
            data: data.map(d => d.retention),
            yAxisIndex: 0,
            lineStyle: {
              color: '#E6A23C'
            },
            itemStyle: {
              color: '#E6A23C'
            }
          },
          {
            name: '活跃用户数',
            type: 'line',
            smooth: true,
            data: data.map(d => d.activeUsers),
            yAxisIndex: 1,
            lineStyle: {
              color: '#F56C6C'
            },
            itemStyle: {
              color: '#F56C6C'
            }
          }
        ]
      }
      
      // notMerge = true: replace the previous option so the legend selection
      // follows the dropdown instead of being merged with the old state
      businessChartInstance.setOption(option, true)
    }
    
    const initModelPerformanceChart = () => {
      if (!modelPerformanceChart.value) return
      
      if (modelPerformanceChartInstance) {
        modelPerformanceChartInstance.dispose()
      }
      
      modelPerformanceChartInstance = echarts.init(modelPerformanceChart.value)
      
      // Generate mock model-performance data, one point per day
      const now = new Date()
      const data = []
      
      for (let i = 7; i >= 0; i--) {
        const date = new Date(now.getTime() - i * 24 * 3600000)
        data.push({
          date: date.toLocaleDateString([], { month: 'short', day: 'numeric' }),
          accuracy: 95 + Math.random() * 3 - 1.5,
          precision: 94 + Math.random() * 3 - 1.5,
          recall: 93 + Math.random() * 3 - 1.5,
          f1: 94 + Math.random() * 2 - 1,
          psi: Math.random() * 0.3
        })
      }
      
      const option = {
        tooltip: {
          trigger: 'axis',
          axisPointer: {
            type: 'cross'
          }
        },
        legend: {
          data: ['准确率', '精确率', '召回率', 'F1分数', 'PSI']
        },
        grid: {
          left: '3%',
          right: '4%',
          bottom: '3%',
          containLabel: true
        },
        xAxis: {
          type: 'category',
          boundaryGap: false,
          data: data.map(d => d.date)
        },
        yAxis: [
          {
            type: 'value',
            name: '性能指标 (%)',
            min: 90,
            max: 100,
            position: 'left',
            axisLabel: {
              formatter: '{value}%'
            }
          },
          {
            type: 'value',
            name: 'PSI',
            min: 0,
            max: 0.5,
            position: 'right',
            axisLabel: {
              formatter: '{value}'
            }
          }
        ],
        series: [
          {
            name: '准确率',
            type: 'line',
            smooth: true,
            data: data.map(d => d.accuracy),
            yAxisIndex: 0,
            lineStyle: {
              color: '#409EFF'
            },
            itemStyle: {
              color: '#409EFF'
            }
          },
          {
            name: '精确率',
            type: 'line',
            smooth: true,
            data: data.map(d => d.precision),
            yAxisIndex: 0,
            lineStyle: {
              color: '#67C23A'
            },
            itemStyle: {
              color: '#67C23A'
            }
          },
          {
            name: '召回率',
            type: 'line',
            smooth: true,
            data: data.map(d => d.recall),
            yAxisIndex: 0,
            lineStyle: {
              color: '#E6A23C'
            },
            itemStyle: {
              color: '#E6A23C'
            }
          },
          {
            name: 'F1分数',
            type: 'line',
            smooth: true,
            data: data.map(d => d.f1),
            yAxisIndex: 0,
            lineStyle: {
              color: '#F56C6C'
            },
            itemStyle: {
              color: '#F56C6C'
            }
          },
          {
            name: 'PSI',
            type: 'line',
            smooth: true,
            data: data.map(d => d.psi),
            yAxisIndex: 1,
            lineStyle: {
              color: '#909399'
            },
            itemStyle: {
              color: '#909399'
            }
          }
        ]
      }
      
      modelPerformanceChartInstance.setOption(option)
      
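      // Resize on window resize; skip the instance once it has been disposed
      window.addEventListener('resize', () => {
        if (modelPerformanceChartInstance && !modelPerformanceChartInstance.isDisposed()) {
          modelPerformanceChartInstance.resize()
        }
      })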
    }
    
    const handleTimeRangeChange = () => {
      refreshData()
    }
    
    const handleAcknowledgeAlarm = (alarm) => {
      ElMessageBox.confirm(
        `确认处理告警 "${alarm.description}"?`,
        '确认告警',
        {
          confirmButtonText: '确认',
          cancelButtonText: '取消',
          type: 'warning'
        }
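      ).then(async () => {
        // Call the backend API to acknowledge the alarm; the callback is async so this await is valid once uncommented
        // await axios.post(`/api/alarms/${alarm.id}/acknowledge`)
        
        // Update local state
        const index = activeAlarms.value.findIndex(a => a.id === alarm.id)
        if (index !== -1) {
          activeAlarms.value.splice(index, 1)
        }
        
        ElMessage.success('告警已确认')
      }).catch(() => {
        // The user cancelled the dialog (a failed API call would also land here once enabled)
      })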
    }
    
    const getSeverityType = (severity) => {
      switch (severity) {
        case 'critical': return 'danger'
        case 'major': return 'warning'
        case 'minor': return 'info'
        case 'info': return ''
        default: return ''
      }
    }
    
    const getMetricClass = (metric) => {
      if (metric.name === 'drift') {
        return metric.value > 0 ? 'metric-warning' : 'metric-normal'
      }
      
      if (metric.value < metric.threshold) {
        return 'metric-warning'
      }
      
      return 'metric-normal'
    }
    
    const getTrendClass = (trend) => {
      if (trend > 0) return 'trend-positive'
      if (trend < 0) return 'trend-negative'
      return 'trend-neutral'
    }
    
    const getTrendIcon = (trend) => {
      if (trend > 0) return 'el-icon-top'
      if (trend < 0) return 'el-icon-bottom'
      return 'el-icon-minus'
    }
    
    const formatTime = (timestamp) => {
      const date = new Date(timestamp)
      return date.toLocaleString()
    }
    
    const showMetricDetail = (metric) => {
      // metric.unit may be undefined; fall back to '' so values never render as "...undefined"
      const unit = metric.unit || ''
      // Simulated summary statistics derived from the current value
      selectedMetricDetail.value = {
        title: `${metric.label}详情`,
        currentValue: metric.value + unit,
        avgValue: (metric.value * 0.95).toFixed(2) + unit,
        maxValue: (metric.value * 1.1).toFixed(2) + unit,
        minValue: (metric.value * 0.9).toFixed(2) + unit
      }
      
      // Generate simulated detail rows: one per hour for the last 20 hours
      metricDetailData.value = Array.from({ length: 20 }, (_, i) => {
        const timestamp = new Date(Date.now() - (19 - i) * 3600000)
        const value = metric.value + (Math.random() - 0.5) * metric.value * 0.1
        // change lies in [-2, 2), so the ±1 thresholds below can actually be reached
        const change = (Math.random() - 0.5) * 4
        const status = change > 1 ? '警告' : change < -1 ? '异常' : '正常'
        
        return {
          timestamp: timestamp.toLocaleString(),
          value: value.toFixed(2) + unit,
          change: change,
          status: status,
          note: change > 1 ? '值上升过快' : change < -1 ? '值下降过快' : '正常波动'
        }
      })
      
      metricDetailVisible.value = true
      
      // Initialize the detail chart after the dialog DOM has rendered
      nextTick(() => {
        initDetailChart(metric)
      })
    }
    
    const initDetailChart = (metric) => {
      if (!detailChart.value) return
      
      if (detailChartInstance) {
        detailChartInstance.dispose()
      }
      
      detailChartInstance = echarts.init(detailChart.value)
      
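      // Generate simulated chart data: 50 hourly points, labelled with date and time because they span more than two days
      const data = Array.from({ length: 50 }, (_, i) => {
        return {
          time: new Date(Date.now() - (49 - i) * 3600000).toLocaleString([], { month: 'numeric', day: 'numeric', hour: '2-digit', minute: '2-digit' }),
          value: Number((metric.value + (Math.random() - 0.5) * metric.value * 0.2).toFixed(2))
        }
      })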
      
      const option = {
        tooltip: {
          trigger: 'axis',
          formatter: '{b0}: {c0}' + (metric.unit || '')
        },
        grid: {
          left: '3%',
          right: '4%',
          bottom: '3%',
          containLabel: true
        },
        xAxis: {
          type: 'category',
          boundaryGap: false,
          data: data.map(d => d.time)
        },
        yAxis: {
          type: 'value',
          name: metric.label + (metric.unit || ''),
          min: Math.min(...data.map(d => d.value)) * 0.9,
          max: Math.max(...data.map(d => d.value)) * 1.1
        },
        series: [
          {
            name: metric.label,
            type: 'line',
            smooth: true,
            data: data.map(d => d.value),
            lineStyle: {
              color: '#409EFF'
            },
            areaStyle: {
              color: new echarts.graphic.LinearGradient(0, 0, 0, 1, [
                { offset: 0, color: 'rgba(64, 158, 255, 0.3)' },
                { offset: 1, color: 'rgba(64, 158, 255, 0.1)' }
              ])
            },
            markLine: {
              silent: true,
              data: [
                {
                  yAxis: metric.threshold || 0,
                  label: {
                    formatter: '阈值: ' + (metric.threshold || 0) + (metric.unit || '')
                  },
                  lineStyle: {
                    color: '#F56C6C',
                    type: 'dashed'
                  }
                }
              ]
            }
          }
        ]
      }
      
      detailChartInstance.setOption(option)
      
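      // Resize on window resize; skip the instance once it has been disposed
      window.addEventListener('resize', () => {
        if (detailChartInstance && !detailChartInstance.isDisposed()) {
          detailChartInstance.resize()
        }
      })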
    }
    
    const getStatusType = (status) => {
      switch (status) {
        case '正常': return 'success'
        case '警告': return 'warning'
        case '异常': return 'danger'
        default: return ''
      }
    }
    
    // Watchers
    watch(refreshInterval, (newVal) => {
      if (refreshTimer) {
        clearInterval(refreshTimer)
      }
      
      if (newVal > 0) {
        refreshTimer = setInterval(() => {
          refreshData()
        }, newVal)
      }
    })
    
    watch(resourceViewType, () => {
      initResourceChart()
    })
    
    watch(selectedBusinessMetric, () => {
      initBusinessChart()
    })
    
    watch(selectedModel, (newVal) => {
      if (newVal) {
        // Refresh the metric values for the selected model
        updateModelMetrics(newVal)
      }
    })
    
    const updateModelMetrics = (modelId) => {
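      // Simulate fetching metrics for the given model ID (modelId is unused in this mock)
      // A real implementation should call the backend API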
      modelMetrics.value = modelMetrics.value.map(metric => {
        return {
          ...metric,
          value: metric.value + (Math.random() - 0.5) * 2,
          trend: (Math.random() - 0.5) * 0.5
        }
      })
    }
    
    // Lifecycle hooks
    onMounted(() => {
      // Load the initial data
      refreshData()
      
      // Select the first available model by default
      if (availableModels.value.length > 0) {
        selectedModel.value = availableModels.value[0].id
      }
      
      // Start the periodic refresh
      if (refreshInterval.value > 0) {
        refreshTimer = setInterval(() => {
          refreshData()
        }, refreshInterval.value)
      }
    })
    
    onUnmounted(() => {
      // Stop the refresh timer
      if (refreshTimer) {
        clearInterval(refreshTimer)
      }
      
      // Dispose the chart instances
      if (resourceChartInstance) {
        resourceChartInstance.dispose()
      }
      if (businessChartInstance) {
        businessChartInstance.dispose()
      }
      if (modelPerformanceChartInstance) {
        modelPerformanceChartInstance.dispose()
      }
      if (detailChartInstance) {
        detailChartInstance.dispose()
      }
      
      // removeEventListener only detaches a listener when it receives the same function
      // reference that was passed to addEventListener. The chart init functions register
      // anonymous arrow functions, which cannot be removed here; to detach them, register
      // one named resize handler in onMounted and remove that same reference in this hook.
    })
    
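    // Refresh timer handle; declared after the hooks above, but only read inside callbacks that run after setup() returns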
    let refreshTimer = null
    
    return {
      // Reactive state
      selectedService,
      timeRange,
      refreshInterval,
      resourceViewType,
      selectedBusinessMetric,
      selectedModel,
      
      // Template refs
      resourceChart,
      businessChart,
      modelPerformanceChart,
      detailChart,
      
      // Loading state
      loading,
      
      // Metric data
      systemHealth,
      systemHealthTrend,
      apiSuccessRate,
      apiSuccessRateTrend,
      modelAccuracy,
      modelAccuracyTrend,
      avgResponseTime,
      responseTimeTrend,
      
      // List data
      availableServices,
      availableModels,
      modelMetrics,
      activeAlarms,
      
      // Dialog state
      metricDetailVisible,
      selectedMetricDetail,
      metricDetailData,
      
      // Time-range shortcuts
      timeShortcuts,
      
      // Computed properties
      criticalAlarmsCount,
      majorAlarmsCount,
      minorAlarmsCount,
      infoAlarmsCount,
      getAlarmCountClass,
      
      // Methods
      refreshData,
      handleTimeRangeChange,
      handleAcknowledgeAlarm,
      getSeverityType,
      getMetricClass,
      getTrendClass,
      getTrendIcon,
      formatTime,
      showMetricDetail,
      getStatusType
    }
  }
}
</script>

<style scoped>
.metrics-dashboard {
  padding: 20px;
  background-color: #f5f7fa;
  min-height: 100vh;
}

.dashboard-header {
  display: flex;
  justify-content: space-between;
  align-items: center;
  margin-bottom: 20px;
  background: white;
  padding: 20px;
  border-radius: 4px;
  box-shadow: 0 2px 12px 0 rgba(0, 0, 0, 0.1);
}

.dashboard-header h1 {
  margin: 0;
  color: #303133;
}

.filter-controls {
  display: flex;
  gap: 10px;
  align-items: center;
}

.metrics-overview {
  margin-bottom: 20px;
}

.charts-container {
  margin-top: 20px;
}

.chart-card {
  margin-bottom: 0;
  height: 100%;
}

.chart-header {
  display: flex;
  justify-content: space-between;
  align-items: center;
}

.chart-content {
  margin-top: 10px;
}

.model-metrics-grid {
  display: grid;
  grid-template-columns: repeat(3, 1fr);
  gap: 15px;
  margin-bottom: 20px;
}

.metric-item {
  background: #f8f9fa;
  border-radius: 4px;
  padding: 15px;
  text-align: center;
  border-left: 4px solid #409EFF;
}

.metric-item.warning {
  border-left-color: #E6A23C;
}

.metric-item.danger {
  border-left-color: #F56C6C;
}

.metric-label {
  font-size: 12px;
  color: #909399;
  margin-bottom: 5px;
}

.metric-value {
  font-size: 20px;
  font-weight: bold;
  color: #303133;
  margin-bottom: 5px;
}

.metric-value.metric-normal {
  color: #67C23A;
}

.metric-value.metric-warning {
  color: #E6A23C;
}

.metric-trend {
  font-size: 12px;
}

.trend-positive {
  color: #F56C6C;
}

.trend-negative {
  color: #67C23A;
}

.trend-neutral {
  color: #909399;
}

.alarm-count {
  font-size: 14px;
  padding: 4px 8px;
  border-radius: 4px;
}

.alarm-critical {
  background-color: #FEF0F0;
  color: #F56C6C;
  border: 1px solid #FDE2E2;
}

.alarm-major {
  background-color: #FDF6EC;
  color: #E6A23C;
  border: 1px solid #FAECD8;
}

.alarm-minor {
  background-color: #F4F4F5;
  color: #909399;
  border: 1px solid #E9E9EB;
}

.alarm-normal {
  background-color: #F0F9EB;
  color: #67C23A;
  border: 1px solid #E1F3D8;
}

.alarm-summary {
  display: flex;
  justify-content: space-around;
  margin-top: 15px;
  padding-top: 15px;
  border-top: 1px solid #ebeef5;
}

.summary-item {
  text-align: center;
}

.summary-label {
  display: block;
  font-size: 12px;
  color: #909399;
  margin-bottom: 5px;
}

.summary-value {
  display: block;
  font-size: 18px;
  font-weight: bold;
}

.summary-value.critical {
  color: #F56C6C;
}

.summary-value.major {
  color: #E6A23C;
}

.summary-value.minor {
  color: #409EFF;
}

.summary-value.info {
  color: #909399;
}

.metric-detail-header {
  margin-bottom: 20px;
  padding-bottom: 20px;
  border-bottom: 1px solid #ebeef5;
}

.detail-stats {
  display: flex;
  justify-content: space-around;
}

.stat-item {
  text-align: center;
}

.stat-label {
  display: block;
  font-size: 12px;
  color: #909399;
  margin-bottom: 5px;
}

.stat-value {
  display: block;
  font-size: 18px;
  font-weight: bold;
  color: #303133;
}

.positive {
  color: #F56C6C;
}

.negative {
  color: #67C23A;
}

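/* el-table renders its rows inside a child component, so scoped styles need :deep() to reach them */
.el-table :deep(.warning-row) {
  background: oldlace;
}

.el-table :deep(.success-row) {
  background: #f0f9eb;
}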
</style>
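The model-performance chart above draws PSI (Population Stability Index) on a 0 to 0.5 axis from random mock values. As a hedged sketch of how a backend might compute the real figure, the function below compares a baseline sample (for example, training data) with a recent production sample of one numeric feature, using PSI = Σ (a_i − e_i) · ln(a_i / e_i) over bins whose edges are baseline quantiles. The function names, the 10-bin default and the `eps` clamp for empty bins are assumptions for illustration, not part of the original system.

```python
import bisect
import math
from typing import List, Sequence


def _quantile_edges(baseline: Sequence[float], bins: int) -> List[float]:
    """Inner bin edges at baseline quantiles (nearest-rank; an illustrative choice)."""
    ordered = sorted(baseline)
    n = len(ordered)
    # bins - 1 cut points; duplicates are merged so no bin has zero width
    cuts = [ordered[min(n - 1, (k * n) // bins)] for k in range(1, bins)]
    return sorted(set(cuts))


def _bin_proportions(values: Sequence[float], edges: List[float]) -> List[float]:
    """Share of values falling into each of the len(edges) + 1 bins."""
    counts = [0] * (len(edges) + 1)
    for v in values:
        # bisect_right sends a value equal to an edge into the upper bin
        counts[bisect.bisect_right(edges, v)] += 1
    return [c / len(values) for c in counts]


def population_stability_index(baseline: Sequence[float],
                               recent: Sequence[float],
                               bins: int = 10,
                               eps: float = 1e-6) -> float:
    """PSI = sum((a_i - e_i) * ln(a_i / e_i)) over bins derived from the baseline."""
    if not baseline or not recent:
        raise ValueError("baseline and recent samples must both be non-empty")
    edges = _quantile_edges(baseline, bins)
    expected = _bin_proportions(baseline, edges)
    actual = _bin_proportions(recent, edges)
    psi = 0.0
    for e, a in zip(expected, actual):
        # Clamp empty bins to eps so the logarithm stays finite
        e, a = max(e, eps), max(a, eps)
        psi += (a - e) * math.log(a / e)
    return psi


if __name__ == "__main__":
    baseline = [i / 100 for i in range(1000)]
    print(population_stability_index(baseline, baseline))  # 0.0
    print(population_stability_index(baseline, [x + 3 for x in baseline]) > 0.25)  # True
```

A commonly cited rule of thumb reads PSI below 0.1 as stable, 0.1 to 0.25 as a moderate shift and above 0.25 as a significant shift; treat these as starting points to tune per feature rather than fixed limits. Taking bin edges from baseline quantiles keeps each expected proportion roughly equal, which avoids near-empty expected bins whose terms would dominate the sum.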

III. Log Monitoring: Structured Logging and Intelligent Analysis
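Intelligent analysis of logs usually starts with simple rules. In the monitor that follows, `_detect_and_alert_anomalies` leaves its first rule, consecutive errors, as a placeholder comment. One minimal way to implement it, sketched here under stated assumptions, is a per-(service, module) sliding window of error timestamps: the alert condition holds when at least `threshold` errors fall within `window_seconds`. The class name `ErrorBurstDetector` and its defaults (60 seconds, 5 errors) are hypothetical and would need tuning.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Optional, Tuple


class ErrorBurstDetector:
    """Hypothetical helper: flags >= threshold errors within window_seconds per (service, module)."""

    def __init__(self, window_seconds: float = 60.0, threshold: int = 5,
                 clock: Callable[[], float] = time.monotonic):
        self.window_seconds = window_seconds
        self.threshold = threshold
        self._clock = clock
        # One deque of error timestamps per (service, module) key
        self._events: Dict[Tuple[str, str], Deque[float]] = defaultdict(deque)

    def record_error(self, service: str, module: str,
                     now: Optional[float] = None) -> bool:
        """Record one error and return True if the window now holds >= threshold errors."""
        now = self._clock() if now is None else now
        events = self._events[(service, module)]
        events.append(now)
        # Drop timestamps that have slid out of the window
        while events and now - events[0] > self.window_seconds:
            events.popleft()
        return len(events) >= self.threshold


if __name__ == "__main__":
    detector = ErrorBurstDetector(window_seconds=60, threshold=3)
    for t in (0, 10, 20, 200):
        print(t, detector.record_error("recommender", "inference", now=t))
    # 0 False / 10 False / 20 True / 200 False (the first three errors have expired)
```

Memory stays proportional to the number of errors inside the window. The detector is not thread-safe, and once the threshold is crossed it returns True for every further error in the window, so a production version would typically add a lock and an alert cooldown.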

3.1 Python Log Monitoring Implementation (Loguru)

# logging_monitor.py
import json
import sys
import time
from datetime import datetime
from typing import Dict, Any, Optional, List
from uuid import uuid4
from enum import Enum
import re

from loguru import logger
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from prometheus_client import Counter, Histogram

class LogLevel(Enum):
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"

class AILoggingMonitor:
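    def __init__(self, 
                 elasticsearch_hosts: Optional[List[str]] = None,
                 index_prefix: str = "ai-logs",
                 enable_prometheus: bool = True):
        """
        Initialize the AI log monitor.
        
        Args:
            elasticsearch_hosts: list of Elasticsearch hosts; log shipping to
                Elasticsearch is enabled only when this is passed explicitly
            index_prefix: prefix for Elasticsearch index names
            enable_prometheus: whether to export Prometheus metrics
        """
        # Configure Loguru sinks
        self._configure_logger()
        
        # Elasticsearch settings (the localhost default is only recorded, never connected to)
        self.elasticsearch_hosts = elasticsearch_hosts or ["localhost:9200"]
        self.index_prefix = index_prefix
        self.es_client = None
        
        if elasticsearch_hosts:
            self._init_elasticsearch()
        
        # Prometheus metrics live in the global registry, so registering them twice
        # raises an error: create only one monitor per process
        if enable_prometheus:
            self._init_prometheus_metrics()
        
        # Regex patterns used to classify exception messages
        self.exception_patterns = self._load_exception_patterns()
        
        # Request-tracking context. This plain instance attribute is shared by every
        # thread/task using the monitor, so concurrent servers need per-request storage
        self.request_context = {}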
    
    def _configure_logger(self):
        """配置Loguru日志器"""
        # 移除默认处理器
        logger.remove()
        
        # 控制台输出(开发环境)
        logger.add(
            sys.stdout,
            format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
                   "<level>{level: <8}</level> | "
                   "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | "
                   "<level>{message}</level>",
            level="INFO",
            colorize=True
        )
        
        # 文件输出(生产环境)
        logger.add(
            "logs/ai_system_{time:YYYY-MM-DD}.log",
            rotation="500 MB",  # 日志轮转:500MB
            retention="30 days",  # 保留30天
            compression="zip",  # 压缩存储
            format="{time:YYYY-MM-DD HH:mm:ss.SSS} | "
                   "{level: <8} | "
                   "{name}:{function}:{line} | "
                   "{message}",
            level="DEBUG",
            encoding="utf-8"
        )
        
        # 错误日志单独文件
        logger.add(
            "logs/errors_{time:YYYY-MM-DD}.log",
            rotation="100 MB",
            retention="60 days",
            compression="zip",
            format="{time:YYYY-MM-DD HH:mm:ss.SSS} | "
                   "{level: <8} | "
                   "{name}:{function}:{line} | "
                   "{message}\n{exception}",
            level="ERROR",
            encoding="utf-8"
        )
    
    def _init_elasticsearch(self):
        """初始化Elasticsearch客户端"""
        try:
            self.es_client = Elasticsearch(
                hosts=self.elasticsearch_hosts,
                sniff_on_start=True,
                sniff_on_connection_fail=True,
                sniffer_timeout=60
            )
            
            # 测试连接
            if self.es_client.ping():
                logger.info(f"Connected to Elasticsearch at {self.elasticsearch_hosts}")
                
                # 创建索引模板
                self._create_index_template()
            else:
                logger.warning("Failed to connect to Elasticsearch")
                self.es_client = None
                
        except Exception as e:
            logger.error(f"Failed to initialize Elasticsearch: {e}")
            self.es_client = None
    
    def _create_index_template(self):
        """创建Elasticsearch索引模板"""
        template_name = f"{self.index_prefix}-template"
        template_body = {
            "index_patterns": [f"{self.index_prefix}-*"],
            "settings": {
                "number_of_shards": 3,
                "number_of_replicas": 1,
                "refresh_interval": "30s"
            },
            "mappings": {
                "dynamic": "strict",
                "properties": {
                    "timestamp": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss.SSS||epoch_millis"
                    },
                    "level": {
                        "type": "keyword"
                    },
                    "service": {
                        "type": "keyword"
                    },
                    "module": {
                        "type": "keyword"
                    },
                    "request_id": {
                        "type": "keyword"
                    },
                    "user_id": {
                        "type": "keyword"
                    },
                    "session_id": {
                        "type": "keyword"
                    },
                    "model_name": {
                        "type": "keyword"
                    },
                    "model_version": {
                        "type": "keyword"
                    },
                    "operation": {
                        "type": "keyword"
                    },
                    "duration_ms": {
                        "type": "float"
                    },
                    "status": {
                        "type": "keyword"
                    },
                    "error_code": {
                        "type": "keyword"
                    },
                    "error_message": {
                        "type": "text"
                    },
                    "input_features": {
                        "type": "object",
                        "enabled": False  # 不索引,只存储
                    },
                    "output_predictions": {
                        "type": "object",
                        "enabled": False
                    },
                    "confidence_scores": {
                        "type": "float"
                    },
                    "metadata": {
                        "type": "object"
                    },
                    "message": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            }
                        }
                    },
                    "stack_trace": {
                        "type": "text"
                    },
                    "tags": {
                        "type": "keyword"
                    },
                    "host": {
                        "type": "keyword"
                    },
                    "environment": {
                        "type": "keyword"
                    }
                }
            }
        }
        
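        try:
            # put_template is the legacy template API; clusters from 7.8 on also
            # support composable templates via indices.put_index_template
            self.es_client.indices.put_template(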
                name=template_name,
                body=template_body
            )
            logger.info(f"Created Elasticsearch index template: {template_name}")
        except Exception as e:
            logger.error(f"Failed to create index template: {e}")
    
    def _init_prometheus_metrics(self):
        """初始化Prometheus指标"""
        # 日志级别计数器
        self.log_counter = Counter(
            'ai_system_logs_total',
            'Total number of logs by level',
            ['service', 'module', 'level']
        )
        
        # Histogram of serialized log entry size
        self.log_size_histogram = Histogram(
            'ai_system_log_size_bytes',
            'Log size in bytes',
            ['service', 'module'],
            buckets=[100, 500, 1000, 5000, 10000, 50000]
        )
        
        # Exception counter by exception type
        self.exception_counter = Counter(
            'ai_system_exceptions_total',
            'Total number of exceptions',
            ['service', 'module', 'exception_type']
        )
    
    def _load_exception_patterns(self) -> Dict[str, re.Pattern]:
        """加载异常检测模式"""
        patterns = {
            'connection_error': re.compile(r'.*(connection|timeout|refused|reset).*', re.IGNORECASE),
            'memory_error': re.compile(r'.*(memory|oom|out of memory).*', re.IGNORECASE),
            'data_error': re.compile(r'.*(data|format|type|value).*error.*', re.IGNORECASE),
            'model_error': re.compile(r'.*(model|prediction|inference).*error.*', re.IGNORECASE),
            'authentication_error': re.compile(r'.*(auth|permission|access|unauthorized).*', re.IGNORECASE),
            'resource_error': re.compile(r'.*(resource|quota|limit|capacity).*', re.IGNORECASE)
        }
        return patterns
    
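    def set_request_context(self, request_id: Optional[str] = None, 
                           user_id: Optional[str] = None,
                           session_id: Optional[str] = None) -> str:
        """Set the request context (a UUID is generated when request_id is omitted)"""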
        if request_id is None:
            request_id = str(uuid4())
        
        self.request_context = {
            'request_id': request_id,
            'user_id': user_id,
            'session_id': session_id,
            'start_time': time.time()
        }
        
        return request_id
    
    def clear_request_context(self):
        """清除请求上下文"""
        self.request_context = {}
    
    def log_model_inference(self, 
                           service: str,
                           module: str,
                           model_name: str,
                           model_version: str,
                           input_features: Dict[str, Any],
                           output_predictions: Dict[str, Any],
                           confidence: Optional[float] = None,
                           metadata: Optional[Dict[str, Any]] = None,
                           level: LogLevel = LogLevel.INFO):
        """
        Log a model inference.
        
        Args:
            service: service name
            module: module name
            model_name: model name
            model_version: model version
            input_features: input features
            output_predictions: output predictions
            confidence: confidence score
            metadata: extra metadata
            level: log level
        """
        log_entry = self._create_log_entry(
            service=service,
            module=module,
            level=level,
            message=f"Model inference: {model_name} v{model_version}",
            metadata={
                'model_name': model_name,
                'model_version': model_version,
                'input_features': input_features,
                'output_predictions': output_predictions,
                'confidence_scores': confidence,
                'operation': 'inference',
                **(metadata or {})
            }
        )
        
        self._process_log_entry(log_entry)
    
    def log_data_processing(self,
                           service: str,
                           module: str,
                           operation: str,
                           data_stats: Dict[str, Any],
                           duration_ms: Optional[float] = None,
                           metadata: Optional[Dict[str, Any]] = None,
                           level: LogLevel = LogLevel.INFO):
        """
        Log a data-processing step.
        
        Args:
            service: service name
            module: module name
            operation: operation type
            data_stats: data statistics
            duration_ms: duration in milliseconds
            metadata: extra metadata
            level: log level
        """
        log_entry = self._create_log_entry(
            service=service,
            module=module,
            level=level,
            message=f"Data processing: {operation}",
            metadata={
                'operation': operation,
                'data_stats': data_stats,
                'duration_ms': duration_ms,
                **(metadata or {})
            }
        )
        
        self._process_log_entry(log_entry)
    
    def log_api_call(self,
                    service: str,
                    module: str,
                    endpoint: str,
                    method: str,
                    status: str,
                    duration_ms: float,
                    error_code: Optional[str] = None,
                    error_message: Optional[str] = None,
                    metadata: Optional[Dict[str, Any]] = None):
        """
        Log an API call.
        
        Args:
            service: service name
            module: module name
            endpoint: API endpoint
            method: HTTP method
            status: status ("success" or "error")
            duration_ms: duration in milliseconds
            error_code: error code
            error_message: error message
            metadata: extra metadata
        """
        level = LogLevel.ERROR if status == 'error' else LogLevel.INFO
        
        log_entry = self._create_log_entry(
            service=service,
            module=module,
            level=level,
            message=f"API {method} {endpoint}: {status}",
            metadata={
                'operation': 'api_call',
                'endpoint': endpoint,
                'method': method,
                'status': status,
                'duration_ms': duration_ms,
                'error_code': error_code,
                'error_message': error_message,
                **(metadata or {})
            }
        )
        
        self._process_log_entry(log_entry)
    
    def log_exception(self,
                     service: str,
                     module: str,
                     exception: Exception,
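                     context: Optional[Dict[str, Any]] = None,
                     level: LogLevel = LogLevel.ERROR):
        """
        Log an exception.
        
        Args:
            service: service name
            module: module name
            exception: the exception object
            context: context in which the exception occurred
            level: log level
        """
        # Extract the exception type and message
        exception_type = type(exception).__name__
        exception_message = str(exception)
        
        # Classify the message against the known patterns
        detected_patterns = self._detect_exception_patterns(exception_message)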
        
        log_entry = self._create_log_entry(
            service=service,
            module=module,
            level=level,
            message=f"Exception {exception_type}: {exception_message}",
            metadata={
                'operation': 'exception',
                'exception_type': exception_type,
                'exception_message': exception_message,
                'exception_patterns': detected_patterns,
                'stack_trace': self._get_stack_trace(),
                'context': context or {}
            }
        )
        
        self._process_log_entry(log_entry)
        
        # Update Prometheus metrics
        if hasattr(self, 'exception_counter'):
            self.exception_counter.labels(
                service=service,
                module=module,
                exception_type=exception_type
            ).inc()
    
    def _create_log_entry(self,
                         service: str,
                         module: str,
                         level: LogLevel,
                         message: str,
                         metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Build a structured log entry.
        
        Args:
            service: service name
            module: module name
            level: log level
            message: log message
            metadata: extra metadata (merged into the top level of the entry)
            
        Returns:
            Dict: the structured log entry
        """
        # Local time without a UTC offset; Elasticsearch parses such dates as UTC,
        # so hosts not running in UTC will see shifted timestamps
        timestamp = datetime.now()
        
        log_entry = {
            'timestamp': timestamp.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
            'epoch_ms': int(timestamp.timestamp() * 1000),
            'level': level.value,
            'service': service,
            'module': module,
            'message': message,
            'host': self._get_host_info(),
            'environment': self._get_environment(),
            'tags': ['ai-system', service, module],
            **(metadata or {})
        }
        
        # Attach the request context
        if self.request_context:
            log_entry.update({
                'request_id': self.request_context.get('request_id'),
                'user_id': self.request_context.get('user_id'),
                'session_id': self.request_context.get('session_id')
            })
            
            # Fall back to the time elapsed since the request started, without
            # overwriting a duration_ms the caller passed explicitly
            start_time = self.request_context.get('start_time')
            if start_time and log_entry.get('duration_ms') is None:
                duration = (time.time() - start_time) * 1000  # seconds to milliseconds
                log_entry['duration_ms'] = duration
        
        return log_entry
    
    def _process_log_entry(self, log_entry: Dict[str, Any]):
        """
        处理日志条目
        
        Args:
            log_entry: 日志条目
        """
        # 记录到Loguru
        self._log_to_loguru(log_entry)
        
        # 发送到Elasticsearch
        if self.es_client:
            self._send_to_elasticsearch(log_entry)
        
        # 更新Prometheus指标
        if hasattr(self, 'log_counter'):
            self.log_counter.labels(
                service=log_entry['service'],
                module=log_entry['module'],
                level=log_entry['level']
            ).inc()
            
            # 记录日志大小
            log_size = len(json.dumps(log_entry).encode('utf-8'))
            self.log_size_histogram.labels(
                service=log_entry['service'],
                module=log_entry['module']
            ).observe(log_size)
        
        # 异常检测
        if log_entry['level'] in ['ERROR', 'CRITICAL']:
            self._detect_and_alert_anomalies(log_entry)
    
    def _log_to_loguru(self, log_entry: Dict[str, Any]):
        """记录到Loguru"""
        log_message = json.dumps(log_entry, ensure_ascii=False)
        
        if log_entry['level'] == LogLevel.DEBUG.value:
            logger.debug(log_message)
        elif log_entry['level'] == LogLevel.INFO.value:
            logger.info(log_message)
        elif log_entry['level'] == LogLevel.WARNING.value:
            logger.warning(log_message)
        elif log_entry['level'] == LogLevel.ERROR.value:
            logger.error(log_message)
        elif log_entry['level'] == LogLevel.CRITICAL.value:
            logger.critical(log_message)
    
    def _send_to_elasticsearch(self, log_entry: Dict[str, Any]):
        """Send a single log entry to Elasticsearch"""
        try:
            # Index name, one index per day
            index_date = datetime.now().strftime('%Y.%m.%d')
            index_name = f"{self.index_prefix}-{index_date}"
            
            # Single-document write; in production prefer the bulk API (see bulk_index_logs)
            self.es_client.index(
                index=index_name,
                body=log_entry
            )
            
        except Exception as e:
            logger.error(f"Failed to send log to Elasticsearch: {e}")
    
    def _detect_exception_patterns(self, error_message: str) -> List[str]:
        """Return the names of the exception patterns that match the message"""
        detected = []
        
        for pattern_name, pattern in self.exception_patterns.items():
            if pattern.search(error_message):
                detected.append(pattern_name)
        
        return detected
    
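    def _detect_and_alert_anomalies(self, log_entry: Dict[str, Any]):
        """Detect anomalies in an error entry and raise alerts"""
        # More elaborate detection can be plugged in here,
        # e.g. rule-based checks or ML-based anomaly detection
        
        error_message = log_entry.get('message', '')
        # metadata is merged into the top level of the entry when it is built;
        # fall back to a nested 'metadata' dict for entries shaped that way
        exception_patterns = (log_entry.get('exception_patterns')
                              or log_entry.get('metadata', {}).get('exception_patterns', []))
        
        # Rule 1: consecutive errors
        # (e.g. a sliding-window count per service/module could go here)
        
        # Rule 2: specific critical error patterns
        critical_patterns = ['memory_error', 'authentication_error']
        for pattern in critical_patterns:
            if pattern in exception_patterns:
                self._trigger_alert(
                    severity='critical',
                    service=log_entry['service'],
                    module=log_entry['module'],
                    message=f"Detected {pattern}: {error_message}",
                    log_entry=log_entry
                )
                break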
    
    def _trigger_alert(self, severity: str, service: str, module: str, 
                      message: str, log_entry: Dict[str, Any]):
        """Raise an alert"""
        alert_data = {
            'timestamp': datetime.now().isoformat(),
            'severity': severity,
            'service': service,
            'module': module,
            'message': message,
            'log_entry': log_entry,
            'alert_id': str(uuid4())
        }
        
        # Record the alert in the log
        logger.warning(f"ALERT: {json.dumps(alert_data, ensure_ascii=False)}")
        
        # Hook into an alerting system here (message queue, webhook, etc.),
        # e.g. WeCom (WeChat Work), Slack, or email
    
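    def _get_host_info(self) -> str:
        """Return the host name"""
        import socket
        try:
            return socket.gethostname()
        except Exception:
            return "unknown"
    
    def _get_environment(self) -> str:
        """Return the deployment environment (ENVIRONMENT variable, default 'development')"""
        import os
        return os.getenv('ENVIRONMENT', 'development')
    
    def _get_stack_trace(self) -> str:
        """Return the stack trace of the exception currently being handled"""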
        import traceback
        return traceback.format_exc()
    
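    def bulk_index_logs(self, logs: List[Dict[str, Any]]):
        """Index a batch of logs into Elasticsearch with the bulk helper"""
        if not self.es_client:
            return
        
        try:
            # Build the bulk actions (all go to today's index)
            actions = []
            index_date = datetime.now().strftime('%Y.%m.%d')
            index_name = f"{self.index_prefix}-{index_date}"
            
            for log_entry in logs:
                action = {
                    '_index': index_name,
                    '_source': log_entry
                }
                actions.append(action)
            
            # Run the bulk request; raise_on_error=False so failed documents are
            # counted in `failed` instead of raising BulkIndexError
            success, failed = bulk(
                self.es_client,
                actions,
                stats_only=True,
                raise_on_error=False
            )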
            
            logger.info(f"Bulk indexed {success} logs, failed: {failed}")
            
        except Exception as e:
            logger.error(f"Failed to bulk index logs: {e}")
    
    def search_logs(self, 
                   query: Dict[str, Any] = None,
                   size: int = 100,
                   sort_field: str = "timestamp",
                   sort_order: str = "desc") -> List[Dict[str, Any]]:
        """
        Search logs
        
        Args:
            query: Elasticsearch query DSL (defaults to match_all)
            size: Number of results to return
            sort_field: Field to sort by
            sort_order: Sort order ("asc" or "desc")
            
        Returns:
            List[Dict]: The matching log entries (_source)
        """
        if not self.es_client:
            return []
        
        if query is None:
            query = {
                "match_all": {}
            }
        
        try:
            # Search across all daily indices
            index_pattern = f"{self.index_prefix}-*"
            
            response = self.es_client.search(
                index=index_pattern,
                body={
                    "query": query,
                    "size": size,
                    "sort": [
                        {sort_field: {"order": sort_order}}
                    ]
                }
            )
            
            # Extract the documents
            hits = response.get('hits', {}).get('hits', [])
            results = [hit['_source'] for hit in hits]
            
            return results
            
        except Exception as e:
            logger.error(f"Failed to search logs: {e}")
            return []
    
    def get_error_summary(self, 
                         service: str = None,
                         module: str = None,
                         hours: int = 24) -> Dict[str, Any]:
        """
        Summarize errors
        
        Args:
            service: Service name (optional)
            module: Module name (optional)
            hours: Look-back window in hours
            
        Returns:
            Dict: Error summary
        """
        query_filters = []
        
        # Time range
        time_filter = {
            "range": {
                "epoch_ms": {
                    "gte": int((time.time() - hours * 3600) * 1000)
                }
            }
        }
        query_filters.append(time_filter)
        
        # Error levels
        error_filter = {
            "terms": {
                "level": ["ERROR", "CRITICAL"]
            }
        }
        query_filters.append(error_filter)
        
        # Filter by service
        if service:
            service_filter = {
                "term": {
                    "service": service
                }
            }
            query_filters.append(service_filter)
        
        # Filter by module
        if module:
            module_filter = {
                "term": {
                    "module": module
                }
            }
            query_filters.append(module_filter)
        
        # Build the query; filter context skips scoring, which aggregations do not need
        query = {
            "bool": {
                "filter": query_filters
            }
        }
        
        # Aggregations: errors by service -> module -> exception type, plus an hourly trend.
        # This dict is passed directly as the request's "aggs", so it must not be
        # wrapped in another "aggs" key.
        aggregation = {
            "by_service": {
                "terms": {
                    "field": "service",
                    "size": 10
                },
                "aggs": {
                    "by_module": {
                        "terms": {
                            "field": "module",
                            "size": 10
                        },
                        "aggs": {
                            "by_exception": {
                                "terms": {
                                    "field": "metadata.exception_type.keyword",
                                    "size": 10
                                }
                            }
                        }
                    }
                }
            },
            "error_trend": {
                "date_histogram": {
                    "field": "epoch_ms",
                    "calendar_interval": "hour",
                    "format": "yyyy-MM-dd HH:mm"
                }
            }
        }
        
        try:
            response = self.es_client.search(
                index=f"{self.index_prefix}-*",
                body={
                    "query": query,
                    "size": 0,
                    "aggs": aggregation
                }
            )
            
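            aggregations = response.get('aggregations', {})
            return aggregations
            
        except Exception as e:
            logger.error(f"Failed to get error summary: {e}")
            return {}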
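Rule 1 in `_detect_and_alert_anomalies` (consecutive errors) is only a placeholder comment. Below is a minimal sketch of the sliding-window counting it mentions, assuming errors are keyed by `(service, module)` and that an alert should fire once `threshold` errors fall within `window_seconds`. The class `ErrorBurstDetector` and its parameters are hypothetical and not part of the original collector; timestamps are assumed to arrive in non-decreasing order.

```python
import time
from collections import defaultdict, deque
from typing import Deque, Dict, Hashable, Optional


class ErrorBurstDetector:
    """Hypothetical sliding-window error counter (not part of the original collector)."""

    def __init__(self, window_seconds: float = 60.0, threshold: int = 5):
        self.window_seconds = window_seconds
        self.threshold = threshold
        # One deque of error timestamps per key, e.g. (service, module)
        self._events: Dict[Hashable, Deque[float]] = defaultdict(deque)

    def record(self, key: Hashable, timestamp: Optional[float] = None) -> bool:
        """Record one error for `key`; return True if the window now holds >= threshold errors."""
        now = time.time() if timestamp is None else timestamp
        events = self._events[key]
        events.append(now)
        # Evict timestamps that have slid out of the window
        # (the entry just appended is never evicted, so the deque never empties here)
        while now - events[0] > self.window_seconds:
            events.popleft()
        return len(events) >= self.threshold


detector = ErrorBurstDetector(window_seconds=60.0, threshold=3)
key = ("recommendation-service", "inference")
print(detector.record(key, 0.0))    # False
print(detector.record(key, 10.0))   # False
print(detector.record(key, 20.0))   # True
print(detector.record(key, 100.0))  # False: the earlier errors have expired
```

Inside the collector, an instance could be held as a hypothetical `self.burst_detector` and called with `(log_entry['service'], log_entry['module'])` before `_trigger_alert`. Once the threshold is reached, every further error inside the window also returns `True`, so a real deployment would add a cooldown to avoid alert storms.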
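`_send_to_elasticsearch` issues one index request per entry, and its comment recommends the bulk API for production. The sketch below buffers entries and hands them off in batches when either `max_batch` entries have accumulated or the oldest buffered entry is `max_wait_seconds` old. `LogBatchBuffer`, `flush_fn`, and the default limits are hypothetical; in this collector `flush_fn` could be `bulk_index_logs`. The injectable `clock` exists only to make the timing testable.

```python
import threading
import time
from typing import Any, Callable, Dict, List, Optional

LogEntry = Dict[str, Any]


class LogBatchBuffer:
    """Hypothetical size/time-bounded buffer placed in front of a bulk writer."""

    def __init__(self,
                 flush_fn: Callable[[List[LogEntry]], None],
                 max_batch: int = 500,
                 max_wait_seconds: float = 5.0,
                 clock: Callable[[], float] = time.monotonic):
        self._flush_fn = flush_fn
        self._max_batch = max_batch
        self._max_wait = max_wait_seconds
        self._clock = clock
        self._buffer: List[LogEntry] = []
        self._oldest: Optional[float] = None  # arrival time of the oldest buffered entry
        self._lock = threading.Lock()

    def add(self, entry: LogEntry) -> None:
        """Buffer one entry; flush if the batch is full or the oldest entry is too old."""
        with self._lock:
            if self._oldest is None:
                self._oldest = self._clock()
            self._buffer.append(entry)
            due = (len(self._buffer) >= self._max_batch
                   or self._clock() - self._oldest >= self._max_wait)
            batch = self._take_locked() if due else []
        if batch:
            # Ship outside the lock so slow network I/O does not block other producers
            self._flush_fn(batch)

    def flush(self) -> None:
        """Ship whatever is buffered, e.g. periodically or on shutdown."""
        with self._lock:
            batch = self._take_locked()
        if batch:
            self._flush_fn(batch)

    def _take_locked(self) -> List[LogEntry]:
        # Caller must hold the lock: swap out the buffer and reset the age marker
        batch, self._buffer, self._oldest = self._buffer, [], None
        return batch


shipped: List[List[LogEntry]] = []
buf = LogBatchBuffer(shipped.append, max_batch=3, max_wait_seconds=60.0)
for i in range(7):
    buf.add({"message": f"event {i}"})
buf.flush()
print([len(batch) for batch in shipped])  # [3, 3, 1]
```

The time limit is only checked when a new entry arrives, so a quiet service still needs a periodic `flush()` (for example from a timer thread) and a final one at shutdown.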