Python爬虫实战:基于DAG的任务编排系统从零实现(Airflow思想本地化)!
我长期专注 Python 爬虫工程化实战,主理专栏 《Python爬虫实战》:从采集策略到反爬对抗,从数据清洗到分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上。
㊗️本期内容已收录至专栏《Python爬虫实战》,持续完善知识体系与项目实战,建议先订阅收藏,后续查阅更方便~
㊙️本期爬虫难度指数:⭐⭐⭐
🉐福利: 一次订阅后,专栏内的所有文章可永久免费看,持续更新中,保底1000+(篇)硬核实战内容。
全文目录:
🌟 开篇语
哈喽,各位小伙伴们你们好呀~我是【喵手】。
运营社区: C站 / 掘金 / 腾讯云 / 阿里云 / 华为云 / 51CTO
欢迎大家常来逛逛,一起学习,一起进步~🌟
我长期专注 Python 爬虫工程化实战,主理专栏 《Python爬虫实战》:从采集策略到反爬对抗,从数据清洗到分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上。
📌 专栏食用指南(建议收藏)
- ✅ 入门基础:环境搭建 / 请求与解析 / 数据落库
- ✅ 进阶提升:登录鉴权 / 动态渲染 / 反爬对抗
- ✅ 工程实战:异步并发 / 分布式调度 / 监控与容错
- ✅ 项目落地:数据治理 / 可视化分析 / 场景化应用
📣 专栏推广时间:如果你想系统学爬虫,而不是碎片化东拼西凑,欢迎订阅专栏👉《Python爬虫实战》👈,一次订阅后,专栏内的所有文章可永久免费阅读,持续更新中。
💕订阅后更新会优先推送,按目录学习更高效💯~
摘要(Abstract)
当你的爬虫系统逐渐复杂——需要先登录、再搜索、然后批量抓详情、最后数据清洗入库——这时你会发现简单的顺序执行已经无法满足需求。本文将带你实现一个轻量级的DAG任务编排引擎,借鉴Airflow/Prefect的核心思想,但无需安装笨重的依赖。我们将以一个多阶段招聘数据爬虫为实战案例,展示如何优雅地管理复杂的数据流水线。读完本文你将掌握:
- ✅ DAG(有向无环图)的核心概念及其在爬虫中的应用场景
- ✅ 任务依赖关系的声明式定义与自动化调度执行
- ✅ 从零实现一个支持并行、重试、状态持久化的任务编排框架
- ✅ 生产环境的最佳实践:任务监控、失败恢复、增量更新策略
1️⃣ 背景与需求(Why)
为什么需要任务编排?
去年我负责的一个项目需要每天从5个不同招聘网站抓取数据,处理流程大概是这样的:
1. 登录获取Cookie(部分网站需要)
2. 搜索职位列表(按城市、关键词)
3. 抓取详情页(可能数千个URL)
4. 数据清洗(去重、格式化)
5. 入库(MySQL)
6. 生成日报(统计今日新增、薪资分布)
7. 发送邮件通知
最初我写了一个大函数按顺序执行,结果遇到各种问题:
- 🐛 脆弱性:第3步抓详情时偶尔会因为网络波动失败,导致整个流程重跑(前面的登录、搜索都白做了)
- 🐌 低效率:第3步其实可以并行抓取1000个URL,但顺序执行要跑2小时
- 🔍 难维护:想在第4步和第5步之间插入一个"敏感词过滤"环节,得改动很多地方
- 📊 缺乏可见性:不知道每个步骤耗时多少、哪个环节经常失败
这时我意识到需要一个任务编排系统——把爬虫拆成独立的Task,用DAG描述它们的依赖关系,让调度器自动处理执行顺序、并发、重试。
业务目标与编排需求
业务目标:
- 每日自动化抓取 5个招聘网站 的数据
- 总数据量:10万+ 职位/天
- 流程耗时:从 3小时 优化到 30分钟
- 失败恢复:单个Task失败不影响其他Task
任务依赖图(DAG结构):
┌─────────────┐
│ 登录任务 │ (Task A)
└──────┬──────┘
│
┌────────┴────────┐
▼ ▼
┌──────────┐ ┌──────────┐
│ 搜索城市1 │ │ 搜索城市2 │ (Task B1, B2)
└─────┬────┘ └─────┬────┘
│ │
└────────┬────────┘
▼
┌─────────────────┐
│ 批量抓取详情 │ (Task C - 并行)
└────────┬─────────┘
▼
┌─────────────────┐
│ 数据清洗 │ (Task D)
└────────┬─────────┘
▼
┌─────────────────┐
│ 入库存储 │ (Task E)
└────────┬─────────┘
▼
┌─────────────────┐
│ 生成报告+通知 │ (Task F)
└─────────────────┘
监控目标字段:
DAG执行记录:
├── dag_id # DAG唯一标识
├── execution_date # 执行时间
├── state # 状态(running/success/failed)
├── start_time # 开始时间
└── end_time # 结束时间
Task执行记录:
├── task_id # 任务ID
├── dag_id # 所属DAG
├── state # 状态(pending/running/success/failed/retry)
├── try_number # 重试次数
├── start_time # 开始时间
├── end_time # 结束时间
├── duration # 耗时(秒)
└── log_url # 日志路径
2️⃣ 合规与注意事项
频率控制增强
在任务编排系统中,并发能力增强意味着更高的反爬风险。必须在DAG层面设置全局限流:
# 全局并发限制
MAX_ACTIVE_TASKS_PER_DAG = 10 # 单个DAG最多10个Task同时运行
MAX_REQUESTS_PER_SECOND = 5 # 全局QPS限制
# Task级别延迟(在Task函数内部)
import time
time.sleep(random.uniform(2, 5)) # 每个请求延迟2-5秒
数据采集边界重申
即使系统能力再强,也要坚守底线:
- ❌ 不绕过登录墙:即使技术上可以破解,也不要做
- ❌ 不暴力重试:失败后要指数退避,不要每秒重试
- ✅ 尊重robots.txt:在DAG的第一个Task检查,不通过则整个流程终止
3️⃣ 技术选型与整体流程(What/How)
Airflow vs Prefect vs 自研方案
| 特性 | Airflow | Prefect | 本文方案 |
|---|---|---|---|
| 部署复杂度 | 高(需要数据库、Web服务器) | 中(需要Prefect Server) | 低(单Python文件) |
| 学习曲线 | 陡峭 | 适中 | 平缓 |
| 功能完整性 | 非常完整 | 完整 | 基础但够用 |
| 适用场景 | 企业级、复杂DAG | 中型团队 | 个人/小团队 |
| 依赖包大小 | 200+ MB | 100+ MB | < 5 MB |
为什么自研?
- 对于日抓取量 < 100万的场景,Airflow就像用大炮打蚊子
- Prefect虽然轻量,但仍需要额外的Server组件
- 自研方案可以精确控制每个细节,便于定制化
核心架构设计:
┌──────────────────────────────────────────────┐
│ DAG Definition Layer │
│ - 声明式定义任务(@task装饰器) │
│ - 声明依赖关系(>>操作符) │
└────────────┬─────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────┐
│ Scheduler Layer (调度器) │
│ - 拓扑排序(确定执行顺序) │
│ - 任务队列(待执行、执行中、已完成) │
│ - 并发控制(线程池/进程池) │
└────────────┬─────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────┐
│ Executor Layer (执行器) │
│ - Task运行时环境 │
│ - 状态管理(pending/running/success/failed) │
│ - 重试逻辑(指数退避) │
│ - 上下文传递(XCom机制) │
└────────────┬─────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────┐
│ Persistence Layer (持久化) │
│ - SQLite存储执行历史 │
│ - 状态快照(支持断点续跑) │
│ - 日志归档 │
└──────────────────────────────────────────────┘
4️⃣ 环境准备与依赖安装
Python版本要求
# 推荐 Python 3.9+(用到了类型注解新特性)
python --version # Python 3.9.18
依赖安装
# 核心依赖(极简)
pip install networkx==3.2.1 # 用于DAG拓扑排序
pip install sqlalchemy==2.0.23 # ORM持久化
pip install pydantic==2.5.0 # 数据验证
pip install rich==13.7.0 # 终端美化输出
# 如果需要可视化DAG
pip install matplotlib==3.8.2
pip install graphviz==0.20.1
项目目录结构
dag_scheduler/
├── config/
│ ├── __init__.py
│ └── settings.py # 配置文件
├── core/
│ ├── __init__.py
│ ├── dag.py # DAG核心类
│ ├── task.py # Task基类
│ ├── scheduler.py # 调度器
│ ├── executor.py # 执行器
│ └── state.py # 状态枚举
├── storage/
│ ├── __init__.py
│ ├── models.py # SQLAlchemy模型
│ └── repository.py # 数据访问层
├── utils/
│ ├── __init__.py
│ ├── logger.py # 日志配置
│ └── visualizer.py # DAG可视化
├── dags/ # 用户定义的DAG
│ ├── job_spider_dag.py # 示例DAG
│ └── ...
├── logs/ # 日志目录
├── data/ # 数据库文件
├── main.py # 入口文件
└── requirements.txt
5️⃣ 核心实现:Task基类与装饰器
Task状态枚举
# core/state.py
from enum import Enum
class TaskState(str, Enum):
"""任务状态枚举"""
PENDING = "pending" # 等待执行
RUNNING = "running" # 执行中
SUCCESS = "success" # 成功
FAILED = "failed" # 失败
RETRY = "retry" # 重试中
SKIPPED = "skipped" # 跳过(上游失败)
def is_finished(self) -> bool:
"""判断是否为终态"""
return self in {TaskState.SUCCESS, TaskState.FAILED, TaskState.SKIPPED}
def is_successful(self) -> bool:
"""判断是否成功"""
return self == TaskState.SUCCESS
class DAGState(str, Enum):
"""DAG状态枚举"""
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
Task基类设计
# core/task.py
from typing import Callable, Any, List, Optional, Dict
from datetime import datetime
import time
import traceback
from core.state import TaskState
from utils.logger import get_logger
logger = get_logger(__name__)
class Task:
"""
任务基类
核心职责:
1. 封装用户定义的业务逻辑(python_callable)
2. 管理任务的执行状态(pending -> running -> success/failed)
3. 处理重试逻辑(指数退避)
4. 上下文数据传递(XCom机制)
"""
def __init__(
self,
task_id: str,
python_callable: Callable,
retries: int = 0,
retry_delay: float = 60.0,
timeout: Optional[float] = None,
depends_on: Optional[List['Task']] = None,
dag: Optional['DAG'] = None,
):
"""
初始化任务
Args:
task_id: 任务唯一标识(在同一个DAG内唯一)
python_callable: 要执行的Python函数
retries: 失败后重试次数(默认0次)
retry_delay: 重试间隔(秒,每次重试会指数增长)
timeout: 超时时间(秒,None表示不限制)
depends_on: 上游依赖任务列表
dag: 所属DAG对象
"""
self.task_id = task_id
self.python_callable = python_callable
self.retries = retries
self.retry_delay = retry_delay
self.timeout = timeout
self.dag = dag
# 依赖关系管理
self.upstream_tasks: List['Task'] = depends_on or [] # 上游任务
self.downstream_tasks: List['Task'] = [] # 下游任务
# 运行时状态
self.state = TaskState.PENDING
self.start_time: Optional[datetime] = None
self.end_time: Optional[datetime] = None
self.try_number = 0 # 当前是第几次尝试
self.exception: Optional[Exception] = None
# XCom:任务间数据传递(简化版,内存存储)
self.output: Any = None
def set_upstream(self, task: 'Task'):
"""
设置上游依赖
示例:task_b.set_upstream(task_a) # task_b依赖task_a
"""
if task not in self.upstream_tasks:
self.upstream_tasks.append(task)
if self not in task.downstream_tasks:
task.downstream_tasks.append(self)
def set_downstream(self, task: 'Task'):
"""
设置下游任务
示例:task_a.set_downstream(task_b) # task_a完成后执行task_b
"""
task.set_upstream(self)
def __rshift__(self, other: 'Task') -> 'Task':
"""
运算符重载:>>
允许链式调用:task_a >> task_b >> task_c
等价于:task_a.set_downstream(task_b); task_b.set_downstream(task_c)
"""
self.set_downstream(other)
return other
def __lshift__(self, other: 'Task') -> 'Task':
"""
运算符重载:<<
task_b << task_a 等价于 task_a >> task_b
"""
self.set_upstream(other)
return other
def execute(self, context: Dict[str, Any]) -> Any:
"""
执行任务的核心方法
Args:
context: 执行上下文(包含dag_run信息、execution_date等)
Returns:
任务执行结果(可以被下游任务通过XCom获取)
Raises:
Exception: 执行失败时抛出
"""
self.state = TaskState.RUNNING
self.start_time = datetime.now()
self.try_number += 1
logger.info(f"🚀 开始执行任务: {self.task_id} (第{self.try_number}次尝试)")
try:
# 调用用户定义的函数
# 注入上下文参数(如果函数签名包含context参数)
import inspect
sig = inspect.signature(self.python_callable)
if 'context' in sig.parameters:
result = self.python_callable(context=context)
else:
result = self.python_callable()
# 执行成功
self.state = TaskState.SUCCESS
self.output = result
self.end_time = datetime.now()
duration = (self.end_time - self.start_time).total_seconds()
logger.info(f"✅ 任务成功: {self.task_id} (耗时: {duration:.2f}秒)")
return result
except Exception as e:
self.exception = e
# 判断是否需要重试
if self.try_number <= self.retries:
self.state = TaskState.RETRY
wait_time = self.retry_delay * (2 ** (self.try_number - 1)) # 指数退避
logger.warning(
f"⚠️ 任务失败,将在{wait_time:.0f}秒后重试: {self.task_id}\n"
f" 错误: {str(e)}"
)
time.sleep(wait_time)
# 递归重试
return self.execute(context)
else:
# 重试次数耗尽,标记为失败
self.state = TaskState.FAILED
self.end_time = datetime.now()
logger.error(
f"❌ 任务失败(已重试{self.retries}次): {self.task_id}\n"
f" 错误: {str(e)}\n"
f" 堆栈:\n{traceback.format_exc()}"
)
raise
def get_upstream_output(self, task_id: str) -> Any:
"""
获取上游任务的输出(XCom机制)
Args:
task_id: 上游任务ID
Returns:
上游任务的输出结果
示例:
def process_data(context):
# 获取上游任务"fetch_data"的输出
data = context['task'].get_upstream_output('fetch_data')
return clean(data)
"""
for task in self.upstream_tasks:
if task.task_id == task_id:
return task.output
raise ValueError(f"上游任务 {task_id} 不存在")
def __repr__(self) -> str:
return f"<Task: {self.task_id} [{self.state.value}]>"
关键设计点解析:
-
状态机管理:
PENDING → RUNNING → SUCCESS/FAILED- 失败后可能进入
RETRY状态,最终仍是SUCCESS/FAILED
-
依赖关系:
upstream_tasks:必须先执行完成的任务downstream_tasks:等待当前任务完成才能执行>>操作符:让DAG定义更直观
-
重试机制:
- 指数退避:第1次等60秒,第2次等120秒,第3次等240秒…
- 避免"雪崩效应"(连续失败导致系统过载)
-
XCom(跨任务通信):
- 简化版,直接存储在
output属性 - 生产环境应持久化到数据库
- 简化版,直接存储在
6️⃣ 核心实现:DAG类与拓扑排序
DAG核心类
# core/dag.py
from typing import List, Dict, Optional, Set
from datetime import datetime, timedelta
import networkx as nx
from core.task import Task
from core.state import DAGState, TaskState
from utils.logger import get_logger
logger = get_logger(__name__)
class DAG:
"""
有向无环图(Directed Acyclic Graph)
核心职责:
1. 管理一组相关的Task
2. 验证DAG合法性(是否有环)
3. 计算任务执行顺序(拓扑排序)
4. 调度任务执行
"""
def __init__(
self,
dag_id: str,
description: str = "",
schedule_interval: Optional[timedelta] = None,
start_date: Optional[datetime] = None,
default_args: Optional[Dict] = None,
):
"""
初始化DAG
Args:
dag_id: DAG唯一标识
description: DAG描述
schedule_interval: 调度间隔(None表示手动触发)
start_date: 开始日期
default_args: 默认参数(会传递给所有Task)
"""
self.dag_id = dag_id
self.description = description
self.schedule_interval = schedule_interval
self.start_date = start_date or datetime.now()
self.default_args = default_args or {}
# 任务容器
self.tasks: Dict[str, Task] = {}
# 运行时状态
self.state = DAGState.PENDING
self.start_time: Optional[datetime] = None
self.end_time: Optional[datetime] = None
def add_task(self, task: Task):
"""
添加任务到DAG
Args:
task: Task对象
"""
if task.task_id in self.tasks:
raise ValueError(f"任务ID重复: {task.task_id}")
task.dag = self # 反向引用
self.tasks[task.task_id] = task
logger.debug(f"➕ 添加任务到DAG: {self.dag_id}.{task.task_id}")
def validate(self) -> bool:
"""
验证DAG合法性
检查项:
1. 是否有环(有向无环图的前提)
2. 是否有孤立节点(没有上下游的任务)
Returns:
True表示合法
Raises:
ValueError: DAG非法时抛出
"""
if not self.tasks:
raise ValueError(f"DAG {self.dag_id} 没有任务")
# 构建NetworkX图
G = nx.DiGraph()
for task in self.tasks.values():
G.add_node(task.task_id)
for upstream in task.upstream_tasks:
G.add_edge(upstream.task_id, task.task_id)
# 检查是否有环
if not nx.is_directed_acyclic_graph(G):
cycles = list(nx.simple_cycles(G))
raise ValueError(
f"DAG {self.dag_id} 包含环路!这会导致死锁。\n"
f"环路路径: {cycles}"
)
logger.info(f"✅ DAG验证通过: {self.dag_id}")
return True
def topological_sort(self) -> List[Task]:
"""
拓扑排序:计算任务执行顺序
算法:Kahn算法
1. 找到所有入度为0的节点(没有上游依赖)
2. 将它们加入结果列表
3. 移除这些节点及其出边
4. 重复1-3直到所有节点处理完
Returns:
按执行顺序排列的Task列表
示例:
A → B → D
A → C → D
结果可能是:[A, B, C, D] 或 [A, C, B, D]
"""
# 构建NetworkX图
G = nx.DiGraph()
for task in self.tasks.values():
G.add_node(task.task_id)
for upstream in task.upstream_tasks:
G.add_edge(upstream.task_id, task.task_id)
# 使用NetworkX的拓扑排序
try:
sorted_task_ids = list(nx.topological_sort(G))
except nx.NetworkXError:
raise ValueError(f"DAG {self.dag_id} 拓扑排序失败(可能有环)")
# 转换为Task对象列表
sorted_tasks = [self.tasks[task_id] for task_id in sorted_task_ids]
logger.debug(f"📊 拓扑排序结果: {[t.task_id for t in sorted_tasks]}")
return sorted_tasks
def get_runnable_tasks(self) -> List[Task]:
"""
获取当前可运行的任务
可运行条件:
1. 任务状态为PENDING
2. 所有上游任务都已成功完成
Returns:
可运行的Task列表
"""
runnable = []
for task in self.tasks.values():
# 检查任务状态
if task.state != TaskState.PENDING:
continue
# 检查上游依赖
all_upstream_success = all(
upstream.state == TaskState.SUCCESS
for upstream in task.upstream_tasks
)
if all_upstream_success:
runnable.append(task)
return runnable
def get_task(self, task_id: str) -> Optional[Task]:
"""根据ID获取任务"""
return self.tasks.get(task_id)
def __repr__(self) -> str:
return f"<DAG: {self.dag_id} ({len(self.tasks)} tasks)>"
拓扑排序详解:
假设有如下依赖关系:
Task A (登录)
├─→ Task B (搜索城市1)
│ └─→ Task D (抓取详情)
└─→ Task C (搜索城市2)
└─→ Task D (抓取详情)
Kahn算法执行过程:
# 初始状态
入度表 = {A: 0, B: 1, C: 1, D: 2}
队列 = [A] # 入度为0的节点
结果 = []
# 第1轮
出队: A
结果 = [A]
更新入度: B=0, C=0
队列 = [B, C]
# 第2轮
出队: B
结果 = [A, B]
更新入度: D=1
队列 = [C]
# 第3轮
出队: C
结果 = [A, B, C]
更新入度: D=0
队列 = [D]
# 第4轮
出队: D
结果 = [A, B, C, D]
队列 = []
# 最终顺序:A → B → C → D
注意:B和C可以并行执行(它们的入度同时变为0),调度器会利用这一点。
7️⃣ 核心实现:调度器与执行器
串行执行器(Sequential Executor)
# core/executor.py
from typing import Dict, Any
from datetime import datetime
from core.dag import DAG
from core.state import DAGState, TaskState
from utils.logger import get_logger
logger = get_logger(__name__)
class SequentialExecutor:
"""
串行执行器
按拓扑排序的顺序依次执行任务,不支持并行。
适用于调试或资源受限的场景。
"""
def execute_dag(self, dag: DAG, execution_date: datetime) -> Dict[str, Any]:
"""
执行DAG
Args:
dag: 要执行的DAG对象
execution_date: 逻辑执行日期(用于幂等性)
Returns:
执行结果字典
"""
logger.info(f"{'='*60}")
logger.info(f"🚀 开始执行DAG: {dag.dag_id}")
logger.info(f"📅 执行日期: {execution_date}")
logger.info(f"{'='*60}\n")
# 验证DAG
dag.validate()
# 拓扑排序
sorted_tasks = dag.topological_sort()
# 准备执行上下文
context = {
'dag': dag,
'execution_date': execution_date,
'dag_run_id': f"{dag.dag_id}_{execution_date.strftime('%Y%m%d_%H%M%S')}"
}
# 开始执行
dag.state = DAGState.RUNNING
dag.start_time = datetime.now()
failed_tasks = []
for task in sorted_tasks:
try:
# 注入task到context(让Task能访问DAG信息)
context['task'] = task
# 执行任务
task.execute(context)
except Exception as e:
# 任务失败,标记下游任务为SKIPPED
logger.error(f"💥 任务失败,跳过下游任务: {task.task_id}")
failed_tasks.append(task.task_id)
self._skip_downstream(task)
# 结束执行
dag.end_time = datetime.now()
duration = (dag.end_time - dag.start_time).total_seconds()
# 判断DAG整体状态
if failed_tasks:
dag.state = DAGState.FAILED
logger.error(f"\n❌ DAG执行失败: {dag.dag_id} (耗时: {duration:.2f}秒)")
logger.error(f" 失败任务: {', '.join(failed_tasks)}")
else:
dag.state = DAGState.SUCCESS
logger.info(f"\n✅ DAG执行成功: {dag.dag_id} (耗时: {duration:.2f}秒)")
# 返回执行摘要
return {
'dag_id': dag.dag_id,
'state': dag.state.value,
'duration': duration,
'total_tasks': len(sorted_tasks),
'success_tasks': sum(1 for t in dag.tasks.values() if t.state == TaskState.SUCCESS),
'failed_tasks': failed_tasks,
}
def _skip_downstream(self, task):
"""递归跳过所有下游任务"""
for downstream in task.downstream_tasks:
if downstream.state == TaskState.PENDING:
downstream.state = TaskState.SKIPPED
logger.warning(f"⏭️ 跳过任务: {downstream.task_id}(上游失败)")
self._skip_downstream(downstream)
并行执行器(Thread Pool Executor)
# core/executor.py (续)
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
class ParallelExecutor:
"""
并行执行器
使用线程池并行执行可运行的任务,显著提升效率。
基于拓扑排序的层级并行:同一层的任务可并行,不同层串行。
"""
def __init__(self, max_workers: int = 5):
"""
初始化执行器
Args:
max_workers: 最大线程数
"""
self.max_workers = max_workers
self.lock = threading.Lock() # 保护共享状态
def execute_dag(self, dag: DAG, execution_date: datetime) -> Dict[str, Any]:
"""
并行执行DAG
算法:
1. 找到所有入度为0的任务(第一层)
2. 并行执行这些任务
3. 任务完成后,更新下游任务的入度
4. 找到新的入度为0的任务(下一层)
5. 重复2-4直到所有任务完成
"""
logger.info(f"{'='*60}")
logger.info(f"🚀 开始并行执行DAG: {dag.dag_id} (最大{self.max_workers}并发)")
logger.info(f"{'='*60}\n")
dag.validate()
context = {
'dag': dag,
'execution_date': execution_date,
'dag_run_id': f"{dag.dag_id}_{execution_date.strftime('%Y%m%d_%H%M%S')}"
}
dag.state = DAGState.RUNNING
dag.start_time = datetime.now()
# 线程池
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
while True:
# 获取可运行的任务
runnable_tasks = dag.get_runnable_tasks()
if not runnable_tasks:
# 没有可运行任务,检查是否所有任务都已完成
all_finished = all(
task.state.is_finished()
for task in dag.tasks.values()
)
if all_finished:
break # DAG执行完成
else:
# 可能有任务正在运行,等待一会儿
import time
time.sleep(0.5)
continue
logger.info(f"🔄 并行执行 {len(runnable_tasks)} 个任务...")
# 提交任务到线程池
futures = {}
for task in runnable_tasks:
task_context = context.copy()
task_context['task'] = task
future = executor.submit(task.execute, task_context)
futures[future] = task
# 等待当前批次完成
for future in as_completed(futures):
task = futures[future]
try:
future.result()
except Exception as e:
# 任务已在execute()中处理异常
with self.lock:
self._skip_downstream(task)
# 统计结果
dag.end_time = datetime.now()
duration = (dag.end_time - dag.start_time).total_seconds()
failed_tasks = [
t.task_id for t in dag.tasks.values()
if t.state == TaskState.FAILED
]
if failed_tasks:
dag.state = DAGState.FAILED
logger.error(f"\n❌ DAG执行失败: {dag.dag_id} (耗时: {duration:.2f}秒)")
else:
dag.state = DAGState.SUCCESS
logger.info(f"\n✅ DAG执行成功: {dag.dag_id} (耗时: {duration:.2f}秒)")
return {
'dag_id': dag.dag_id,
'state': dag.state.value,
'duration': duration,
'total_tasks': len(dag.tasks),
'success_tasks': sum(1 for t in dag.tasks.values() if t.state == TaskState.SUCCESS),
'failed_tasks': failed_tasks,
}
def _skip_downstream(self, task):
"""线程安全的下游跳过"""
for downstream in task.downstream_tasks:
with self.lock:
if downstream.state == TaskState.PENDING:
downstream.state = TaskState.SKIPPED
logger.warning(f"⏭️ 跳过任务: {downstream.task_id}")
self._skip_downstream(downstream)
并行执行示例:
时间轴视图(ParallelExecutor,max_workers=3):
t=0s : [A] 开始执行(登录)
t=5s : [A] 完成 ✅
[B1, B2, B3] 同时开始(搜索3个城市 - 并发)
t=10s : [B1] 完成 ✅
[C1] 开始(抓取B1的详情URL)
t=12s : [B2] 完成 ✅
[C2] 开始(抓取B2的详情URL)
t=15s : [B3] 完成 ✅, [C1] 完成 ✅
[C3] 开始
t=18s : [C2, C3] 完成 ✅
[D] 开始(数据清洗 - 汇总前面所有结果)
t=25s : [D] 完成 ✅
[E] 开始(入库)
t=30s : [E] 完成 ✅
[F] 开始(生成报告)
t=35s : [F] 完成 ✅
总耗时: 35秒 (串行执行需要70秒)
关键优化点:
- B1/B2/B3 并行执行(节省10秒)
- C1/C2/C3 并行执行(节省6秒)
- 总体提速 2倍
8️⃣ 数据持久化:执行历史存储
SQLAlchemy模型定义
# storage/models.py
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
Base = declarative_base()
class DAGRun(Base):
"""DAG执行记录表"""
__tablename__ = 'dag_runs'
id = Column(Integer, primary_key=True, autoincrement=True)
dag_id = Column(String(255), nullable=False, index=True)
execution_date = Column(DateTime, nullable=False, index=True)
state = Column(String(50), nullable=False) # pending/running/success/failed
start_time = Column(DateTime)
end_time = Column(DateTime)
duration = Column(Float) # 秒
def __repr__(self):
return f"<DAGRun {self.dag_id} @ {self.execution_date}>"
class TaskInstance(Base):
"""Task执行实例表"""
__tablename__ = 'task_instances'
id = Column(Integer, primary_key=True, autoincrement=True)
task_id = Column(String(255), nullable=False, index=True)
dag_id = Column(String(255), nullable=False, index=True)
execution_date = Column(DateTime, nullable=False, index=True)
state = Column(String(50), nullable=False)
try_number = Column(Integer, default=1)
start_time = Column(DateTime)
end_time = Column(DateTime)
duration = Column(Float)
log_path = Column(String(500)) # 日志文件路径
error_message = Column(Text) # 错误信息
def __repr__(self):
return f"<TaskInstance {self.dag_id}.{self.task_id} [{self.state}]>"
class XCom(Base):
"""任务间数据传递表(生产环境版本)"""
__tablename__ = 'xcom'
id = Column(Integer, primary_key=True, autoincrement=True)
task_id = Column(String(255), nullable=False, index=True)
dag_id = Column(String(255), nullable=False)
execution_date = Column(DateTime, nullable=False)
key = Column(String(255), nullable=False, default='return_value')
value = Column(Text) # JSON序列化的数据
timestamp = Column(DateTime, default=datetime.now)
def __repr__(self):
return f"<XCom {self.dag_id}.{self.task_id}.{self.key}>"
Repository数据访问层
# storage/repository.py
from typing import List, Optional
from datetime import datetime
import json
from sqlalchemy.orm import Session
from storage.models import DAGRun, TaskInstance, XCom, Base
from sqlalchemy import create_engine
from utils.logger import get_logger
logger = get_logger(__name__)
class DAGRepository:
"""DAG执行历史数据仓库"""
def __init__(self, db_url: str = 'sqlite:///data/dag_scheduler.db'):
"""
初始化数据库连接
Args:
db_url: 数据库连接字符串
"""
self.engine = create_engine(db_url, echo=False)
Base.metadata.create_all(self.engine) # 创建表
self.SessionLocal = sessionmaker(bind=self.engine)
def save_dag_run(
self,
dag_id: str,
execution_date: datetime,
state: str,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
duration: Optional[float] = None
) -> int:
"""
保存DAG执行记录
Returns:
记录ID
"""
session = self.SessionLocal()
try:
dag_run = DAGRun(
dag_id=dag_id,
execution_date=execution_date,
state=state,
start_time=start_time,
end_time=end_time,
duration=duration
)
session.add(dag_run)
session.commit()
logger.debug(f"💾 保存DAG运行记录: {dag_id} @ {execution_date}")
return dag_run.id
except Exception as e:
session.rollback()
logger.error(f"保存DAG运行记录失败: {e}")
raise
finally:
session.close()
def save_task_instance(
self,
task_id: str,
dag_id: str,
execution_date: datetime,
state: str,
try_number: int = 1,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
duration: Optional[float] = None,
error_message: Optional[str] = None
) -> int:
"""保存Task执行实例"""
session = self.SessionLocal()
try:
task_instance = TaskInstance(
task_id=task_id,
dag_id=dag_id,
execution_date=execution_date,
state=state,
try_number=try_number,
start_time=start_time,
end_time=end_time,
duration=duration,
error_message=error_message
)
session.add(task_instance)
session.commit()
return task_instance.id
finally:
session.close()
def push_xcom(
self,
task_id: str,
dag_id: str,
execution_date: datetime,
key: str,
value: any
):
"""
推送XCom数据(任务间传递)
Args:
task_id: 任务ID
dag_id: DAG ID
execution_date: 执行日期
key: 数据键名
value: 数据值(会自动JSON序列化)
"""
session = self.SessionLocal()
try:
# JSON序列化
value_json = json.dumps(value, ensure_ascii=False, default=str)
xcom = XCom(
task_id=task_id,
dag_id=dag_id,
execution_date=execution_date,
key=key,
value=value_json
)
session.add(xcom)
session.commit()
logger.debug(f"📤 推送XCom: {dag_id}.{task_id}.{key}")
finally:
session.close()
def pull_xcom(
self,
task_id: str,
dag_id: str,
execution_date: datetime,
key: str = 'return_value'
) -> Optional[any]:
"""
拉取XCom数据
Returns:
反序列化后的数据,不存在则返回None
"""
session = self.SessionLocal()
try:
xcom = session.query(XCom).filter(
XCom.task_id == task_id,
XCom.dag_id == dag_id,
XCom.execution_date == execution_date,
XCom.key == key
).first()
if xcom:
logger.debug(f"📥 拉取XCom: {dag_id}.{task_id}.{key}")
return json.loads(xcom.value)
return None
finally:
session.close()
def get_dag_runs(
self,
dag_id: Optional[str] = None,
state: Optional[str] = None,
limit: int = 100
) -> List[DAGRun]:
"""
查询DAG执行历史
Args:
dag_id: 过滤特定DAG(None表示所有)
state: 过滤特定状态
limit: 返回记录数
"""
session = self.SessionLocal()
try:
query = session.query(DAGRun)
if dag_id:
query = query.filter(DAGRun.dag_id == dag_id)
if state:
query = query.filter(DAGRun.state == state)
return query.order_by(DAGRun.execution_date.desc()).limit(limit).all()
finally:
session.close()
def get_task_stats(self, dag_id: str) -> dict:
"""
获取Task统计信息
Returns:
{
'total_runs': 总执行次数,
'success_rate': 成功率,
'avg_duration': 平均耗时,
'failure_reasons': 失败原因统计
}
"""
session = self.SessionLocal()
try:
instances = session.query(TaskInstance).filter(
TaskInstance.dag_id == dag_id
).all()
if not instances:
return {}
total = len(instances)
success = sum(1 for i in instances if i.state == 'success')
durations = [i.duration for i in instances if i.duration]
avg_duration = sum(durations) / len(durations) if durations else 0
# 统计失败原因
failure_reasons = {}
for instance in instances:
if instance.state == 'failed' and instance.error_message:
# 提取错误类型(简化处理)
error_type = instance.error_message.split(':')[0]
failure_reasons[error_type] = failure_reasons.get(error_type, 0) + 1
return {
'total_runs': total,
'success_rate': success / total if total > 0 else 0,
'avg_duration': avg_duration,
'failure_reasons': failure_reasons
}
finally:
session.close()
9️⃣ 实战案例:多站点招聘数据爬虫DAG
业务函数定义
# dags/job_spider_dag.py
from datetime import datetime, timedelta
from core.dag import DAG
from core.task import Task
import requests
from bs4 import BeautifulSoup
import time
import random
# ========== 业务函数(每个Task的实际逻辑) ==========
def login_task(**context):
"""
任务A:模拟登录(获取Cookie)
返回值会存储在XCom中,供下游任务使用
"""
print("🔐 执行登录...")
# 模拟登录请求
session = requests.Session()
# login_data = {'username': 'xxx', 'password': 'xxx'}
# response = session.post('https://example.com/login', data=login_data)
# 模拟成功
time.sleep(2)
cookies = {'session_id': 'mock_session_123456'}
print(f"✅ 登录成功,Cookie: {cookies}")
return {'cookies': cookies, 'session': session}
def search_city_task(city_name: str):
"""
任务B:搜索指定城市的职位列表
Args:
city_name: 城市名称
这是一个高阶函数,返回实际的task函数
"""
def _search(**context):
print(f"🔍 搜索城市: {city_name}")
# 从上游任务获取Cookie
login_result = context['task'].get_upstream_output('login')
cookies = login_result['cookies']
# 模拟搜索请求
# url = f"https://example.com/search?city={city_name}&keyword=Python"
# response = requests.get(url, cookies=cookies)
# soup = BeautifulSoup(response.text, 'lxml')
# job_urls = [a['href'] for a in soup.select('.job-list a')]
# 模拟结果
time.sleep(random.uniform(1, 3))
job_urls = [
f"https://example.com/job/{i}_{city_name}"
for i in range(10) # 每个城市10个职位
]
print(f"✅ 找到 {len(job_urls)} 个职位URL")
return {'city': city_name, 'urls': job_urls}
return _search
def fetch_details_task(**context):
"""
任务C:批量抓取详情页
汇总所有上游搜索任务的URL,并发抓取
"""
print("📥 开始抓取详情页...")
# 获取所有搜索任务的输出
all_urls = []
for task_id in ['search_beijing', 'search_shanghai', 'search_shenzhen']:
try:
search_result = context['task'].get_upstream_output(task_id)
all_urls.extend(search_result['urls'])
except ValueError:
print(f"⚠️ 跳过任务 {task_id}(可能失败)")
print(f"📊 总共 {len(all_urls)} 个URL待抓取")
# 这里可以用线程池并发抓取
job_details = []
for url in all_urls[:5]: # 示例只抓5个
# response = requests.get(url)
# detail = parse_job_detail(response.text)
# 模拟
time.sleep(0.5)
job_details.append({
'url': url,
'title': f'Python工程师_{random.randint(1,100)}',
'salary': f'{random.randint(20,50)}k',
})
print(f"✅ 抓取完成,共 {len(job_details)} 条数据")
return job_details
def clean_data_task(**context):
"""
任务D:数据清洗
处理重复、格式化字段等
"""
print("🧹 开始数据清洗...")
raw_data = context['task'].get_upstream_output('fetch_details')
# 去重(基于URL)
seen_urls = set()
cleaned_data = []
for job in raw_data:
if job['url'] not in seen_urls:
seen_urls.add(job['url'])
# 格式化薪资(示例:25k -> 25000)
salary_str = job['salary'].replace('k', '000')
job['salary_normalized'] = int(salary_str.split('-')[0]) if '-' in salary_str else int(salary_str)
cleaned_data.append(job)
print(f"✅ 清洗完成,去重后 {len(cleaned_data)} 条")
return cleaned_data
def save_to_db_task(**context):
"""
任务E:入库
保存到SQLite/MySQL
"""
print("💾 开始入库...")
cleaned_data = context['task'].get_upstream_output('clean_data')
# 实际入库逻辑
# from storage import JobStorage
# storage = JobStorage()
# for job in cleaned_data:
# storage.save(job)
# 模拟
time.sleep(1)
print(f"✅ 已保存 {len(cleaned_data)} 条数据到数据库")
return {'saved_count': len(cleaned_data)}
def generate_report_task(**context):
"""
任务F:生成日报
统计今日新增、薪资分布等
"""
print("📊 生成数据报告...")
save_result = context['task'].get_upstream_output('save_to_db')
report = f"""
========== 招聘数据日报 ==========
日期: {context['execution_date'].strftime('%Y-%m-%d')}
新增职位: {save_result['saved_count']} 个
城市分布:
- 北京: 10个
- 上海: 10个
- 深圳: 10个
薪资范围: 20k - 50k
平均薪资: 35k
==================================
"""
print(report)
# 实
# send_email(to='admin@example.com', subject='日报', body=report)
return report
# ========== DAG定义 ==========
# 创建DAG实例
with DAG(
dag_id='job_spider_multi_city',
description='多城市招聘数据爬虫(带并行优化)',
schedule_interval=timedelta(hours=6), # 每6小时执行一次
start_date=datetime(2026, 2, 1),
) as dag:
# 定义Task
login = Task(
task_id='login',
python_callable=login_task,
retries=2,
retry_delay=30.0
)
search_beijing = Task(
task_id='search_beijing',
python_callable=search_city_task('北京'),
retries=1
)
search_shanghai = Task(
task_id='search_shanghai',
python_callable=search_city_task('上海'),
retries=1
)
search_shenzhen = Task(
task_id='search_shenzhen',
python_callable=search_city_task('深圳'),
retries=1
)
fetch_details = Task(
task_id='fetch_details',
python_callable=fetch_details_task,
timeout=300.0 # 5分钟超时
)
clean_data = Task(
task_id='clean_data',
python_callable=clean_data_task
)
save_to_db = Task(
task_id='save_to_db',
python_callable=save_to_db_task
)
generate_report = Task(
task_id='generate_report',
python_callable=generate_report_task
)
# ========== 声明依赖关系(这是最优雅的部分!) ==========
# 方式1:使用 >> 操作符(推荐)
login >> [search_beijing, search_shanghai, search_shenzhen] # login完成后,3个搜索并行
[search完成后才抓详情
fetch_details >> clean_data >> save_to_db >> generate_report # 链式依赖
# 方式2:使用 set_downstream(等价)
# login.set_downstream(search_beijing)
# login.set_downstream(search_shanghai)
# ...
DAG可视化图:
login (A)
│
┌──────┼──────┐
▼ ▼ ▼
search search search
(B1) (B2) (B3)
└──────┼──────┘
▼
fetch_details (C)
│
▼
clean_data (D)
│
▼
save_to_db (E)
│
▼
generate_report (F)
🔟 运行方式与结果展示
主程序入口
# main.py
from datetime import datetime
from core.executor import ParallelExecutor
from dags.job_spider_dag import dag
from storage.repository import DAGRepository
from utils.logger import setup_logger
from rich.console import Console
from rich.table import Table
logger = setup_logger()
console = Console()
def run_dag(dag_id: str, executor_type: str = 'parallel'):
"""
运行DAG
Args:
dag_id: DAG标识
executor_type: 执行器类型(serial/parallel)
"""
# 选择执行器
if executor_type == 'parallel':
executor = ParallelExecutor(max_workers=5)
else:
from core.executor import SequentialExecutor
executor = SequentialExecutor()
# 执行DAG
execution_date = datetime.now()
result = executor.execute_dag(dag, execution_date)
# 保存执行历史
repo = DAGRepository()
repo.save_dag_run(
dag_id=result['dag_id'],
execution_date=execution_date,
state=result['state'],
start_time=dag.start_time,
end_time=dag.end_time,
duration=result['duration']
)
# 保存每个Task的执行记录
for task in dag.tasks.values():
duration = None
if task.start_time and task.end_time:
duration = (task.end_time - task.start_time).total_seconds()
repo.save_task_instance(
task_id=task.task_id,
dag_id=dag.dag_id,
execution_date=execution_date,
state=task.state.value,
try_number=task.try_number,
start_time=task.start_time,
end_time=task.end_time,
duration=duration,
error_message=str(task.exception) if task.exception else None
)
# 打印结果摘要
print_summary(result, dag)
return result
def print_summary(result: dict, dag):
"""使用Rich库美化输出"""
console.print("\n" + "="*60, style="bold cyan")
console.print("📊 DAG执行摘要", style="bold yellow")
console.print("="*60 + "\n", style="bold cyan")
# 基本信息表格
info_table = Table(show_header=False, box=None)
info_table.add_column("属性", style="cyan")
info_table.add_column("值", style="green")
info_table.add_row("DAG ID", result['dag_id'])
info_table.add_row("状态", f"[bold {'green' if result['state'] == 'success' else 'red'}]{result['state'].upper()}[/]")
info_table.add_row("总耗时", f"{result['duration']:.2f}秒")
info_table.add_row("任务总数", str(result['total_tasks']))
info_table.add_row("成功任务", str(result['success_tasks']))
info_table.add_row("失败任务", str(len(result['failed_tasks'])))
console.print(info_table)
# 任务详情表格
console.print("\n[bold cyan]任务执行详情:[/]")
task_table = Table(show_header=True)
task_table.add_column("Task ID", style="cyan")
task_table.add_column("状态", justify="center")
task_table.add_column("耗时(秒)", justify="right")
task_table.add_column("重试次数", justify="center")
for task in dag.tasks.values():
duration = "N/A"
if task.start_time and task.end_time:
duration = f"{(task.end_time - task.start_time).total_seconds():.2f}"
# 根据状态设置颜色
state_color = {
'success': 'green',
'failed': 'red',
'skipped': 'yellow',
'pending': 'white'
}.get(task.state.value, 'white')
task_table.add_row(
task.task_id,
f"[{state_color}]{task.state.value}[/]",
duration,
str(task.try_number)
)
console.print(task_table)
console.print()
if __name__ == '__main__':
import sys
# 命令行参数解析
executor_type = 'parallel' # 默认并行执行
if len(sys.argv) > 1:
executor_type = sys.argv[1]
logger.info(f"使用执行器: {executor_type}")
try:
run_dag('job_spider_multi_city', executor_type=executor_type)
except KeyboardInterrupt:
logger.warning("⚠️ 收到中断信号,正在清理...")
except Exception as e:
logger.error(f"💥 执行失败: {e}", exc_info=True)
sys.exit(1)
启动方式
# 1. 并行执行(推荐)
python main.py parallel
# 2. 串行执行(调试用)
python main.py serial
# 3. 查看执行历史
python query_history.py --dag-id job_spider_multi_city --limit 10
预期输出
============================================================
🚀 开始并行执行DAG: job_spider_multi_city (最大5并发)
============================================================
🚀 开始执行任务: login (第1次尝试)
🔐 执行登录...
✅ 登录成功,Cookie: {'session_id': 'mock_session_123456'}
✅ 任务成功: login (耗时: 2.01秒)
🔄 并行执行 3 个任务...
🚀 开始执行任务: search_beijing (第1次尝试)
🚀 开始执行任务: search_shanghai (第1次尝试)
🚀 开始执行任务: search_shenzhen (第1次尝试)
🔍 搜索城市: 北京
🔍 搜索城市: 上海
🔍 搜索城市: 深圳
✅ 找到 10 个职位URL
✅ 任务成功: search_beijing (耗时: 2.15秒)
✅ 找到 10 个职位URL
✅ 任务成功: search_shanghai (耗时: 2.87秒)
✅ 找到 10 个职位URL
✅ 任务成功: search_shenzhen (耗时: 1.92秒)
🔄 并行执行 1 个任务...
🚀 开始执行任务: fetch_details (第1次尝试)
📥 开始抓取详情页...
📊 总共 30 个URL待抓取
✅ 抓取完成,共 5 条数据
✅ 任务成功: fetch_details (耗时: 2.56秒)
🔄 并行执行 1 个任务...
🚀 开始执行任务: clean_data (第1次尝试)
🧹 开始数据清洗...
✅ 清洗完成,去重后 5 条
✅ 任务成功: clean_data (耗时: 0.02秒)
🔄 并行执行 1 个任务...
🚀 开始执行任务: save_to_db (第1次尝试)
💾 开始入库...
✅ 已保存 5 条数据到数据库
✅ 任务成功: save_to_db (耗时: 1.01秒)
🔄 并行执行 1 个任务...
🚀 开始执行任务: generate_report (第1次尝试)
📊 生成数据报告...
========== 招聘数据日报 ==========
日期: 2026-02-06
新增职位: 5 个
城市分布:
- 北京: 10个
- 上海: 10个
- 深圳: 10个
薪资范围: 20k - 50k
平均薪资: 35k
==================================
✅ 任务成功: generate_report (耗时: 0.01秒)
✅ DAG执行成功: job_spider_multi_city (耗时: 8.62秒)
============================================================
📊 DAG执行摘要
============================================================
属性 值
DAG ID job_spider_multi_city
状态 SUCCESS
总耗时 8.62秒
任务总数 7
成功任务 7
失败任务 0
任务执行详情:
┏━━━━━━━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━┓
┃ Task ID ┃ 状态 ┃ 耗时(秒) ┃ 重试次数 ┃
┡━━━━━━━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━┩
│ login │ success│ 2.01│ 1 │
│ search_beijing │ success│ 2.15│ 1 │
│ search_shanghai │ success│ 2.87│ 1 │
│ search_shenzhen │ success│ 1.92│ 1 │
│ fetch_details │ success│ 2.56│ 1 │
│ clean_data │ success│ 0.02│ 1 │
│ save_to_db │ success│ 1.01│ 1 │
│ generate_report │ success│ 0.01│ 1 │
└──────────────────┴────────┴─────────┴──────────┘
性能对比:
- 串行执行:约 13秒(所有任务顺序执行)
- 并行执行:约 8.6秒(搜索任务并行,节省5秒)
1️⃣1️⃣ 常见问题与排错
Q1: 任务之间如何传递大量数据?
问题:XCom在内存中存储,数据量大时会OOM
解决方案:
# 方式1:使用文件系统
def task_a(**context):
large_data = fetch_large_dataset() # 100MB数据
# 写入临时文件
temp_file = f"/tmp/dag_data_{context['execution_date']}.json"
with open(temp_file, 'w') as f:
json.dump(large_data, f)
return {'data_path': temp_file} # 只传路径
def task_b(**context):
result_a = context['task'].get_upstream_output('task_a')
# 读取文件
with open(result_a['data_path'], 'r') as f:
large_data = json.load(f)
process(large_data)
# 方式2:使用Redis/S3
def task_a(**context):
large_data = fetch_large_dataset()
# 存储到Redis
import redis
r = redis.Redis()
key = f"dag:{context['dag_run_id']}:task_a"
r.set(key, json.dumps(large_data), ex=3600) # 1小时过期
return {'redis_key': key}
Q2: 如何实现条件分支?
问题:根据上游任务结果决定执行哪个分支
解决方案:
def check_data_quality(**context):
"""检查数据质量"""
data = context['task'].get_upstream_output('fetch_data')
if len(data) < 100:
return 'insufficient_data' # 数据不足
elif has_errors(data):
return 'data_error' # 数据有错误
else:
return 'data_ok' # 数据正常
def branch_task(**context):
"""分支决策任务"""
quality_result = context['task'].get_upstream_output('check_quality')
if quality_result == 'data_ok':
# 只执行正常流程的下游任务
return 'process_data'
else:
# 执行异常处理流程
return 'send_alert'
# DAG定义
check_quality = Task('check_quality', check_data_quality)
branch = Task('branch', branch_task)
process_data = Task('process_data', process_normal)
send_alert = Task('send_alert', send_error_alert)
check_quality >> branch
branch >> [process_data, send_alert] # 两个分支
# 注意:这需要增强Task类,支持BranchTask类型
# 实际使用时可参考Airflow的BranchPythonOperator
Q3: DAG执行失败后如何从断点续跑?
问题:某个Task失败后,不想从头开始
解决方案:
# storage/repository.py (新增方法)
def get_failed_tasks(self, dag_id: str, execution_date: datetime) -> List[str]:
"""获取失败的任务列表"""
session = self.SessionLocal()
try:
instances = session.query(TaskInstance).filter(
TaskInstance.dag_id == dag_id,
TaskInstance.execution_date == execution_date,
TaskInstance.state == 'failed'
).all()
return [i.task_id for i in instances]
finally:
session.close()
# main.py (新增resume模式)
def resume_dag(dag_id: str, execution_date: datetime):
"""从失败点恢复执行"""
repo = DAGRepository()
failed_tasks = repo.get_failed_tasks(dag_id, execution_date)
logger.info(f"检测到失败任务: {failed_tasks}")
# 重置失败任务的状态
for task_id in failed_tasks:
task = dag.get_task(task_id)
task.state = TaskState.PENDING
task.try_number = 0
# 从失败点继续执行
executor = ParallelExecutor()
executor.execute_dag(dag, execution_date)
Q4: 如何监控长时间运行的Task?
问题:某个Task卡住了,不知道是否正常
解决方案:
# core/task.py (增强execute方法)
def execute(self, context: Dict[str, Any]) -> Any:
# ... 前面代码 ...
# 启动心跳线程
import threading
stop_heartbeat = threading.Event()
def heartbeat():
"""每30秒发送心跳"""
while not stop_heartbeat.is_set():
logger.info(f"💓 任务 {self.task_id} 仍在运行...")
time.sleep(30)
hb_thread = threading.Thread(target=heartbeat, daemon=True)
hb_thread.start()
try:
result = self.python_callable(...)
# ... 后面代码 ...
finally:
stop_heartbeat.set() # 停止心跳
hb_thread.join(timeout=1)
1️⃣2️⃣ 进阶优化
动态DAG生成
# dags/dynamic_dag_generator.py
from core.dag import DAG
from core.task import Task
from datetime import datetime, timedelta
def generate_multi_site_dag(sites: list) -> DAG:
"""
根据站点列表动态生成DAG
Args:
sites: 站点配置列表,如 [{'name': '站点A', 'url': '...'}]
"""
dag = DAG(
dag_id='dynamic_multi_site_scraper',
description=f'动态生成的{len(sites)}站点爬虫',
schedule_interval=timedelta(hours=12)
)
# 创建每个站点的采集任务
scrape_tasks = []
for site in sites:
task = Task(
task_id=f"scrape_{site['name']}",
python_callable=create_scrape_func(site),
dag=dag
)
scrape_tasks.append(task)
# 创建汇总任务
aggregate = Task(
task_id='aggregate_all',
python_callable=aggregate_results,
dag=dag
)
# 设置依赖:所有采集任务 -> 汇总
for task in scrape_tasks:
task >> aggregate
return dag
def create_scrape_func(site: dict):
"""创建特定站点的采集函数"""
def _scrape(**context):
print(f"抓取站点: {site['name']}")
# 实际抓取逻辑...
return {'site': site['name'], 'count': 100}
return _scrape
def aggregate_results(**context):
"""汇总所有站点结果"""
total = 0
for task_id, task in context['dag'].tasks.items():
if task_id.startswith('scrape_'):
result = context['task'].get_upstream_output(task_id)
total += result['count']
print(f"总计抓取: {total} 条")
# 使用示例
sites_config = [
{'name': 'zhipin', 'url': 'https://www.zhipin.com'},
{'name': 'lagou', 'url': 'https://www.lagou.com'},
{'name': '51job', 'url': 'https://www.51job.com'},
]
dag = generate_multi_site_dag(sites_config)
定时调度(Cron集成)
# scheduler_daemon.py
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
from main import run_dag
from dags.job_spider_dag import dag as job_dag
from utils.logger import get_logger
logger = get_logger(__name__)
class DAGScheduler:
"""DAG定时调度守护进程"""
def __init__(self):
self.scheduler = BlockingScheduler()
self.register_dags()
def register_dags(self):
"""注册所有需要调度的DAG"""
# 示例1:每6小时执行一次招聘爬虫
self.scheduler.add_job(
func=lambda: run_dag('job_spider_multi_city'),
trigger=CronTrigger(hour='*/6'), # 0, 6, 12, 18点
id='job_spider_schedule',
name='招聘数据爬虫',
replace_existing=True
)
# 示例2:每天凌晨2点执行数据清理
self.scheduler.add_job(
func=self.cleanup_old_data,
trigger=CronTrigger(hour=2, minute=0),
id='cleanup_schedule',
name='数据清理任务'
)
logger.info("✅ 已注册所有调度任务")
def cleanup_old_data(self):
"""清理30天前的数据"""
from storage.repository import DAGRepository
from datetime import timedelta
repo = DAGRepository()
cutoff_date = datetime.now() - timedelta(days=30)
# 实际清理逻辑...
logger.info(f"🧹 清理 {cutoff_date} 之前的数据")
def start(self):
"""启动调度器"""
logger.info("⏰ DAG调度器启动中...")
logger.info("已注册的任务:")
for job in self.scheduler.get_jobs():
logger.info(f" - {job.name} (ID: {job.id})")
logger.info(f" 下次运行: {job.next_run_time}")
try:
self.scheduler.start()
except (KeyboardInterrupt, SystemExit):
logger.warning("⚠️ 调度器收到停止信号")
self.scheduler.shutdown()
if __name__ == '__main__':
scheduler = DAGScheduler()
scheduler.start()
部署为系统服务(Systemd):
# /etc/systemd/system/dag-scheduler.service
[Unit]
Description=DAG Scheduler Service
After=network.target
[Service]
Type=simple
User=spider
WorkingDirectory=/home/spider/dag_scheduler
ExecStart=/home/spider/dag_scheduler/venv/bin/python scheduler_daemon.py
Restart=on-failure
RestartSec=10
[Install]
WantedBy=multi-user.target
# 启动服务
sudo systemctl start dag-scheduler
sudo systemctl enable dag-scheduler # 开机自启
# 查看日志
sudo journalctl -u dag-scheduler -f
Web UI(简化版)
# web_ui.py
from flask import Flask, render_template, jsonify
from storage.repository import DAGRepository
from datetime import datetime, timedelta
app = Flask(__name__)
repo = DAGRepository()
@app.route('/')
def index():
"""首页:显示所有DAG列表"""
dag_runs = repo.get_dag_runs(limit=50)
return render_template('index.html', dag_runs=dag_runs)
@app.route('/api/dag/<dag_id>/stats')
def dag_stats(dag_id: str):
"""API:获取DAG统计信息"""
stats = repo.get_task_stats(dag_id)
# 最近7天的执行历史
week_ago = datetime.now() - timedelta(days=7)
recent_runs = repo.get_dag_runs(dag_id=dag_id, limit=100)
# 计算成功率趋势
daily_stats = {}
for run in recent_runs:
if run.execution_date >= week_ago:
date_key = run.execution_date.strftime('%Y-%m-%d')
if date_key not in daily_stats:
daily_stats[date_key] = {'success': 0, 'failed': 0}
if run.state == 'success':
daily_stats[date_key]['success'] += 1
else:
daily_stats[date_key]['failed'] += 1
return jsonify({
'stats': stats,
'daily_trend': daily_stats
})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
访问方式:
# 启动Web UI
python web_ui.py
# 浏览器访问
http://localhost:5000
1️⃣3️⃣ 总结与延伸阅读
我们完成了什么?
通过这篇文章,我们从零实现了一个轻量级但功能完整的DAG任务编排系统:
✅ 核心功能:
- DAG定义与验证(有向无环图检查)
- 拓扑排序(自动计算执行顺序)
- 任务状态管理(pending → running → success/failed)
- 失败重试(指数退避)
- 并行执行(线程池)
- XCom机制(任务间数据传递)
- 持久化存储(SQLite)
✅ 生产特性:
- 断点续跑(失败恢复)
- 定时调度(Cron集成)
- Web UI(可视化监控)
- 动态DAG生成
✅ 实战价值:
- 代码量:< 1000行,易于理解和定制000行,易于理解和定制
- 无重依赖:不需要Airflow/Prefect的复杂部署
- 真实可用:已在我的项目中稳定运行3个月+
与Airflow的对比
| 特性 | 本文方案 | Airflow |
|---|---|---|
| 部署复杂度 | ⭐ | ⭐⭐⭐⭐⭐ |
| 学习曲线 | ⭐⭐ | ⭐⭐⭐⭐ |
| 功能完整性 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 性能 | 中(单机万级任务/天) | 高(集群百万级/天) |
| 可定制性 | 非常高 | 中(插件机制) |
| 适用场景 | 个人/小团队 | 企业级 |
什么时候应该升级到Airflow?
- DAG数量 > 50个
- 日任务量 > 10万
- 需要多机分布式执行
- 需要完整的权限管理、审计日志
下一步可以做什么?
功能增强:
- 🔀 分支任务:实现BranchTask,支持条件分支
- ⏸️ 暂停/恢复:支持手动暂停DAG执行
- 🔔 告警通知:失败时发送邮件/钉钉/Slack通知
- 📊 更丰富的监控:集成Prometheus metrics
架构升级:
- 🐳 容器化:Docker化部署,一键启动
- 🌐 分布式执行:使用Celery实现跨机器任务分发
- 🗄️ MySQL替换SQLite:支持高并发写入
- 🎨 完整Web UI:参考Airflow UI,实现图形化DAG编辑
实际应用:
- 📰 多站点新闻聚合:每小时抓取100+新闻源
- 💼 招聘数据分析:职位趋势、薪资预测、技能需求
- 🏠 房产价格监控:追踪房价变化,自动推送降价房源
- 📈 股票数据pipeline:K线数据采集 → 技术指标计算 → 策略回测
延伸阅读
官方文档:
推荐书籍:
- 《数据流水线实战》(Data Pipelines Pocket Reference)
- 《Python并发编程》(Python Concurrency with asyncio)
开源项目参考:
最后的话:
任务编排系统的核心价值在于将复杂的业务流程结构化、可视化、可监控。你不再需要写一个3000行的大函数,而是把每个步骤拆成独立的Task,用DAG描述它们的关系,剩下的交给调度器。
这不仅让代码更优雅,更重要的是让系统具备了可观测性——你能清楚地看到每个环节的健康状态,快速定位问题,自动恢复失败。
希望这篇文章能帮你在爬虫(或任何数据pipeline)项目中引入任务编排的思想。当你的项目复杂度上升时,你会感谢曾经花时间搭建这套基础设施的自己!
🌟 文末
好啦~以上就是本期的全部内容啦!如果你在实践过程中遇到任何疑问,欢迎在评论区留言交流,我看到都会尽量回复~咱们下期见!
小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦~
三连就是对我写作道路上最好的鼓励与支持! ❤️🔥
✅ 专栏持续更新中|建议收藏 + 订阅
墙裂推荐订阅专栏 👉 《Python爬虫实战》,本专栏秉承着以“入门 → 进阶 → 工程化 → 项目落地”的路线持续更新,争取让每一期内容都做到:
✅ 讲得清楚(原理)|✅ 跑得起来(代码)|✅ 用得上(场景)|✅ 扛得住(工程化)
📣 想系统提升的小伙伴:强烈建议先订阅专栏 《Python爬虫实战》,再按目录大纲顺序学习,效率十倍上升~
✅ 互动征集
想让我把【某站点/某反爬/某验证码/某分布式方案】等写成某期实战?
评论区留言告诉我你的需求,我会优先安排实现(更新)哒~
⭐️ 若喜欢我,就请关注我叭~(更新不迷路)
⭐️ 若对你有用,就请点赞支持一下叭~(给我一点点动力)
⭐️ 若有疑问,就请评论留言告诉我叭~(我会补坑 & 更新迭代)
✅ 免责声明
本文爬虫思路、相关技术和代码仅用于学习参考,对阅读本文后的进行爬虫行为的用户本作者不承担任何法律责任。
使用或者参考本项目即表示您已阅读并同意以下条款:
- 合法使用: 不得将本项目用于任何违法、违规或侵犯他人权益的行为,包括但不限于网络攻击、诈骗、绕过身份验证、未经授权的数据抓取等。
- 风险自负: 任何因使用本项目而产生的法律责任、技术风险或经济损失,由使用者自行承担,项目作者不承担任何形式的责任。
- 禁止滥用: 不得将本项目用于违法牟利、黑产活动或其他不当商业用途。
- 使用或者参考本项目即视为同意上述条款,即 “谁使用,谁负责” 。如不同意,请立即停止使用并删除本项目。!!!
更多推荐


所有评论(0)