从crontab乱麻到稳定调度:Python爬虫+Airflow搭建电商价格监控平台实战全记录
摘要:本文分享了从crontab迁移到Airflow实现电商竞品价格监控的实战经验。作者针对crontab在任务依赖、失败告警和日志管理等方面的痛点,选择Airflow搭建调度平台,并详细介绍了四层架构设计(爬虫层、存储层、调度层、告警层)。重点讲解了Airflow DAG的依赖调度实现,包括环境搭建、任务编排和异常处理,最终实现稳定监控近千个商品价格的系统。文章提供了可落地的技术方案和避坑指南,
大家好,我是威哥,搞爬虫和数据平台也有六七年了。前两年帮朋友的电商公司做竞品价格监控,一开始图省事,直接用crontab跑了七八个Python脚本——刚开始监控十几个商品还行,后来品类扩展到五百多个,脚本之间还得有依赖:得先爬取品类列表页拿到最新商品ID,才能去爬详情页和价格,crontab根本管不了这些依赖,经常出现列表还没更新,详情页脚本就先跑了,报错报得我头大。更糟的是失败了没告警,等朋友发现竞品价格没更新,都过了大半天,错过了调价窗口。
痛定思痛,决定换个正经的调度工具,对比了Azkaban、Oozie,最后选了Airflow——主要是看中它用Python写DAG,对我们爬虫党友好,可视化界面也清晰。折腾了两周踩了无数坑,终于搭了个稳定的平台,现在监控近千个商品,跑了一年多没出过大问题。今天就把这套架构从0到1的实现过程分享给大家,全是实战踩出来的干货。
一、为什么放弃crontab选Airflow?先聊聊我的痛点
先说说crontab的局限,相信很多做过定时爬虫的朋友都有共鸣:
- 依赖调度完全靠猜:脚本A要等脚本B跑完才能跑,只能靠估算时间差,比如脚本B设成0点,脚本A设成0点10分——但脚本B有时候跑5分钟,有时候跑20分钟,根本不靠谱;
- 失败了没人知道:crontab跑脚本失败了,除非你去看服务器日志,不然根本发现不了,等发现的时候数据已经断了好几天;
- 日志散在各处:每个脚本自己写日志文件,查问题的时候要翻好几个目录,效率极低;
- 任务多了管理乱:几十个crontab任务堆在那里,想暂停某个任务、改个时间都得小心翼翼,生怕删错了。
再说说Airflow解决了这些问题的核心优势:
- 可视化DAG依赖:用Python代码定义任务依赖,谁先谁后一目了然,Web界面上还能看到任务的执行顺序和状态;
- 任务重试与告警:任务失败了自动重试,还能发邮件、钉钉告警,不用再盯着日志看;
- 集中式日志管理:所有任务的日志都存在Airflow里,Web界面直接就能查,不用登服务器;
- 灵活的调度规则:不仅能定时间(比如每小时、每天),还能定依赖(比如等另一个DAG跑完再跑),甚至能根据数据是否存在来触发任务。
二、整体架构设计:四个核心模块,小团队也能落地
这套架构没搞太复杂,适合小团队快速落地,核心分为四个模块:
1. 架构总览
- 爬虫层:负责爬取电商平台的商品信息、价格、库存,用Python+requests/Playwright实现,搭配代理池处理反爬;
- 存储层:MySQL存商品基础信息和价格历史数据,Redis做缓存和去重(比如判断商品价格是否变化,避免重复写入);
- 调度层:Airflow负责定时触发爬虫任务、管理任务依赖、监控任务状态,用CeleryExecutor做并发调度;
- 告警层:任务失败或价格异常波动时,用钉钉/邮件发通知给运营人员。
2. 为什么选这些技术?
- 爬虫:requests处理简单接口,Playwright处理需要JS渲染的页面,代理池用之前写的开源方案,稳定够用;
- 存储:MySQL存结构化数据方便查询和做价格趋势分析,Redis做缓存快,去重也方便(用
set存已爬的商品ID); - 调度:Airflow的Python DAG对我们太友好了,不用学新的配置语言,CeleryExecutor解决并发问题,小团队用Redis做Broker足够;
- 告警:钉钉机器人配置简单,运营人员也习惯看钉钉,邮件作为兜底。
三、核心模块实现:从爬虫到Airflow调度,一步步讲
(一)爬虫模块:先把数据爬下来,处理好反爬
爬虫不是今天的重点,但还是简单说下核心逻辑,毕竟调度的前提是有稳定的爬虫:
- 反爬处理:用代理池(每3个请求换一个IP)、Playwright模拟真实浏览器(隐藏自动化特征)、随机请求头,基本能搞定大部分电商的反爬;
- 数据清洗:爬下来的价格是字符串(比如“¥199.00”),转成float类型,商品名称去掉多余空格,库存转成整数;
- 去重逻辑:Redis里存“商品ID-最新价格”的键值对,每次爬取先对比Redis里的价格,如果没变就不写MySQL,只更新Redis的时间戳,减少数据库压力;
- MySQL表结构设计:
product_info表:存商品基础信息(商品ID、名称、URL、品类、更新时间);price_history表:存价格历史(ID、商品ID、价格、库存、爬取时间),方便做价格趋势分析。
(二)Airflow调度模块:重点中的重点,依赖调度这么玩
1. 先搭Airflow环境
环境用Docker搭最快,避免本地依赖冲突,简单说下步骤:
- 拉取官方Docker Compose文件:
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.3/docker-compose.yaml'; - 改配置:在
docker-compose.yaml里把AIRFLOW__CORE__DEFAULT_TIMEZONE改成Asia/Shanghai(时区坑后面会讲),把Executor改成CeleryExecutor,Broker用Redis; - 启动:
docker-compose up -d,访问http://localhost:8080就能看到Airflow的Web界面了。
2. 写第一个DAG:定时爬取品类列表
DAG文件存放在dags/目录下,Airflow会自动扫描。先写个简单的,每小时爬取一次品类列表,拿到最新的商品ID:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import sys
# 把爬虫脚本的目录加进来,方便导入
sys.path.append('/opt/airflow/dags/spiders')
from category_spider import run_category_spider
# 默认参数:重试3次,每次间隔5分钟,失败发邮件
default_args = {
'owner': 'laozhou',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email': ['laozhou@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
# 定义DAG:每小时0分执行,ID是category_spider_dag
with DAG(
'category_spider_dag',
default_args=default_args,
description='每小时爬取电商品类列表,更新商品ID',
schedule_interval='0 * * * *', # cron表达式:每小时0分
catchup=False, # 不补跑历史任务
tags=['price_monitor', 'category'],
) as dag:
# 定义任务:调用爬虫脚本的run函数
crawl_category = PythonOperator(
task_id='crawl_category_task',
python_callable=run_category_spider,
)
# 这里只有一个任务,直接写就行
crawl_category
3. 带依赖的DAG:先爬列表,再爬详情,最后更新价格
重点来了,怎么定义任务依赖?比如我们的流程是:
- 爬取品类列表(拿到商品ID)→ 2. 并发爬取商品详情页(分3个并发任务,避免压力太大)→ 3. 检查价格是否异常(比如波动超过20%发告警)→ 4. 写入MySQL。
用Airflow的>>运算符就能定义依赖,并发任务用列表包起来:
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta
import sys
sys.path.append('/opt/airflow/dags/spiders')
from category_spider import run_category_spider
from detail_spider import run_detail_spider
from price_check import check_price_abnormal
from data_writer import write_to_mysql
default_args = {
'owner': 'laozhou',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email': ['laozhou@example.com'],
'email_on_failure': True,
'retries': 2,
'retry_delay': timedelta(minutes=3),
}
with DAG(
'price_monitor_dag',
default_args=default_args,
description='电商价格监控全流程:列表→详情→检查→写入',
schedule_interval='30 * * * *', # 每小时30分执行,等列表DAG跑完
catchup=False,
tags=['price_monitor', 'full_flow'],
) as dag:
# 1. 开始节点(DummyOperator,占位用)
start = DummyOperator(task_id='start')
# 2. 爬取品类列表
crawl_category = PythonOperator(
task_id='crawl_category_task',
python_callable=run_category_spider,
)
# 3. 并发爬取详情页:分3个任务,每个任务爬1/3的商品
crawl_detail_1 = PythonOperator(
task_id='crawl_detail_task_1',
python_callable=run_detail_spider,
op_kwargs={'part': 1, 'total_parts': 3}, # 传参数:第1部分,共3部分
)
crawl_detail_2 = PythonOperator(
task_id='crawl_detail_task_2',
python_callable=run_detail_spider,
op_kwargs={'part': 2, 'total_parts': 3},
)
crawl_detail_3 = PythonOperator(
task_id='crawl_detail_task_3',
python_callable=run_detail_spider,
op_kwargs={'part': 3, 'total_parts': 3},
)
# 4. 检查价格异常
check_price = PythonOperator(
task_id='check_price_abnormal_task',
python_callable=check_price_abnormal,
)
# 5. 写入MySQL
write_data = PythonOperator(
task_id='write_to_mysql_task',
python_callable=write_to_mysql,
)
# 6. 结束节点
end = DummyOperator(task_id='end')
# 定义依赖关系:
# start → 爬列表 → [并发爬详情1/2/3] → 检查价格 → 写入 → end
start >> crawl_category >> [crawl_detail_1, crawl_detail_2, crawl_detail_3] >> check_price >> write_data >> end
这样在Airflow的Web界面上,就能看到清晰的任务依赖图,哪个任务成功、哪个失败一目了然。
4. 跨DAG依赖:等列表DAG跑完再跑全流程
刚才的全流程DAG里,我们直接把爬列表的任务放进去了,但如果列表DAG是独立的(比如其他DAG也需要用),可以用ExternalTaskSensor做跨DAG依赖:
from airflow.sensors.external_task import ExternalTaskSensor
# 在price_monitor_dag里加一个Sensor,等category_spider_dag的crawl_category_task跑完
wait_for_category = ExternalTaskSensor(
task_id='wait_for_category_dag',
external_dag_id='category_spider_dag', # 要等的DAG ID
external_task_id='crawl_category_task', # 要等的任务ID
timeout=3600, # 最多等1小时
mode='poke',
poke_interval=60, # 每60秒检查一次
)
# 依赖改成:start → wait_for_category → 爬详情...
start >> wait_for_category >> [crawl_detail_1, crawl_detail_2, crawl_detail_3] >> check_price >> write_data >> end
四、踩坑实录!这些坑我都替你踩过了
Airflow看着简单,真要落地踩的坑不少,挑几个最常见的讲:
1. 时区坑:定时任务总是晚8小时
Airflow默认用UTC时间,我们在东八区,所以定时任务总是晚8小时。解决方法:
- 在
docker-compose.yaml里加环境变量:AIRFLOW__CORE__DEFAULT_TIMEZONE=Asia/Shanghai; - DAG里的
start_date要用本地时间,比如datetime(2024, 1, 1, tzinfo=pendulum.timezone('Asia/Shanghai')); - 别用
datetime.now(),要用Airflow的{{ ds }}或{{ execution_date }}模板变量。
2. Executor坑:SequentialExecutor只能跑一个任务
一开始图省事用了SequentialExecutor,结果并发任务根本跑不起来,只能一个一个排队。解决方法:
- 换成CeleryExecutor,用Redis做Broker(小团队足够,不用搞RabbitMQ);
- 在
docker-compose.yaml里把AIRFLOW__CORE__EXECUTOR改成CeleryExecutor,AIRFLOW__CELERY__BROKER_URL改成redis://redis:6379/0。
3. 数据库连接坑:爬虫任务频繁连MySQL导致连接超时
爬虫任务多了,频繁创建和关闭MySQL连接,导致连接池不够用,报错“Lost connection to MySQL server”。解决方法:
- 用SQLAlchemy的连接池,配置
pool_size=10、max_overflow=20; - 不要在任务里频繁创建连接,用单例模式管理连接池;
- 任务结束后记得关闭连接(或者用上下文管理器)。
4. 日志坑:Airflow日志占满磁盘
Airflow默认把所有任务的日志都存在本地,时间长了能占几十G。解决方法:
- 写一个清理日志的DAG,定期删除7天前的日志;
- 或者把日志存到S3/OSS上,本地只保留最近3天的。
5. 任务依赖坑:catchup=True导致补跑一堆历史任务
一开始没注意catchup参数,默认是True,结果DAG启动后,把从start_date到现在的所有任务都补跑了一遍,服务器直接卡了。解决方法:
- 所有DAG都加
catchup=False,不补跑历史任务; - 如果真的需要补跑,手动在Web界面触发单个任务。
五、总结与优化建议
这套架构跑了一年多,总结下来优势很明显:
- 稳定:任务失败自动重试,告警及时,再也不用盯着日志看;
- 清晰:Web界面能看到所有任务的状态和依赖,查问题方便;
- 可扩展:后来加了库存监控、评论监控,直接加新的DAG就行,不用改现有代码。
最后给大家几个优化建议:
- 小团队:用LocalExecutor+SQLite就行,不用搞Celery,简单够用;
- 中等规模:用CeleryExecutor+Redis+MySQL,并发能到几十个任务;
- 大规模:换成KubernetesExecutor,动态扩缩容,监控加Prometheus+Grafana;
- 数据可视化:用Grafana连MySQL做价格趋势图,运营人员能直接看,不用每次找我们查数据。
更多推荐


所有评论(0)