轻量级日志监控与告警系统(一)
实时日志监控告警系统:从 Nginx 到邮件通知的完整链路搭建(初级版本,已上线阿里云)
目录
- 1. 项目简介
- 2. 整体架构图
- 3. 核心组件详解
- 4. 自动化与可靠性设计
- 5. 下一步演进方向
- 6. 测试验证环节
- 7. 总结
项目库:https://gitcode.com/SUBENCAI/kafka-log-monitor
1. 项目简介
在现代互联网应用中,系统的稳定性与可观测性已成为保障用户体验的核心要素。然而,在实际运维过程中,我们常常面临以下挑战:
- 用户访问了不存在的页面(404),但无人知晓;
- 某个 IP 疯狂刷请求,可能正在扫描漏洞或进行 DDoS 攻击;
- 服务器宕机后重启,关键服务未能自动恢复,导致业务中断;
- 日志分散在各节点,无法统一分析,故障排查耗时长。
为了解决这些问题,我设计并搭建了一套**轻量级、全自动、实时响应的日志监控与告警系统**。该系统基于开源组件组合而成,无需商业软件,却能在异常发生时秒级发送邮件提醒,实现“**异常可见、风险可控、响应及时**”。
本项目采用微服务架构思想,构建了从用户请求 → 日志采集 → 消息队列 → 实时分析 → 多通道告警的完整链路。当前阶段聚焦于核心功能验证,后续将逐步引入 AI 查询、容器化部署等高级特性,打造企业级可观测性平台。
> 当前版本说明:本系统暂未引入 Celery 和 Redis 作为任务队列,所有逻辑由单个 Python 消费者脚本完成,结构简洁、易于理解,是后续升级的良好基础。
2. 整体架构图

3. 核心组件详解
3.1 Nginx
Nginx 在本系统中承担反向代理与负载均衡的核心角色,是用户请求进入后端服务的第一道关口。其主要职责包括:
- 流量分发:将来自用户的 HTTP 请求均匀分配至后端多个 Web 应用实例(Flask),避免单点过载;
- 高可用保障:通过配置健康检查机制,自动剔除异常节点,确保服务持续可用;
- 日志标准化:统一记录所有访问日志(包含 IP、URL、状态码、响应时间等),为后续分析提供原始数据;
- 性能优化:支持 Gzip 压缩、缓存静态资源,提升响应速度,降低后端压力。
在本项目中,Nginx 配置了 upstream 模块实现对后端 Flask 应用的负载均衡,并启用了 access_log 记录每条请求的详细信息。日志格式采用标准 combined 格式,便于 Filebeat 后续解析与处理。
技术选型说明:选择 Nginx 而非其他方案(如 HAProxy 或 LVS),是因为其轻量、高效、配置灵活,且具备强大的日志采集能力,完美契合本系统的“可观测性”需求。
3.2 Flask 应用
Flask 作为本系统的核心业务逻辑载体,负责处理用户请求并返回动态网页内容。虽然轻量,但其灵活性与可扩展性使其成为快速构建 Web 服务的理想选择。
在本项目中,Flask 应用部署于 /personal-website/app.py,主要实现以下功能:
- 基础页面服务:提供主页、关于页等静态路由,模拟真实网站访问场景;
- 404 异常路径设计:故意暴露无效 URL(如
/notfound),用于触发日志中的 404 错误,验证监控告警链路; - 监听全网卡地址:通过
app.run(host='0.0.0.0', port=5000)确保 Nginx 可代理访问; - 结构化响应:所有请求均记录至 Nginx access log,为后续分析提供原始数据源。
为保障服务稳定性,Flask 应用通过 systemd 进行托管,配置开机自启与崩溃自动重启,确保系统重启后业务服务立即恢复。
技术选型说明:选用 Flask 而非 Django 或 FastAPI,是因为其极简架构更贴合本项目的“轻量级监控”定位,避免引入不必要的复杂性,同时便于与 Kafka 消费者解耦。
3.3 Filebeat
Filebeat 是本系统的日志采集器,负责实时监控 Nginx 的 access.log 文件,并将新增日志行发送至 Kafka。
其核心作用包括:
- 轻量级、低资源占用,适合长期运行;
- 支持日志解析与字段增强(如添加
log_type: nginx-access); - 保证日志不丢失:通过注册表(registry)记录读取位置,即使服务重启也不会重复或遗漏。
在本项目中,Filebeat 作为“桥梁”,将原始日志从服务器文件系统安全、高效地输送到 Kafka 消息队列,为后续实时分析奠定基础。
3.4 Kafka
Kafka 在本系统中扮演高吞吐、低延迟的日志消息总线角色,是连接日志采集(Filebeat)与实时分析(Python 消费者)的关键枢纽。
核心作用
- 解耦生产与消费:Filebeat 只需将日志推入 Kafka,无需关心谁来处理;消费者可独立启停、扩容,互不影响;
- 缓冲削峰:当消费者短暂宕机或处理变慢时,Kafka 持久化存储日志,避免数据丢失;
- 顺序保障:同一 IP 或 URL 的访问日志在分区内保持顺序,便于后续行为分析;
- 横向扩展:通过增加 Topic 分区数和消费者实例,可轻松应对流量增长。
部署与配置
- 使用 Kafka 2.13 集群(3 节点),配合Kraft(无 ZooKeeper)模式 实现元数据管理与高可用;
- 创建专用 Topic
nginx-logs,分区数设为 3,副本因子为 2,兼顾性能与容灾; - 生产者(Filebeat)启用
acks=all,确保消息写入所有副本后才确认,提升可靠性; - 消费者采用手动提交偏移量(
enable.auto.commit: false),避免因异常导致日志漏处理。
为什么选 Kafka?
相比 RabbitMQ 或 Redis Stream,Kafka 在高吞吐日志场景下表现更优,具备天然的持久化、回溯、多消费者支持能力,是构建可观测性系统的工业级标准选择。
3.5 Consumer
Python 消费者是本系统的分析引擎,负责从 Kafka 拉取 Nginx 访问日志,解析内容,并在检测到异常行为时触发告警。
其主要功能包括:
实时消费 nginx-logs 主题中的日志消息;
解析简化的 Nginx 日志格式(共 5 个字段:IP、时间、URL、状态码、User-Agent);
识别两类异常:
HTTP 状态码为 4xx 或 5xx;
单个 IP 在短时间内高频访问(≥5 次/分钟);
将原始日志及分析结果存入 MySQL;
触发邮件通知(使用 QQ 邮箱 SMTP)
import json
import time
import pymysql
import smtplib
from email.mime.text import MIMEText
from kafka import KafkaConsumer
from collections import defaultdict, deque
# 全局计数器:记录每个 IP 最近 60 秒内的请求时间戳
ip_requests = defaultdict(deque)
def send_email(subject, body):
sender = 'your@qq.com'
password = 'your_smtp_code'
receiver = 'admin@example.com'
msg = MIMEText(body, 'plain', 'utf-8')
msg['Subject'] = subject
msg['From'] = sender
msg['To'] = receiver
try:
server = smtplib.SMTP_SSL('smtp.qq.com', 465)
server.login(sender, password)
server.sendmail(sender, [receiver], msg.as_string())
server.quit()
except Exception as e:
print(f"[ERROR] 邮件发送失败: {e}")
def save_to_db(ip, url, status, ua, country='未知'):
try:
conn = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='monitor_db',
charset='utf8'
)
cursor = conn.cursor()
sql = "INSERT INTO access_log (ip, url, status_code, user_agent, country, created_at) VALUES (%s, %s, %s, %s, %s, NOW())"
cursor.execute(sql, (ip, url, status, ua, country))
conn.commit()
cursor.close()
conn.close()
except Exception as e:
print(f"[ERROR] 数据库写入失败: {e}")
def is_high_frequency(ip):
now = time.time()
# 清理超过 60 秒的记录
while ip_requests[ip] and ip_requests[ip][0] < now - 60:
ip_requests[ip].popleft()
ip_requests[ip].append(now)
return len(ip_requests[ip]) >= 5
def parse_log(line):
parts = line.strip().split(' ', 4)
if len(parts) != 5:
return None
ip, _, url, status, ua = parts
return ip, url, status, ua
consumer = KafkaConsumer(
'nginx-logs',
bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
auto_offset_reset='latest',
enable_auto_commit=False,
group_id='log-analyzer'
)
for msg in consumer:
try:
log_line = msg.value.decode('utf-8')
parsed = parse_log(log_line)
if not parsed:
continue
ip, url, status, ua = parsed
status = int(status)
# 存入数据库
save_to_db(ip, url, status, ua)
# 异常检测
alert = False
reason = ''
if status >= 400:
alert = True
reason = f'发现 {status} 错误'
if is_high_frequency(ip):
alert = True
reason = '高频访问行为'
if alert:
body = f"IP: {ip}\nURL: {url}\n状态码: {status}\nUser-Agent: {ua}\n原因: {reason}"
send_email(f'[告警] {reason}', body)
consumer.commit()
except Exception as e:
print(f"[ERROR] 处理消息失败: {e}")
该脚本通过 systemd 托管,配置开机自启与崩溃重启,确保 7×24 小时运行。所有输出日志由 journald 统一收集,可通过 journalctl -u consumer -f 实时查看。
4. 自动化与可靠性设计
4. 自动化与可靠性设计
为确保系统在生产环境中长期稳定运行,所有核心组件均通过 systemd 进行统一管理,实现开机自启、异常自动重启和日志集中查看。
每个服务均配置了以下关键属性:
Restart=always:进程退出后自动重启;RestartSec=5:重启前等待 5 秒,避免频繁崩溃刷屏;StandardOutput=journal和StandardError=journal:输出统一由 journald 收集;WantedBy=multi-user.target:随系统启动。
各服务列表如下:
| 组件 | 服务文件 | 启用命令 |
|---|---|---|
| Nginx | 系统自带 | systemctl enable nginx |
| Flask 应用 | /etc/systemd/system/flask-app.service |
systemctl enable flask-app |
| Filebeat | 安装包自带 | systemctl enable filebeat |
| Kafka | /etc/systemd/system/kafka.service |
systemctl enable kafka |
| Python 消费者 | /etc/systemd/system/consumer.service |
systemctl enable consumer |
以 consumer.service 为例,其完整配置如下:
[Unit]
Description=Log Consumer Service
After=network.target kafka.service
[Service]
Type=simple
User=root
WorkingDirectory=/opt/log-monitor
ExecStart=/usr/bin/python3 /opt/log-monitor/consumer.py
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
5. 下一步演进方向
当前系统已实现基础的日志采集、异常检测与邮件告警能力。为进一步提升系统的扩展性、分析能力和运维效率,计划从以下几个方向进行升级:
引入 Redis 作为高频访问计数存储
当前使用内存字典记录 IP 请求频率,存在进程重启后数据丢失的问题。后续将改用 Redis 的 INCR 与过期时间机制,实现跨实例、持久化的流量统计。
集成 Celery 实现异步任务处理
将邮件发送、复杂日志分析等耗时操作交由 Celery 异步执行,避免阻塞 Kafka 消费主线程,提升吞吐量和稳定性。
支持多通道告警(钉钉、企业微信)
在现有邮件通知基础上,增加对国内常用办公 IM 的支持,满足不同团队的通知习惯。
构建 Web 可视化控制台
开发一个简单的前端页面,展示实时访问趋势、错误分布、告警记录等,替代目前依赖 Kibana 或命令行的方式。
容器化部署(Docker + Compose)
将所有组件打包为 Docker 镜像,通过 docker-compose.yml 统一编排,降低环境依赖问题,便于在测试/生产环境快速部署。
增加规则引擎支持
允许用户通过配置文件自定义告警规则,例如“当 /admin 路径被访问时立即告警”,而无需修改代码。
这些改进将在保持现有架构稳定的前提下逐步推进,目标是打造一个轻量但功能完整的可观测性平台。
6. 测试验证环节
为确保系统各模块功能正常、告警链路畅通,进行了以下关键场景的测试。每个测试项均包含操作步骤、预期结果与实际效果(截图见对应位置)。
6.1 404 错误触发邮件告警
- 测试方法:使用浏览器或 curl 访问一个不存在的路径(如
/notfound); - 预期行为:Nginx 返回 404,Filebeat 采集日志,消费者识别状态码 ≥400,发送告警邮件;
- 验证结果:

6.2 高频访问检测(高流量告警)
- 测试方法:通过脚本在 1 分钟内对同一 URL 发起 ≥5 次请求;
- 预期行为:消费者判定该 IP 为高频访问,触发“高频访问行为”告警;
- 验证结果:
- 这里还没做好防火墙安全措施,被国外ip访问到了,因此要及时做好防护,昨天阿里云还被别人远程ssh登录了,后面在安全组里将22号端口改成不常用的端口号解决了问题

6.3 服务崩溃后自动恢复
- 测试方法:手动 kill Kafka 或 consumer 进程;
- 预期行为:systemd 在 5 秒内自动重启服务,日志消费不中断;
- 验证结果:
这里最好用systemctl status kafka 来查看进程号,ps aux | grep kafka会呈现一大堆,难找
(此处插入journalctl -u consumer显示重启记录的截图)
消费者自动重启:
kafka自动重启:
6.4 MySQL 日志持久化验证
- 测试方法:发起若干正常与异常请求;
- 预期行为:所有访问记录均写入 MySQL 表
nginx_access_logs; - 验证结果:

7. 总结
本项目从实际运维痛点出发,设计并实现了一套轻量级、高可用的日志监控与告警系统。通过 Nginx、Filebeat、Kafka、Python 消费者和 MySQL 的有机组合,构建了完整的“采集—传输—分析—存储—告警”链路。
系统具备以下核心能力:
- 实时捕获 HTTP 异常(如 404/500);
- 自动识别高频访问行为,防范潜在攻击;
- 服务崩溃或服务器重启后自动恢复;
- 所有日志结构化存储,便于后续审计与分析。
整个方案未依赖 Celery、Redis 等复杂组件,仅使用开源工具和原生 Linux 能力,降低了部署门槛,同时保证了功能完整性与稳定性。代码简洁、架构清晰、测试闭环,适合中小型 Web 服务快速接入可观测性能力。
未来可通过引入异步任务、规则引擎、容器化等手段进一步演进,但当前版本已能独立支撑生产环境的基础监控需求。
本文为作者原创,未经授权禁止转载。
更多推荐





所有评论(0)