以AI量化为生:6.日志系统与告警机制设计
本文介绍了量化交易系统中日志管理和告警通知系统的设计实现。作者基于loguru构建了异步高性能日志系统,支持自动轮转、压缩存储和彩色输出。告警系统整合了飞书/钉钉机器人,采用多线程异步发送机制,具备失败重试和频率限制功能。文章详细展示了日志格式设计、告警机器人配置、核心代码实现及系统集成方案,并提供了测试验证方法和常见问题解决方案。该系统可有效监控量化交易运行状态,及时发现异常情况,为后续策略开发
本文是《以AI量化为生》系列的第六篇,我们将设计一套完整的日志管理和告警通知系统,确保量化交易系统的稳定运行和及时监控。
前言
在前面的文章中,我们已经搭建了完整的数据下载系统,现在系统可以自动获取期货数据了。但是有个问题:系统跑起来后,我们怎么知道它运行得怎么样?
刚开始做量化的时候,我经常遇到这样的情况:
-
数据下载失败了,但我不知道
-
策略出现异常,但没有及时发现
-
系统运行了几天,才发现某个环节出了问题
这就是为什么我们需要一套完善的日志系统和告警机制。今天我们要解决的问题:
-
日志管理:如何记录系统运行的详细信息?
-
异步写入:如何确保日志不影响主线程性能?
-
告警通知:如何在出现问题时及时通知?
-
多渠道通知:如何支持钉钉、飞书等多种通知方式?
一、日志系统设计思路
1.1 为什么选择loguru
Python有很多日志库,为什么我选择loguru?主要有几个原因:
传统logging的痛点:
-
配置复杂,需要设置Handler、Formatter等
-
多线程环境下容易出问题
-
异步写入需要额外配置
-
日志轮转配置繁琐
loguru的优势:
-
开箱即用,配置简单
-
天然支持异步写入(enqueue=True)
-
内置日志轮转和压缩
-
支持结构化日志
-
异常堆栈信息更友好
1.2 日志系统架构
我们的日志系统采用以下架构:
应用程序 -> loguru -> 内存缓冲区 -> 异步写入 -> 文件/控制台
|
-> 告警触发器 -> 钉钉/飞书
核心特性:
-
异步写入:使用enqueue=True,避免IO阻塞主线程
-
内存缓冲:使用buffering参数,减少磁盘IO次数
-
日志轮转:按时间自动轮转,压缩历史文件
-
多级别输出:控制台和文件可以设置不同的日志级别
-
符号标识:每个模块可以有独特的symbol标识
1.3 日志格式设计
我设计的日志格式包含以下信息:
<时间戳> | <日志级别> | <symbol标识>|<消息内容> | <函数名>:<行号>
示例:
25-01-06 14:30:15 | INFO | RB|数据下载完成,共2016条 | download_data:156
25-01-06 14:30:16 | ERROR | CU|网络连接失败,开始重试 | retry_download:89
25-01-06 14:30:17 | SUCCESS | SYS|告警消息发送成功 | send_alert:234
这样的格式有几个好处:
-
时间戳方便追踪问题发生时间
-
symbol标识可以快速定位是哪个品种或模块
-
函数名和行号方便代码调试
-
不同级别用不同颜色显示,控制台输出更清晰
二、日志系统核心实现
让我们开始实现这个日志系统。首先创建核心的日志管理器:
2.1 创建日志管理器
我先在core/logging
目录下创建日志管理器:
# core/logging/logger_manager.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
日志管理器
基于loguru实现的高性能异步日志系统
"""
import os
import sys
import atexit
from datetime import datetime, time
from pathlib import Path
from typing import Optional
from loguru import logger
class LoggerManager:
"""日志管理器"""
def __init__(self):
self.base_path = Path(__file__).parent.parent.parent
self.log_path = self.base_path / "logs"
self.project_name = "atmquant"
self._initialized = False
self._log_handlers = []
def setup_logger(
self,
log_name: str = None,
level: str = "DEBUG",
output_console: bool = True,
symbol: str = ""
) -> None:
"""设置日志系统"""
if self._initialized:
return
try:
# 确保日志目录存在
ifnot self.log_path.exists():
print(f'日志文件夹不存在:{self.log_path},创建!')
self.log_path.mkdir(parents=True, exist_ok=True)
# 使用默认日志名
if log_name isNone:
log_name = self.project_name
# 日志文件完整路径
log_file = self.log_path / f"{log_name}.log"
# 定义日志格式,包含时间戳、日志级别、symbol标识、消息内容和代码位置
format_str = (
"<dim>{time:YY-MM-DD HH:mm:ss}</dim> | "
"<level>{level: <8}</level> | "
"<level>" + symbol + "|{message}</level> | "
"{function}:{line}"
)
# 移除默认处理器
logger.remove()
# 设置不同日志级别的颜色
logger.level("TRACE", color="<dim>")
logger.level("DEBUG", color="<blue>")
logger.level("INFO", color="<yellow>")
logger.level("SUCCESS", color="<green>")
logger.level("WARNING", color="<cyan>")
logger.level("ERROR", color="<magenta>")
logger.level("CRITICAL", color="<red>")
# 添加文件日志处理器,使用优化配置
file_handler_id = logger.add(
str(log_file), # 使用完整路径
format=format_str,
rotation=time(3, 0, 0), # 每天凌晨3点轮转
retention="30 days", # 保留30天的日志
compression="zip", # 压缩历史日志
level=level,
enqueue=True, # 启用异步写入,避免IO阻塞
catch=True, # 捕获异常
delay=False, # 立即创建文件
buffering=1024 * 32, # 使用32KB缓冲区,减少IO操作
encoding="utf-8", # 明确指定编码
backtrace=True, # 异常时显示完整堆栈
diagnose=True, # 显示变量值
)
self._log_handlers.append(file_handler_id)
# 添加控制台输出处理器
if output_console:
console_handler_id = logger.add(
sink=sys.stderr,
format=format_str,
colorize=True,
level=level,
enqueue=True, # 启用异步输出
catch=True,
backtrace=True,
diagnose=True
)
self._log_handlers.append(console_handler_id)
# 注册程序退出时的清理函数
atexit.register(self._cleanup)
self._initialized = True
logger.info(f"日志系统初始化完成,日志文件:{log_file}")
except Exception as e:
print(f"无法创建日志文件夹或设置日志:{e}")
raise
def _cleanup(self):
"""程序退出时的清理函数"""
try:
logger.info("程序正在关闭,等待日志写入完成...")
logger.complete()
logger.info("日志系统已安全关闭")
except Exception as e:
print(f"关闭日志系统时发生错误: {e}")
这个日志管理器的核心特性:
异步写入:enqueue=True
确保日志写入不会阻塞主线程内存缓冲:buffering=1024*32
使用32KB缓冲区,减少磁盘IO自动轮转:每天凌晨3点自动轮转日志文件压缩存储:历史日志自动压缩为zip格式异常捕获:catch=True
确保日志系统本身的异常不会影响主程序
2.2 日志系统的关键优化
1. 轮转时间的考虑
rotation=time(3, 0, 0) # 凌晨3点轮转
为什么选择凌晨3点?因为这个时间段通常是交易系统最空闲的时候,期货夜场品种也全部结束,轮转操作不会影响正常交易。
2. 日志级别的颜色设置
我重新定义了日志级别的颜色,让控制台输出更清晰:
-
SUCCESS用绿色(成功操作)
-
ERROR用紫色(错误信息)
-
WARNING用青色(警告信息)
三、告警机器人配置
在实现告警系统之前,我们需要先创建飞书和钉钉机器人。
3.1 创建飞书机器人
步骤1:创建飞书群组
首先需要创建一个飞书群组用于接收告警消息:
飞书客户端界面
-
打开飞书客户端
-
点击左侧的"+"号,选择"创建群组"
-
添加需要接收告警的成员(建议只添加相关人员)
-
给群组起一个有意义的名字,如"量化交易告警群"
步骤2:添加自定义机器人
在群组中添加机器人:
飞书群聊界面
-
在群聊界面,点击右上角的"设置"图标
-
选择"群设置"
-
找到"机器人"选项,点击"添加机器人"
-
选择"自定义机器人"
-
填写机器人信息:
-
机器人名称:如"ATMQuant告警机器人"
-
描述:如"用于量化交易系统告警通知"
-
添加自定义机器人界面
步骤3:配置安全设置
为了安全,建议启用签名验证:
机器人配置界面
-
在机器人配置页面,找到"安全设置"
-
选择"签名校验"方式
-
系统会生成一个密钥(Secret),请妥善保存
-
点击"完成"
步骤4:获取配置信息
完成配置后,你会得到:
-
Webhook地址:格式如
https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
-
密钥:一串随机字符串
步骤5:测试飞书机器人
可以用curl命令测试一下:
# 替换为你的实际配置
WEBHOOK_URL="https://open.feishu.cn/open-apis/bot/v2/hook/your-webhook-id"
SECRET="your-secret"
TIMESTAMP=$(date +%s)
# 生成签名
STRING_TO_SIGN="${TIMESTAMP}\n${SECRET}"
SIGNATURE=$(echo -n "$STRING_TO_SIGN" | openssl dgst -sha256 -hmac "" -binary | base64)
# 发送测试消息
curl -X POST "$WEBHOOK_URL" \
-H "Content-Type: application/json" \
-d "{
\"timestamp\": $TIMESTAMP,
\"sign\": \"$SIGNATURE\",
\"msg_type\": \"text\",
\"content\": {
\"text\": \"🤖 飞书机器人测试消息\\n时间:$(date)\\n状态:正常\"
}
}"
如果返回 {"msg":"success"}
就说明配置成功了。
3.2 创建钉钉机器人
步骤1:创建钉钉群组
钉钉客户端界面
-
打开钉钉客户端
-
点击右上角的"+"号
-
选择"发起群聊"
-
添加需要接收告警的成员
-
创建群聊,命名为"量化交易告警群"
步骤2:添加自定义机器人
在群组中添加机器人:
-
进入群聊,点击右上角的群设置图标
-
下拉到"群管理"设置
-
点击"添加机器人"
-
选择"自定义"机器人
-
点击"添加"
步骤3:配置机器人信息
添加机器人界面
-
机器人名字:如"ATMQuant告警机器人"
-
安全设置:建议选择"加签"方式
-
系统会生成一个密钥,请保存好
-
-
点击"完成"
步骤4:获取配置信息
完成配置后,你会得到:
-
Webhook地址:格式如
https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
-
密钥:用于签名验证的字符串
步骤5:测试钉钉机器人
# 替换为你的实际配置
ACCESS_TOKEN="your-access-token"
SECRET="your-secret"
TIMESTAMP=$(date +%s)000 # 钉钉使用毫秒时间戳
# 生成签名
STRING_TO_SIGN="${TIMESTAMP}\n${SECRET}"
SIGNATURE=$(echo -n "$STRING_TO_SIGN" | openssl dgst -sha256 -hmac "$SECRET" -binary | base64)
# URL编码签名
ENCODED_SIGNATURE=$(python3 -c "import urllib.parse; print(urllib.parse.quote('$SIGNATURE'))")
# 构造完整URL
WEBHOOK_URL="https://oapi.dingtalk.com/robot/send?access_token=${ACCESS_TOKEN}×tamp=${TIMESTAMP}&sign=${ENCODED_SIGNATURE}"
# 发送测试消息
curl -X POST "$WEBHOOK_URL" \
-H "Content-Type: application/json" \
-d "{
\"msgtype\": \"text\",
\"text\": {
\"content\": \"🤖 钉钉机器人测试消息\\n时间:$(date)\\n状态:正常\"
}
}"
如果返回 {"errcode":0,"errmsg":"ok"}
就说明配置成功了。
3.3 配置环境变量
将获取到的配置信息保存到.env
文件中:
# 飞书机器人配置
FEISHU_DEFAULT_WEBHOOK=https://open.feishu.cn/open-apis/bot/v2/hook/your-webhook-id
FEISHU_DEFAULT_SECRET=your-feishu-secret
# 钉钉机器人配置
DINGTALK_DEFAULT_WEBHOOK=https://oapi.dingtalk.com/robot/send?access_token=your-token
DINGTALK_DEFAULT_SECRET=your-dingtalk-secret
记住,这些敏感信息千万不要提交到代码仓库!
四、告警系统实现
现在我们有了机器人,接下来实现告警系统。
4.1 告警管理器核心实现
# core/logging/alert_manager.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
告警管理器
支持钉钉、飞书等多种通知方式的异步告警系统
"""
import traceback
import requests
import json
import hashlib
import base64
import hmac
import time
import re
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Dict, Any
from .logger_manager import get_logger
# 创建线程池执行器,用于异步发送告警
executor = ThreadPoolExecutor(max_workers=5)
logger = get_logger(symbol="ALERT")
class AlertManager:
"""告警管理器"""
def __init__(self):
self.symbol_hook_map = {} # 品种到webhook映射
self.hook_secret_map = {} # webhook到密钥映射
self.default_feishu_url = ""
self.default_feishu_secret = ""
self.default_dingtalk_url = ""
self.default_dingtalk_secret = ""
def send_feishu_alert(
self,
content: str,
symbol: Optional[str] = None,
now: Optional[datetime] = None,
force_send: bool = False
):
"""发送飞书告警消息"""
if now isNone:
now = datetime.now()
# 提取品种代码
commodity_code = self._extract_commodity_code(symbol) if symbol elseNone
def send_message():
"""异步发送消息的内部函数"""
try:
# 检查是否应该发送
ifnot self._should_send_alert(now, force_send):
logger.debug(f"跳过发送告警消息:{content[:50]}...")
return
# 选择webhook和密钥
webhook_url = self.symbol_hook_map.get(
commodity_code,
self.default_feishu_url
)
secret = self.hook_secret_map.get(
commodity_code,
self.default_feishu_secret
)
ifnot webhook_url ornot secret:
logger.error("飞书webhook或密钥未配置")
return
# 生成签名
timestamp = int(time.time())
signature = self._generate_feishu_signature(timestamp, secret)
# 构造消息
headers = {
"Content-Type": "application/json; charset=utf-8"
}
payload = {
"timestamp": timestamp,
"sign": signature,
"msg_type": "text",
"content": {
"text": content
}
}
# 发送请求,最多重试3次
for attempt in range(3):
try:
response = requests.post(
url=webhook_url,
data=json.dumps(payload),
headers=headers,
timeout=10
)
result = response.json()
logger.debug(f"飞书告警响应: {result}")
if result.get('msg') == 'success':
logger.success(f"飞书告警发送成功: {content[:50]}...")
break
else:
logger.warning(f"飞书告警发送失败: {result}")
except Exception as e:
logger.error(f"飞书告警发送异常 (尝试 {attempt + 1}/3): {e}")
if attempt < 2: # 不是最后一次尝试
time.sleep(5)
continue
# 每处理完一个任务后等待1秒钟
time.sleep(1)
except Exception as e:
logger.error(f"飞书告警处理异常: {e}")
logger.error(traceback.format_exc())
# 提交到线程池异步执行
executor.submit(send_message)
4.2 告警系统的关键设计
异步执行:使用ThreadPoolExecutor
确保告警发送不会阻塞主线程重试机制:网络请求失败时自动重试3次,每次间隔5秒时间控制:周末和非交易时间自动静音品种分组:不同品种可以发送到不同的群组
4.3 配置管理系统
为了方便管理各种配置,我创建了专门的配置文件:
# config/alert_config.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
告警配置管理
支持飞书、钉钉等多种告警方式的配置
"""
import os
from typing import Dict, Optional
from pathlib import Path
# 飞书配置
FEISHU_CONFIG = {
# 默认飞书webhook配置
"default_webhook": os.getenv("FEISHU_DEFAULT_WEBHOOK", ""),
"default_secret": os.getenv("FEISHU_DEFAULT_SECRET", ""),
# 品种特定的webhook映射(可选)
"symbol_webhook_map": {
# 示例:不同品种可以发送到不同的群
# "rb": "https://open.feishu.cn/open-apis/bot/v2/hook/your-rb-webhook",
# "cu": "https://open.feishu.cn/open-apis/bot/v2/hook/your-cu-webhook",
},
# webhook对应的密钥映射
"webhook_secret_map": {
# 示例:每个webhook对应的密钥
# "https://open.feishu.cn/open-apis/bot/v2/hook/your-rb-webhook": "your-rb-secret",
}
}
# 告警配置
ALERT_CONFIG = {
# 启用的告警类型
"enabled_types": ["feishu"], # 可选: "feishu", "dingtalk", "email"
# 默认告警类型
"default_type": "feishu",
# 告警级别配置
"alert_levels": {
"ERROR": True, # 错误级别告警
"CRITICAL": True, # 严重错误告警
"WARNING": False, # 警告级别告警(默认不发送)
"SUCCESS": False, # 成功消息(默认不发送)
},
# 时间限制配置
"time_restrictions": {
"enabled": True, # 是否启用时间限制
"weekend_silence": True, # 周末是否静音
"silence_start_hour": 3, # 周六静音开始时间(小时)
}
}
这个配置系统的好处:
-
环境变量支持:敏感信息通过环境变量配置
-
灵活映射:不同品种可以发送到不同群组
-
级别控制:可以控制哪些级别的消息需要告警
-
时间管理:自动处理交易时间和静音时间
五、系统集成与使用示例
现在我们已经完成了日志和告警系统的核心实现,让我们看看如何在实际项目中使用它们。
5.1 环境变量配置
首先,我们需要在.env
文件中配置告警相关的环境变量:
# .env
# 飞书告警配置
FEISHU_DEFAULT_WEBHOOK=https://open.feishu.cn/open-apis/bot/v2/hook/your-webhook-id
FEISHU_DEFAULT_SECRET=your-feishu-secret
# 钉钉告警配置(可选)
DINGTALK_DEFAULT_WEBHOOK=https://oapi.dingtalk.com/robot/send?access_token=your-token
DINGTALK_DEFAULT_SECRET=your-dingtalk-secret
记住,这些敏感信息千万不要提交到代码仓库!
5.2 在数据下载模块中使用
让我们看看如何在之前的数据下载模块中集成日志和告警系统:
# core/data/downloader.py
from core.logging.logger_manager import get_logger
from core.logging.alert_manager import alert_manager
from config.alert_config import get_alert_config
# 初始化日志系统
logger = get_logger(symbol="DATA")
# 配置告警系统
alert_config = get_alert_config()
alert_manager.configure_feishu(
default_url=alert_config["feishu"]["default_webhook"],
default_secret=alert_config["feishu"]["default_secret"]
)
class FutureDataDownloader:
def __init__(self):
self.logger = logger
def download_data(self, symbol: str):
"""下载期货数据"""
try:
self.logger.info(f"开始下载 {symbol} 数据")
# 模拟数据下载过程
data = self._fetch_data(symbol)
if data isnotNone:
self.logger.success(f"{symbol} 数据下载完成,共 {len(data)} 条记录")
return data
else:
error_msg = f"{symbol} 数据下载失败:无数据返回"
self.logger.error(error_msg)
# 发送告警
alert_manager.send_alert(
content=f"⚠️ 数据下载异常\n品种:{symbol}\n错误:无数据返回\n时间:{datetime.now()}",
symbol=symbol,
alert_type="feishu"
)
returnNone
except Exception as e:
error_msg = f"{symbol} 数据下载异常:{str(e)}"
self.logger.error(error_msg)
# 发送告警
alert_manager.send_alert(
content=f"🚨 数据下载严重异常\n品种:{symbol}\n错误:{str(e)}\n时间:{datetime.now()}",
symbol=symbol,
alert_type="feishu",
force_send=True# 严重错误强制发送
)
returnNone
def _fetch_data(self, symbol: str):
"""实际的数据获取逻辑"""
# 这里是具体的数据获取实现
pass
5.3 验证系统是否正常工作
我创建了一个快速测试脚本,让你可以验证整个系统:
# quick_test.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ATMQuant 系统快速测试脚本
"""
import os
import time
from datetime import datetime
from core.logging import get_logger, alert_manager
from config.alert_config import get_alert_config
def test_logging_system():
"""测试日志系统"""
print("📝 测试日志系统...")
# 创建测试日志
logger = get_logger(symbol="TEST")
logger.info("日志系统测试开始")
logger.success("这是一条成功消息")
logger.warning("这是一条警告消息")
logger.error("这是一条错误消息")
print("✅ 日志系统工作正常")
def test_alert_system():
"""测试告警系统"""
print("📱 测试告警系统...")
try:
alert_config = get_alert_config()
# 检查配置
feishu_webhook = alert_config["feishu"]["default_webhook"]
feishu_secret = alert_config["feishu"]["default_secret"]
ifnot feishu_webhook ornot feishu_secret:
print("⚠️ 飞书配置未设置,跳过告警测试")
return
# 配置告警系统
alert_manager.configure_feishu(
default_url=feishu_webhook,
default_secret=feishu_secret
)
# 发送测试告警
alert_manager.send_alert(
content=f"🤖 系统测试消息\n时间:{datetime.now()}\n状态:配置正常",
symbol="TEST",
alert_type="feishu",
force_send=True
)
print("✅ 告警消息已发送,请检查飞书群")
except Exception as e:
print(f"❌ 告警测试失败:{e}")
if __name__ == "__main__":
print("🚀 ATMQuant 系统快速测试")
print("=" * 50)
test_logging_system()
test_alert_system()
# 等待异步任务完成
time.sleep(3)
print("✅ 测试完成!")
运行测试:
python quick_test.py
你会看到类似这样的输出:
🚀 ATMQuant 系统快速测试
==================================================
📝 测试日志系统...
25-01-06 15:30:15 | INFO | TEST|日志系统测试开始 | test_logging_system:15
25-01-06 15:30:15 | SUCCESS | TEST|这是一条成功消息 | test_logging_system:16
25-01-06 15:30:15 | WARNING | TEST|这是一条警告消息 | test_logging_system:17
25-01-06 15:30:15 | ERROR | TEST|这是一条错误消息 | test_logging_system:18
✅ 日志系统工作正常
📱 测试告警系统...
✅ 告警消息已发送,请检查飞书群
✅ 测试完成!
如果你看到这样的输出,并且飞书群里收到了测试消息,说明系统配置成功了!
六、可能遇到的问题和解决方案
在实际使用过程中,你可能会遇到一些问题,让我分享一下解决方案:
问题1:日志文件过大
现象:日志文件增长很快,占用大量磁盘空间
解决方案:
# 调整日志轮转和保留策略
logger.add(
log_file,
rotation="100 MB", # 文件大小达到100MB时轮转
retention="7 days", # 只保留7天的日志
compression="zip" # 压缩历史文件
)
问题2:告警消息过多
现象:系统频繁发送告警,造成消息轰炸
解决方案:
# 在告警管理器中添加频率限制
class AlertManager:
def __init__(self):
self._last_alert_time = {}
self._alert_interval = 300# 5分钟内同类告警只发送一次
def _should_send_alert_by_frequency(self, content_hash: str) -> bool:
"""检查告警频率限制"""
now = time.time()
last_time = self._last_alert_time.get(content_hash, 0)
if now - last_time > self._alert_interval:
self._last_alert_time[content_hash] = now
returnTrue
returnFalse
问题3:飞书/钉钉配置错误
现象:告警发送失败,返回签名错误
解决方案:
-
检查webhook地址是否正确
-
确认密钥配置无误
-
验证时间戳生成逻辑
# 调试签名生成过程
def debug_signature(timestamp, secret):
string_to_sign = f'{timestamp}\n{secret}'
print(f"待签名字符串: {string_to_sign}")
hmac_code = hmac.new(
string_to_sign.encode("utf-8"),
digestmod=hashlib.sha256
).digest()
sign = base64.b64encode(hmac_code).decode('utf-8')
print(f"生成的签名: {sign}")
return sign
七、实际开发建议
基于我的实际使用经验,给大家几个建议:
1. 日志级别的合理使用
-
DEBUG:详细的调试信息,生产环境建议关闭
-
INFO:一般的运行信息,如系统启动、数据下载完成等
-
SUCCESS:重要的成功操作,如策略信号生成、订单成交等
-
WARNING:需要注意但不影响运行的问题,如数据延迟、网络重试等
-
ERROR:影响功能的错误,如数据获取失败、计算异常等
-
CRITICAL:严重错误,如系统崩溃、资源耗尽等
2. 告警策略的设计
不要什么都告警,这样会造成"狼来了"效应。建议:
-
ERROR级别:发送告警,但有频率限制
-
CRITICAL级别:立即发送告警,无频率限制
-
SUCCESS级别:重要操作可以发送通知
-
WARNING级别:一般不发送告警,除非是关键业务
3. 性能优化建议
-
使用异步写入(enqueue=True)
-
合理设置缓冲区大小
-
定期清理历史日志
-
监控日志系统本身的性能
4. 安全注意事项
-
敏感信息不要记录到日志中
-
webhook地址和密钥通过环境变量配置
-
定期轮换告警机器人的密钥
-
限制日志文件的访问权限
八、下一步计划
在下一篇文章中,我们将开始编写自己的第一个量化策略。有了完善的日志和告警机制,我们就能更好地监控策略的运行状态,及时发现问题。
我们将涵盖:
-
策略开发基础:vnpy策略框架的核心概念
-
第一个策略:从零开始编写一个简单的均线策略
-
策略集成:如何将日志和告警系统集成到策略中
-
策略测试:如何验证策略逻辑的正确性
-
实盘准备:策略上线前的检查清单
通过实际编写策略,你将学会:
-
如何使用vnpy的策略模板
-
如何处理市场数据和交易信号
-
如何进行风险控制和资金管理
-
如何利用我们已经搭建的基础设施
这将是我们从基础设施建设转向实际策略开发的重要一步!
本文是《以AI量化为生》系列文章的第六篇,完整代码已开源至GitHub:https://github.com/seasonstar/atmquant
本文内容仅供学习交流,不构成任何投资建议。交易有风险,投资需谨慎。
更多推荐
所有评论(0)