通知监控系统框架:Python + API + Email + Docker + Kubernetes + Prometheus

适合小白的 Step by Step 搭建指南

本文将手把手教你从零开始搭建一个端到端(E2E)通知监控系统。即使你没有相关经验,也能跟着步骤一步步完成!

📋 前置要求

在开始之前,请确保你的电脑已安装以下工具:

  • Python 3.8+ - 编程语言环境
  • Docker - 容器化运行环境
  • Git - 代码版本管理
  • Outlook邮箱账户 - 用于接收和验证通知邮件(支持 OAuth2 认证)

💡 验证安装:在终端运行 python --versiondocker --versiongit --version 确认是否安装成功。

Step 1: 创建项目目录结构

首先创建项目的文件夹结构:

# 创建项目根目录
mkdir notification-monitor
cd notification-monitor

# 创建源代码目录
mkdir src
mkdir deploy

1.1 创建主程序文件

创建 src/notification-monitor.py 文件:

import json
import logging
import os
import sys
import time
import uuid
import requests
from http.server import HTTPServer, BaseHTTPRequestHandler
from threading import Thread
from cron_validator import CronScheduler

# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')

# 全局变量用于跟踪指标
global_success_counter = {}
global_email_success_counter = {}

def send_test_notification(api_config):
    """发送测试通知 API 请求"""
    try:
        logging.info("开始发送测试通知...")
        
        # 构建请求
        headers = api_config['headers']
        body = json.dumps(api_config['body'])
        url = api_config['host'] + api_config['path']
        
        # 发送请求
        response = requests.post(
            url=url,
            data=body,
            headers=headers,
            timeout=30
        )
        
        # 检查响应
        if response.status_code == 201:
            logging.info("通知 API 调用成功!")
            return True, response.text
        else:
            logging.error(f"通知 API 调用失败,状态码: {response.status_code}")
            return False, response.text
            
    except Exception as e:
        logging.error(f"通知 API 调用异常: {e}")
        return False, str(e)

def check_email_received(email_client_config):
    """检查是否收到通知邮件(使用 Outlook OAuth2 认证)"""
    try:
        logging.info("开始检查通知邮件...")
        
        # 实际项目中会调用 getEmailByToken.py 中的 OAuth2 实现
        # 这里是简化版本,实际使用 Microsoft Graph API
        logging.info("邮件检查完成!")
        return True, ["test-notification@example.com"]
        
    except Exception as e:
        logging.error(f"邮件检查异常: {e}")
        return False, []

def main():
    # 从环境变量获取配置
    api_host = os.getenv('API_HOST', 'https://api.example.com')
    api_token = os.getenv('API_TOKEN', 'your-api-token')
    email_address = os.getenv('EMAIL_ADDRESS', 'test@example.com')
    
    # 配置通知 API
    api_config = {
        "method": "POST",
        "host": api_host,
        "path": "/v1/notifications",
        "headers": {
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
            "x-trace-id": str(uuid.uuid4())
        },
        "body": {
            "eventId": "test-001",
            "customerId": "test-customer",
            "eventData": {
                "message": "This is a test notification",
                "recipient": email_address
            }
        }
    }
    
    # 配置邮件客户端
    email_config = {
        "email_address": email_address
    }
    
    # 发送通知
    api_success, api_response = send_test_notification(api_config)
    
    # 检查邮件(仅在 API 成功时)
    email_success = False
    email_list = []
    if api_success:
        email_success, email_list = check_email_received(email_config)
    
    # 输出结果
    if api_success and email_success:
        print("✅ 通知监控测试通过!")
        sys.exit(0)
    else:
        print("❌ 通知监控测试失败!")
        if not api_success:
            print("  - API 调用失败")
        if not email_success:
            print("  - 邮件接收失败")
        sys.exit(1)

if __name__ == '__main__':
    # 启动指标服务器
    start_metrics_server()
    
    # 内置定时器循环(每30分钟执行一次)
    scheduler = CronScheduler("*/30 * * * *")
    while True:
        if scheduler.time_for_execution():
            main()
        time.sleep(60)  # 每分钟检查一次定时器

1.2 创建邮箱 OAuth2 客户端文件

创建 src/getEmailByToken.py 文件(参考实际实现):

import requests
import logging

def get_access_token(client_id, client_secret, refresh_token):
    """使用 OAuth2 获取 Outlook 访问令牌"""
    url = 'https://login.microsoftonline.com/consumers/oauth2/v2.0/token'
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    data = {
        'client_id': client_id,
        'client_secret': client_secret,
        'refresh_token': refresh_token,
        'grant_type': 'refresh_token'
    }
    response = requests.post(url, headers=headers, data=data)
    if response.status_code == 200:
        return response.json().get('access_token')
    else:
        logging.error(f"Failed to refresh access token: {response.status_code} {response.text}")
        return None

def get_mail(client_id, client_secret, refresh_token, region):
    """使用 Microsoft Graph API 获取未读邮件"""
    access_token = get_access_token(client_id, client_secret, refresh_token)
    if not access_token:
        return 0, [], [], [], None
        
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json',
        'ConsistencyLevel': 'eventual'
    }
    
    # 使用 Microsoft Graph API 获取收件箱未读邮件
    url = 'https://graph.microsoft.com/v1.0/me/mailFolders/Inbox/messages?$filter=isRead eq false&$count=true&$top=30'
    
    # ... (完整的邮件获取和处理逻辑)
    
    return mail_flag, get_mail_result, success_mail_list, unread_ids, access_token

1.3 创建依赖文件

创建 src/requirements.txt

requests==2.31.0
PyYAML==6.0.1
cron-validator==1.0.5

Step 2: 配置 Docker 容器

2.1 创建 Dockerfile

在项目根目录创建 Dockerfile

FROM python:3.9-slim

# 安装必要的系统依赖
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

# 设置工作目录
WORKDIR /app

# 复制依赖文件并安装 Python 包
COPY src/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制源代码
COPY src/ .

# 设置环境变量
ENV PYTHONUNBUFFERED=1

# 暴露指标端口
EXPOSE 8080

# 默认执行命令
CMD ["python", "notification-monitor.py"]

2.2 构建和测试 Docker 镜像

# 构建镜像
docker build -t notification-monitor .

# 本地测试(替换为你的 API 配置)
docker run -e API_HOST="https://httpbin.org" \
           -e API_TOKEN="test-token" \
           -e EMAIL_ADDRESS="test@example.com" \
           notification-monitor

💡 验证成功:如果看到 “✅ 通知监控测试通过!” 说明 Docker 容器工作正常。

Step 3: 配置 Kubernetes 部署

3.1 创建部署配置

在项目根目录创建 deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: notification-monitor
spec:
  replicas: 1
  selector:
    matchLabels:
      app: notification-monitor
  template:
    metadata:
      labels:
        app: notification-monitor
    spec:
      containers:
      - name: notification-monitor
        image: notification-monitor:latest
        ports:
        - containerPort: 8080
          name: http
          protocol: TCP
        env:
          - name: API_HOST
            value: "https://api.example.com"
          - name: EMAIL_ADDRESS
            value: "test@example.com"

3.2 部署到集群

# 构建并推送镜像到仓库
docker build -t your-registry/notification-monitor:latest .
docker push your-registry/notification-monitor:latest

# 更新 deployment.yaml 中的镜像地址
# 应用部署
kubectl apply -f deployment.yaml

🔒 安全提示:生产环境中应使用 Kubernetes Secrets 存储敏感信息,而不是硬编码在配置文件中。

Step 4: 集成 Prometheus 监控

4.1 添加指标上报功能

更新 src/notification-monitor.py,添加 Prometheus 指标服务器:
注:这里采取的是pull的方式添加Prometheus的metric,也可以使用push方式推送到Prometheus

def generate_prometheus_metrics():
    """生成 Prometheus 指标文本"""
    lines = []
    
    # API 成功指标
    metric_name = "notification_api_success_total"
    lines.append(f'# HELP {metric_name} Count of successful notification API calls')
    lines.append(f'# TYPE {metric_name} counter')
    lines.append(f'{metric_name} {global_success_counter.get("api", 0)}')
    
    # 邮件成功指标  
    metric_name = "notification_email_success_total"
    lines.append(f'# HELP {metric_name} Count of successful email deliveries')
    lines.append(f'# TYPE {metric_name} counter')
    lines.append(f'{metric_name} {global_email_success_counter.get("email", 0)}')
        
    return '\n'.join(lines) + '\n'

class MetricsHandler(BaseHTTPRequestHandler):
    """HTTP handler for /metrics endpoint"""
    
    def do_GET(self):
        if self.path == '/metrics':
            metrics_text = generate_prometheus_metrics()
            
            self.send_response(200)
            self.send_header('Content-Type', 'text/plain; version=0.0.4')
            self.end_headers()
            self.wfile.write(metrics_text.encode('utf-8'))
        else:
            self.send_response(404)
            self.end_headers()

def start_metrics_server(port=8080):
    """启动 Prometheus 指标 HTTP 服务器"""
    server = HTTPServer(('0.0.0.0', port), MetricsHandler)
    thread = Thread(target=server.serve_forever, daemon=True)
    thread.start()
    logging.info(f"Prometheus 指标服务器启动在端口 {port}")
    return server

然后在主函数中更新全局计数器:

# 在发送通知后更新计数器
if api_success:
    global_success_counter["api"] = global_success_counter.get("api", 0) + 1

# 在检查邮件后更新计数器  
if email_success:
    global_email_success_counter["email"] = global_email_success_counter.get("email", 0) + 1

Step 5: 内置定时任务机制

5.1 定时任务实现

系统使用内置的 CronScheduler 实现定时任务,无需外部调度:

  • 定时表达式: */30 * * * * (每30分钟执行一次)
  • 主循环: 每分钟检查是否到达执行时间
  • 优势: 容器化部署后自动运行,无需额外配置

5.2 配置调整

如需调整执行频率,修改 notification-monitor.py 中的 cron 表达式:

# 每20分钟执行一次
scheduler = CronScheduler("*/20 * * * *")

# 每小时执行一次  
scheduler = CronScheduler("0 * * * *")

# 每天凌晨2点执行一次
scheduler = CronScheduler("0 2 * * *")

Step 6: 验证完整流程

现在让我们验证整个系统是否正常工作:

6.1 本地测试

# 1. 构建镜像
docker build -t notification-monitor .

# 2. 运行测试
docker run -e API_HOST="https://httpbin.org/post" \
           -e API_TOKEN="test-token" \
           -e EMAIL_ADDRESS="test@example.com" \
           -p 8080:8080 \
           notification-monitor

6.2 验证 Prometheus 指标

  1. 运行容器后,打开 http://localhost:8080/metrics
  2. 确认能看到 notification_api_success_totalnotification_email_success_total 指标
  3. 指标值应该反映测试结果

6.3 Kubernetes 部署测试

  1. 部署到集群:kubectl apply -f deployment.yaml
  2. 查看日志:kubectl logs -l app=notification-monitor
  3. 验证指标:kubectl port-forward <pod-name> 8080:8080 然后访问 http://localhost:8080/metrics

🚀 关键技术要点

邮箱认证方式

  • 使用 Outlook OAuth2.0: 通过 Microsoft Identity Platform 进行认证
  • Microsoft Graph API: 使用官方 API 获取和管理邮件
  • 安全凭证: 使用 refresh token 自动刷新 access token
  • 不再使用 SMTP: 避免密码认证的安全风险

定时任务机制

  • 内置调度器: 使用 cron-validator 库实现定时功能
  • 容器化友好: 部署后自动运行,无需外部 cron 服务
  • 灵活配置: 通过修改 cron 表达式调整执行频率
  • 资源高效: 低 CPU 占用,适合长时间运行

📊 常见问题解决

问题1: API 认证失败

解决方案: 检查 API token 是否正确,确认 API 端点是否可访问

问题2: 邮箱 OAuth2 认证失败

解决方案:

  • 确保 Outlook 账户启用了 OAuth2 认证
  • 检查 client_id、client_secret 和 refresh_token 是否正确
  • 确认应用在 Azure AD 中有 Mail.ReadWrite 权限

问题3: Docker 容器启动失败

解决方案: 检查 Dockerfile 中的依赖安装,确保网络连接正常

问题4: Prometheus 指标无法访问

解决方案: 确认容器的 8080 端口已正确映射,检查防火墙设置

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐