实时日志监控告警系统:从 Nginx 到邮件通知的完整链路搭建(初级版本,已上线阿里云)

目录

1. 项目简介

在现代互联网应用中,系统的稳定性与可观测性已成为保障用户体验的核心要素。然而,在实际运维过程中,我们常常面临以下挑战:

- 用户访问了不存在的页面(404),但无人知晓;
- 某个 IP 疯狂刷请求,可能正在扫描漏洞或进行 DDoS 攻击;
- 服务器宕机后重启,关键服务未能自动恢复,导致业务中断;
- 日志分散在各节点,无法统一分析,故障排查耗时长。

为了解决这些问题,我设计并搭建了一套**轻量级、全自动、实时响应的日志监控与告警系统**。该系统基于开源组件组合而成,无需商业软件,却能在异常发生时秒级发送邮件提醒,实现“**异常可见、风险可控、响应及时**”。

本项目采用微服务架构思想,构建了从用户请求 → 日志采集 → 消息队列 → 实时分析 → 多通道告警的完整链路。当前阶段聚焦于核心功能验证,后续将逐步引入 AI 查询、容器化部署等高级特性,打造企业级可观测性平台。

> 当前版本说明:本系统暂未引入 Celery 和 Redis 作为任务队列,所有逻辑由单个 Python 消费者脚本完成,结构简洁、易于理解,是后续升级的良好基础。

2. 整体架构图

在这里插入图片描述

3. 核心组件详解

3.1 Nginx

Nginx 在本系统中承担反向代理与负载均衡的核心角色,是用户请求进入后端服务的第一道关口。其主要职责包括:

  • 流量分发:将来自用户的 HTTP 请求均匀分配至后端多个 Web 应用实例(Flask),避免单点过载;
  • 高可用保障:通过配置健康检查机制,自动剔除异常节点,确保服务持续可用;
  • 日志标准化:统一记录所有访问日志(包含 IP、URL、状态码、响应时间等),为后续分析提供原始数据;
  • 性能优化:支持 Gzip 压缩、缓存静态资源,提升响应速度,降低后端压力。

在本项目中,Nginx 配置了 upstream 模块实现对后端 Flask 应用的负载均衡,并启用了 access_log 记录每条请求的详细信息。日志格式采用标准 combined 格式,便于 Filebeat 后续解析与处理。

技术选型说明:选择 Nginx 而非其他方案(如 HAProxy 或 LVS),是因为其轻量、高效、配置灵活,且具备强大的日志采集能力,完美契合本系统的“可观测性”需求。

3.2 Flask 应用

Flask 作为本系统的核心业务逻辑载体,负责处理用户请求并返回动态网页内容。虽然轻量,但其灵活性与可扩展性使其成为快速构建 Web 服务的理想选择。

在本项目中,Flask 应用部署于 /personal-website/app.py,主要实现以下功能:

  • 基础页面服务:提供主页、关于页等静态路由,模拟真实网站访问场景;
  • 404 异常路径设计:故意暴露无效 URL(如 /notfound),用于触发日志中的 404 错误,验证监控告警链路;
  • 监听全网卡地址:通过 app.run(host='0.0.0.0', port=5000) 确保 Nginx 可代理访问;
  • 结构化响应:所有请求均记录至 Nginx access log,为后续分析提供原始数据源。

为保障服务稳定性,Flask 应用通过 systemd 进行托管,配置开机自启与崩溃自动重启,确保系统重启后业务服务立即恢复。

技术选型说明:选用 Flask 而非 Django 或 FastAPI,是因为其极简架构更贴合本项目的“轻量级监控”定位,避免引入不必要的复杂性,同时便于与 Kafka 消费者解耦。
在这里插入图片描述

3.3 Filebeat

Filebeat 是本系统的日志采集器,负责实时监控 Nginx 的 access.log 文件,并将新增日志行发送至 Kafka。

其核心作用包括:

  • 轻量级、低资源占用,适合长期运行;
  • 支持日志解析与字段增强(如添加 log_type: nginx-access);
  • 保证日志不丢失:通过注册表(registry)记录读取位置,即使服务重启也不会重复或遗漏。

在本项目中,Filebeat 作为“桥梁”,将原始日志从服务器文件系统安全、高效地输送到 Kafka 消息队列,为后续实时分析奠定基础。

3.4 Kafka

Kafka 在本系统中扮演高吞吐、低延迟的日志消息总线角色,是连接日志采集(Filebeat)与实时分析(Python 消费者)的关键枢纽。

核心作用
  • 解耦生产与消费:Filebeat 只需将日志推入 Kafka,无需关心谁来处理;消费者可独立启停、扩容,互不影响;
  • 缓冲削峰:当消费者短暂宕机或处理变慢时,Kafka 持久化存储日志,避免数据丢失;
  • 顺序保障:同一 IP 或 URL 的访问日志在分区内保持顺序,便于后续行为分析;
  • 横向扩展:通过增加 Topic 分区数和消费者实例,可轻松应对流量增长。
部署与配置
  • 使用 Kafka 2.13 集群(3 节点),配合Kraft(无 ZooKeeper)模式 实现元数据管理与高可用;
  • 创建专用 Topic nginx-logs,分区数设为 3,副本因子为 2,兼顾性能与容灾;
  • 生产者(Filebeat)启用 acks=all,确保消息写入所有副本后才确认,提升可靠性;
  • 消费者采用手动提交偏移量(enable.auto.commit: false),避免因异常导致日志漏处理。

为什么选 Kafka?
相比 RabbitMQ 或 Redis Stream,Kafka 在高吞吐日志场景下表现更优,具备天然的持久化、回溯、多消费者支持能力,是构建可观测性系统的工业级标准选择。

3.5 Consumer

Python 消费者是本系统的分析引擎,负责从 Kafka 拉取 Nginx 访问日志,解析内容,并在检测到异常行为时触发告警。

其主要功能包括:
实时消费 nginx-logs 主题中的日志消息;
解析简化的 Nginx 日志格式(共 5 个字段:IP、时间、URL、状态码、User-Agent);
识别两类异常:
HTTP 状态码为 4xx 或 5xx;
单个 IP 在短时间内高频访问(≥5 次/分钟);
将原始日志及分析结果存入 MySQL;
触发邮件通知(使用 QQ 邮箱 SMTP)

import json
import time
import pymysql
import smtplib
from email.mime.text import MIMEText
from kafka import KafkaConsumer
from collections import defaultdict, deque

# 全局计数器:记录每个 IP 最近 60 秒内的请求时间戳
ip_requests = defaultdict(deque)

def send_email(subject, body):
    sender = 'your@qq.com'
    password = 'your_smtp_code'
    receiver = 'admin@example.com'

    msg = MIMEText(body, 'plain', 'utf-8')
    msg['Subject'] = subject
    msg['From'] = sender
    msg['To'] = receiver

    try:
        server = smtplib.SMTP_SSL('smtp.qq.com', 465)
        server.login(sender, password)
        server.sendmail(sender, [receiver], msg.as_string())
        server.quit()
    except Exception as e:
        print(f"[ERROR] 邮件发送失败: {e}")

def save_to_db(ip, url, status, ua, country='未知'):
    try:
        conn = pymysql.connect(
            host='localhost',
            user='root',
            password='your_password',
            database='monitor_db',
            charset='utf8'
        )
        cursor = conn.cursor()
        sql = "INSERT INTO access_log (ip, url, status_code, user_agent, country, created_at) VALUES (%s, %s, %s, %s, %s, NOW())"
        cursor.execute(sql, (ip, url, status, ua, country))
        conn.commit()
        cursor.close()
        conn.close()
    except Exception as e:
        print(f"[ERROR] 数据库写入失败: {e}")

def is_high_frequency(ip):
    now = time.time()
    # 清理超过 60 秒的记录
    while ip_requests[ip] and ip_requests[ip][0] < now - 60:
        ip_requests[ip].popleft()
    ip_requests[ip].append(now)
    return len(ip_requests[ip]) >= 5

def parse_log(line):
    parts = line.strip().split(' ', 4)
    if len(parts) != 5:
        return None
    ip, _, url, status, ua = parts
    return ip, url, status, ua

consumer = KafkaConsumer(
    'nginx-logs',
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='log-analyzer'
)

for msg in consumer:
    try:
        log_line = msg.value.decode('utf-8')
        parsed = parse_log(log_line)
        if not parsed:
            continue

        ip, url, status, ua = parsed
        status = int(status)

        # 存入数据库
        save_to_db(ip, url, status, ua)

        # 异常检测
        alert = False
        reason = ''

        if status >= 400:
            alert = True
            reason = f'发现 {status} 错误'

        if is_high_frequency(ip):
            alert = True
            reason = '高频访问行为'

        if alert:
            body = f"IP: {ip}\nURL: {url}\n状态码: {status}\nUser-Agent: {ua}\n原因: {reason}"
            send_email(f'[告警] {reason}', body)

        consumer.commit()

    except Exception as e:
        print(f"[ERROR] 处理消息失败: {e}")

该脚本通过 systemd 托管,配置开机自启与崩溃重启,确保 7×24 小时运行。所有输出日志由 journald 统一收集,可通过 journalctl -u consumer -f 实时查看。

4. 自动化与可靠性设计

4. 自动化与可靠性设计

为确保系统在生产环境中长期稳定运行,所有核心组件均通过 systemd 进行统一管理,实现开机自启、异常自动重启和日志集中查看。

每个服务均配置了以下关键属性:

  • Restart=always:进程退出后自动重启;
  • RestartSec=5:重启前等待 5 秒,避免频繁崩溃刷屏;
  • StandardOutput=journalStandardError=journal:输出统一由 journald 收集;
  • WantedBy=multi-user.target:随系统启动。

各服务列表如下:

组件 服务文件 启用命令
Nginx 系统自带 systemctl enable nginx
Flask 应用 /etc/systemd/system/flask-app.service systemctl enable flask-app
Filebeat 安装包自带 systemctl enable filebeat
Kafka /etc/systemd/system/kafka.service systemctl enable kafka
Python 消费者 /etc/systemd/system/consumer.service systemctl enable consumer

consumer.service 为例,其完整配置如下:

[Unit]
Description=Log Consumer Service
After=network.target kafka.service

[Service]
Type=simple
User=root
WorkingDirectory=/opt/log-monitor
ExecStart=/usr/bin/python3 /opt/log-monitor/consumer.py
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target

5. 下一步演进方向

当前系统已实现基础的日志采集、异常检测与邮件告警能力。为进一步提升系统的扩展性、分析能力和运维效率,计划从以下几个方向进行升级:

引入 Redis 作为高频访问计数存储

当前使用内存字典记录 IP 请求频率,存在进程重启后数据丢失的问题。后续将改用 Redis 的 INCR 与过期时间机制,实现跨实例、持久化的流量统计。

集成 Celery 实现异步任务处理

将邮件发送、复杂日志分析等耗时操作交由 Celery 异步执行,避免阻塞 Kafka 消费主线程,提升吞吐量和稳定性。

支持多通道告警(钉钉、企业微信)

在现有邮件通知基础上,增加对国内常用办公 IM 的支持,满足不同团队的通知习惯。

构建 Web 可视化控制台

开发一个简单的前端页面,展示实时访问趋势、错误分布、告警记录等,替代目前依赖 Kibana 或命令行的方式。

容器化部署(Docker + Compose)

将所有组件打包为 Docker 镜像,通过 docker-compose.yml 统一编排,降低环境依赖问题,便于在测试/生产环境快速部署。

增加规则引擎支持

允许用户通过配置文件自定义告警规则,例如“当 /admin 路径被访问时立即告警”,而无需修改代码。

这些改进将在保持现有架构稳定的前提下逐步推进,目标是打造一个轻量但功能完整的可观测性平台。

6. 测试验证环节

为确保系统各模块功能正常、告警链路畅通,进行了以下关键场景的测试。每个测试项均包含操作步骤、预期结果与实际效果(截图见对应位置)。

6.1 404 错误触发邮件告警

  • 测试方法:使用浏览器或 curl 访问一个不存在的路径(如 /notfound);
  • 预期行为:Nginx 返回 404,Filebeat 采集日志,消费者识别状态码 ≥400,发送告警邮件;
  • 验证结果
    在这里插入图片描述

6.2 高频访问检测(高流量告警)

  • 测试方法:通过脚本在 1 分钟内对同一 URL 发起 ≥5 次请求;
  • 预期行为:消费者判定该 IP 为高频访问,触发“高频访问行为”告警;
  • 验证结果
  • 这里还没做好防火墙安全措施,被国外ip访问到了,因此要及时做好防护,昨天阿里云还被别人远程ssh登录了,后面在安全组里将22号端口改成不常用的端口号解决了问题
    (此处插入高频请求日志及告警邮件截图)

6.3 服务崩溃后自动恢复

  • 测试方法:手动 kill Kafka 或 consumer 进程;
  • 预期行为:systemd 在 5 秒内自动重启服务,日志消费不中断;
  • 验证结果
    这里最好用systemctl status kafka 来查看进程号,ps aux | grep kafka会呈现一大堆,难找
    (此处插入 journalctl -u consumer 显示重启记录的截图)

消费者自动重启:
在这里插入图片描述
kafka自动重启:
在这里插入图片描述

6.4 MySQL 日志持久化验证

  • 测试方法:发起若干正常与异常请求;
  • 预期行为:所有访问记录均写入 MySQL 表 nginx_access_logs
  • 验证结果
    在这里插入图片描述

7. 总结

本项目从实际运维痛点出发,设计并实现了一套轻量级、高可用的日志监控与告警系统。通过 Nginx、Filebeat、Kafka、Python 消费者和 MySQL 的有机组合,构建了完整的“采集—传输—分析—存储—告警”链路。

系统具备以下核心能力:

  • 实时捕获 HTTP 异常(如 404/500);
  • 自动识别高频访问行为,防范潜在攻击;
  • 服务崩溃或服务器重启后自动恢复;
  • 所有日志结构化存储,便于后续审计与分析。

整个方案未依赖 Celery、Redis 等复杂组件,仅使用开源工具和原生 Linux 能力,降低了部署门槛,同时保证了功能完整性与稳定性。代码简洁、架构清晰、测试闭环,适合中小型 Web 服务快速接入可观测性能力。

未来可通过引入异步任务、规则引擎、容器化等手段进一步演进,但当前版本已能独立支撑生产环境的基础监控需求。

本文为作者原创,未经授权禁止转载。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐