大家好,我是威哥,搞爬虫和数据平台也有六七年了。前两年帮朋友的电商公司做竞品价格监控,一开始图省事,直接用crontab跑了七八个Python脚本——刚开始监控十几个商品还行,后来品类扩展到五百多个,脚本之间还得有依赖:得先爬取品类列表页拿到最新商品ID,才能去爬详情页和价格,crontab根本管不了这些依赖,经常出现列表还没更新,详情页脚本就先跑了,报错报得我头大。更糟的是失败了没告警,等朋友发现竞品价格没更新,都过了大半天,错过了调价窗口。

痛定思痛,决定换个正经的调度工具,对比了Azkaban、Oozie,最后选了Airflow——主要是看中它用Python写DAG,对我们爬虫党友好,可视化界面也清晰。折腾了两周踩了无数坑,终于搭了个稳定的平台,现在监控近千个商品,跑了一年多没出过大问题。今天就把这套架构从0到1的实现过程分享给大家,全是实战踩出来的干货。


一、为什么放弃crontab选Airflow?先聊聊我的痛点

先说说crontab的局限,相信很多做过定时爬虫的朋友都有共鸣:

  1. 依赖调度完全靠猜:脚本A要等脚本B跑完才能跑,只能靠估算时间差,比如脚本B设成0点,脚本A设成0点10分——但脚本B有时候跑5分钟,有时候跑20分钟,根本不靠谱;
  2. 失败了没人知道:crontab跑脚本失败了,除非你去看服务器日志,不然根本发现不了,等发现的时候数据已经断了好几天;
  3. 日志散在各处:每个脚本自己写日志文件,查问题的时候要翻好几个目录,效率极低;
  4. 任务多了管理乱:几十个crontab任务堆在那里,想暂停某个任务、改个时间都得小心翼翼,生怕删错了。

再说说Airflow解决了这些问题的核心优势:

  • 可视化DAG依赖:用Python代码定义任务依赖,谁先谁后一目了然,Web界面上还能看到任务的执行顺序和状态;
  • 任务重试与告警:任务失败了自动重试,还能发邮件、钉钉告警,不用再盯着日志看;
  • 集中式日志管理:所有任务的日志都存在Airflow里,Web界面直接就能查,不用登服务器;
  • 灵活的调度规则:不仅能定时间(比如每小时、每天),还能定依赖(比如等另一个DAG跑完再跑),甚至能根据数据是否存在来触发任务。

二、整体架构设计:四个核心模块,小团队也能落地

这套架构没搞太复杂,适合小团队快速落地,核心分为四个模块:

1. 架构总览

  • 爬虫层:负责爬取电商平台的商品信息、价格、库存,用Python+requests/Playwright实现,搭配代理池处理反爬;
  • 存储层:MySQL存商品基础信息和价格历史数据,Redis做缓存和去重(比如判断商品价格是否变化,避免重复写入);
  • 调度层:Airflow负责定时触发爬虫任务、管理任务依赖、监控任务状态,用CeleryExecutor做并发调度;
  • 告警层:任务失败或价格异常波动时,用钉钉/邮件发通知给运营人员。

2. 为什么选这些技术?

  • 爬虫:requests处理简单接口,Playwright处理需要JS渲染的页面,代理池用之前写的开源方案,稳定够用;
  • 存储:MySQL存结构化数据方便查询和做价格趋势分析,Redis做缓存快,去重也方便(用set存已爬的商品ID);
  • 调度:Airflow的Python DAG对我们太友好了,不用学新的配置语言,CeleryExecutor解决并发问题,小团队用Redis做Broker足够;
  • 告警:钉钉机器人配置简单,运营人员也习惯看钉钉,邮件作为兜底。

三、核心模块实现:从爬虫到Airflow调度,一步步讲

(一)爬虫模块:先把数据爬下来,处理好反爬

爬虫不是今天的重点,但还是简单说下核心逻辑,毕竟调度的前提是有稳定的爬虫:

  1. 反爬处理:用代理池(每3个请求换一个IP)、Playwright模拟真实浏览器(隐藏自动化特征)、随机请求头,基本能搞定大部分电商的反爬;
  2. 数据清洗:爬下来的价格是字符串(比如“¥199.00”),转成float类型,商品名称去掉多余空格,库存转成整数;
  3. 去重逻辑:Redis里存“商品ID-最新价格”的键值对,每次爬取先对比Redis里的价格,如果没变就不写MySQL,只更新Redis的时间戳,减少数据库压力;
  4. MySQL表结构设计
    • product_info表:存商品基础信息(商品ID、名称、URL、品类、更新时间);
    • price_history表:存价格历史(ID、商品ID、价格、库存、爬取时间),方便做价格趋势分析。

(二)Airflow调度模块:重点中的重点,依赖调度这么玩

1. 先搭Airflow环境

环境用Docker搭最快,避免本地依赖冲突,简单说下步骤:

  • 拉取官方Docker Compose文件:curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.3/docker-compose.yaml'
  • 改配置:在docker-compose.yaml里把AIRFLOW__CORE__DEFAULT_TIMEZONE改成Asia/Shanghai(时区坑后面会讲),把Executor改成CeleryExecutor,Broker用Redis;
  • 启动:docker-compose up -d,访问http://localhost:8080就能看到Airflow的Web界面了。
2. 写第一个DAG:定时爬取品类列表

DAG文件存放在dags/目录下,Airflow会自动扫描。先写个简单的,每小时爬取一次品类列表,拿到最新的商品ID:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import sys
# 把爬虫脚本的目录加进来,方便导入
sys.path.append('/opt/airflow/dags/spiders')
from category_spider import run_category_spider

# 默认参数:重试3次,每次间隔5分钟,失败发邮件
default_args = {
    'owner': 'laozhou',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email': ['laozhou@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# 定义DAG:每小时0分执行,ID是category_spider_dag
with DAG(
    'category_spider_dag',
    default_args=default_args,
    description='每小时爬取电商品类列表,更新商品ID',
    schedule_interval='0 * * * *',  #  cron表达式:每小时0分
    catchup=False,  # 不补跑历史任务
    tags=['price_monitor', 'category'],
) as dag:

    # 定义任务:调用爬虫脚本的run函数
    crawl_category = PythonOperator(
        task_id='crawl_category_task',
        python_callable=run_category_spider,
    )

    # 这里只有一个任务,直接写就行
    crawl_category
3. 带依赖的DAG:先爬列表,再爬详情,最后更新价格

重点来了,怎么定义任务依赖?比如我们的流程是:

  1. 爬取品类列表(拿到商品ID)→ 2. 并发爬取商品详情页(分3个并发任务,避免压力太大)→ 3. 检查价格是否异常(比如波动超过20%发告警)→ 4. 写入MySQL。

用Airflow的>>运算符就能定义依赖,并发任务用列表包起来:

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta
import sys
sys.path.append('/opt/airflow/dags/spiders')
from category_spider import run_category_spider
from detail_spider import run_detail_spider
from price_check import check_price_abnormal
from data_writer import write_to_mysql

default_args = {
    'owner': 'laozhou',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email': ['laozhou@example.com'],
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=3),
}

with DAG(
    'price_monitor_dag',
    default_args=default_args,
    description='电商价格监控全流程:列表→详情→检查→写入',
    schedule_interval='30 * * * *',  # 每小时30分执行,等列表DAG跑完
    catchup=False,
    tags=['price_monitor', 'full_flow'],
) as dag:

    # 1. 开始节点(DummyOperator,占位用)
    start = DummyOperator(task_id='start')

    # 2. 爬取品类列表
    crawl_category = PythonOperator(
        task_id='crawl_category_task',
        python_callable=run_category_spider,
    )

    # 3. 并发爬取详情页:分3个任务,每个任务爬1/3的商品
    crawl_detail_1 = PythonOperator(
        task_id='crawl_detail_task_1',
        python_callable=run_detail_spider,
        op_kwargs={'part': 1, 'total_parts': 3},  # 传参数:第1部分,共3部分
    )
    crawl_detail_2 = PythonOperator(
        task_id='crawl_detail_task_2',
        python_callable=run_detail_spider,
        op_kwargs={'part': 2, 'total_parts': 3},
    )
    crawl_detail_3 = PythonOperator(
        task_id='crawl_detail_task_3',
        python_callable=run_detail_spider,
        op_kwargs={'part': 3, 'total_parts': 3},
    )

    # 4. 检查价格异常
    check_price = PythonOperator(
        task_id='check_price_abnormal_task',
        python_callable=check_price_abnormal,
    )

    # 5. 写入MySQL
    write_data = PythonOperator(
        task_id='write_to_mysql_task',
        python_callable=write_to_mysql,
    )

    # 6. 结束节点
    end = DummyOperator(task_id='end')

    # 定义依赖关系:
    # start → 爬列表 → [并发爬详情1/2/3] → 检查价格 → 写入 → end
    start >> crawl_category >> [crawl_detail_1, crawl_detail_2, crawl_detail_3] >> check_price >> write_data >> end

这样在Airflow的Web界面上,就能看到清晰的任务依赖图,哪个任务成功、哪个失败一目了然。

4. 跨DAG依赖:等列表DAG跑完再跑全流程

刚才的全流程DAG里,我们直接把爬列表的任务放进去了,但如果列表DAG是独立的(比如其他DAG也需要用),可以用ExternalTaskSensor做跨DAG依赖:

from airflow.sensors.external_task import ExternalTaskSensor

# 在price_monitor_dag里加一个Sensor,等category_spider_dag的crawl_category_task跑完
wait_for_category = ExternalTaskSensor(
    task_id='wait_for_category_dag',
    external_dag_id='category_spider_dag',  # 要等的DAG ID
    external_task_id='crawl_category_task',  # 要等的任务ID
    timeout=3600,  # 最多等1小时
    mode='poke',
    poke_interval=60,  # 每60秒检查一次
)

# 依赖改成:start → wait_for_category → 爬详情...
start >> wait_for_category >> [crawl_detail_1, crawl_detail_2, crawl_detail_3] >> check_price >> write_data >> end

四、踩坑实录!这些坑我都替你踩过了

Airflow看着简单,真要落地踩的坑不少,挑几个最常见的讲:

1. 时区坑:定时任务总是晚8小时

Airflow默认用UTC时间,我们在东八区,所以定时任务总是晚8小时。解决方法:

  • docker-compose.yaml里加环境变量:AIRFLOW__CORE__DEFAULT_TIMEZONE=Asia/Shanghai
  • DAG里的start_date要用本地时间,比如datetime(2024, 1, 1, tzinfo=pendulum.timezone('Asia/Shanghai'))
  • 别用datetime.now(),要用Airflow的{{ ds }}{{ execution_date }}模板变量。

2. Executor坑:SequentialExecutor只能跑一个任务

一开始图省事用了SequentialExecutor,结果并发任务根本跑不起来,只能一个一个排队。解决方法:

  • 换成CeleryExecutor,用Redis做Broker(小团队足够,不用搞RabbitMQ);
  • docker-compose.yaml里把AIRFLOW__CORE__EXECUTOR改成CeleryExecutorAIRFLOW__CELERY__BROKER_URL改成redis://redis:6379/0

3. 数据库连接坑:爬虫任务频繁连MySQL导致连接超时

爬虫任务多了,频繁创建和关闭MySQL连接,导致连接池不够用,报错“Lost connection to MySQL server”。解决方法:

  • 用SQLAlchemy的连接池,配置pool_size=10max_overflow=20
  • 不要在任务里频繁创建连接,用单例模式管理连接池;
  • 任务结束后记得关闭连接(或者用上下文管理器)。

4. 日志坑:Airflow日志占满磁盘

Airflow默认把所有任务的日志都存在本地,时间长了能占几十G。解决方法:

  • 写一个清理日志的DAG,定期删除7天前的日志;
  • 或者把日志存到S3/OSS上,本地只保留最近3天的。

5. 任务依赖坑:catchup=True导致补跑一堆历史任务

一开始没注意catchup参数,默认是True,结果DAG启动后,把从start_date到现在的所有任务都补跑了一遍,服务器直接卡了。解决方法:

  • 所有DAG都加catchup=False,不补跑历史任务;
  • 如果真的需要补跑,手动在Web界面触发单个任务。

五、总结与优化建议

这套架构跑了一年多,总结下来优势很明显:

  • 稳定:任务失败自动重试,告警及时,再也不用盯着日志看;
  • 清晰:Web界面能看到所有任务的状态和依赖,查问题方便;
  • 可扩展:后来加了库存监控、评论监控,直接加新的DAG就行,不用改现有代码。

最后给大家几个优化建议:

  • 小团队:用LocalExecutor+SQLite就行,不用搞Celery,简单够用;
  • 中等规模:用CeleryExecutor+Redis+MySQL,并发能到几十个任务;
  • 大规模:换成KubernetesExecutor,动态扩缩容,监控加Prometheus+Grafana;
  • 数据可视化:用Grafana连MySQL做价格趋势图,运营人员能直接看,不用每次找我们查数据。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐