Apache Airflow 第一章:入门与基础实践指南
Apache Airflow 是一个开源的工作流调度与任务编排平台,通过Python代码定义任务依赖关系(DAG),实现复杂数据流水线的自动化管理。它提供可视化界面、动态依赖解析和丰富的生态系统集成能力,相比传统调度工具(如Cron)具有明显优势。本文介绍了Airflow的核心功能、设计哲学、安装部署方法,并通过代码示例展示了如何创建第一个DAG。Airflow适用于ETL数据管道调度、机器学习模
·
Apache Airflow 入门与基础实践指南
第一章:Apache Airflow 概述
1.1 什么是 Apache Airflow?
Apache Airflow 是一个开源的工作流调度与任务编排平台,由 Airbnb 于 2014 年开发并捐赠给 Apache 基金会。它通过 Python 代码定义任务依赖关系(称为 DAG),实现复杂数据流水线的自动化管理。与传统脚本调度工具不同,Airflow 提供了可视化界面、动态依赖解析和丰富的生态系统集成能力。
# 最简单的 DAG 示例
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
dag = DAG(
'hello_airflow',
description='入门示例',
schedule_interval='@daily',
start_date=datetime(2025, 1, 1),
catchup=False
)
task1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag
)
1.2 核心功能与适用场景
Airflow 的核心能力体现在:
- 任务依赖管理:通过
>>或set_downstream()明确上下游关系 - 动态工作流:支持运行时生成 DAG 结构
- 故障恢复机制:自动重试、失败告警、任务依赖回填
- 可视化监控:Web UI 提供 DAG 图、任务状态、日志追踪
典型应用场景包括:
- ETL 数据管道调度
- 机器学习模型训练流水线
- 定时数据归档与清理
- 跨系统微服务编排
1.3 Airflow 的设计哲学
Airflow 遵循三个核心设计原则:
- 代码即配置(Infrastructure as Code):将工作流定义为 Python 文件,实现版本控制与团队协作
- 声明式任务定义:通过 DAG 对象显式声明任务关系
- 可扩展架构:支持自定义 Operator、Executor 和 Backend
# 声明式任务依赖示例
def task1():
print("执行第一个任务")
def task2():
print("执行第二个任务")
with DAG(...) as dag:
t1 = PythonOperator(task_id='t1', python_callable=task1)
t2 = PythonOperator(task_id='t2', python_callable=task2)
t1 >> t2 # 显式声明依赖关系
第二章:Airflow 与传统调度工具的对比
2.1 Cron 的局限性
传统 Unix 调度工具存在以下问题:
- 缺乏可视化界面(需通过
crontab -l查看) - 无法表达复杂依赖关系
- 错误处理能力有限
- 日志管理分散
# Cron 示例(无法处理依赖)
# 每天凌晨执行数据备份
0 0 * * * /path/to/backup.sh
# 每小时执行日志清理
0 * * * * /path/to/clean.sh
2.2 Airflow 的差异化优势
| 特性 | Cron | Airflow |
|---|---|---|
| 依赖管理 | 不支持 | 支持复杂 DAG |
| 可视化 | 否 | Web UI |
| 错误处理 | 有限 | 自动重试/告警/任务重跑 |
| 可扩展性 | 差 | 插件系统/自定义 Operator |
| 任务粒度控制 | 粗粒度 | 支持细粒度任务划分 |
2.3 场景对比示例
定时任务场景:
# Cron 实现简单定时任务
* * * * * echo "Hello Airflow"
复杂工作流场景:
# Airflow 实现多任务依赖
with DAG(...) as dag:
t1 = BashOperator(task_id='download_data', bash_command='curl -O http://data.example.com/file')
t2 = BashOperator(task_id='process_data', bash_command='python process.py')
t3 = BashOperator(task_id='upload_result', bash_command='aws s3 cp result.txt s3://bucket/')
t1 >> t2 >> t3
第三章:5 分钟快速上手 Airflow
3.1 安装方式详解
1. 使用 pip 安装
pip install apache-airflow
airflow db init
airflow users create \
--username admin \
--password admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
airflow webserver --port 8080
2. Docker 部署(推荐)
# docker-compose.yaml
version: '3'
services:
airflow:
image: apache/airflow:2.6.3
environment:
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
ports:
- "8080:8080"
volumes:
- ./dags:/opt/airflow/dags
3. Kubernetes 部署
# 使用 Helm Chart
helm repo add apache https://airflow.apache.org
helm install airflow apache/airflow
3.2 初始化配置与 Web UI
核心配置文件 airflow.cfg 包含:
[core]
dags_folder = /opt/airflow/dags
executor = LocalExecutor
[webserver]
base_url = http://localhost:8080
启动 Web 服务后访问:
http://localhost:8080
3.3 创建第一个 DAG
完整示例代码(保存为 dags/hello_airflow.py):
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def greet():
return "Hello from PythonOperator!"
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='hello_airflow',
default_args=default_args,
description='入门示例',
schedule_interval='@daily',
start_date=datetime(2025, 1, 1),
catchup=False,
tags=['example'],
) as dag:
print_date = BashOperator(
task_id='print_date',
bash_command='date',
)
greet_task = PythonOperator(
task_id='greet',
python_callable=greet,
do_xcom_push=True
)
print_date >> greet_task
第四章:Airflow 核心组件深度解析
4.1 Operator 分类与使用场景
| Operator 类型 | 使用场景示例 |
|---|---|
| BashOperator | 执行 shell 命令 |
| PythonOperator | 调用 Python 函数 |
| EmailOperator | 发送邮件通知 |
| DockerOperator | 在容器中运行任务 |
| KubernetesOperator | 在 Kubernetes 集群中运行任务 |
# 多类型 Operator 组合示例
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
with DAG(...) as dag:
# 下载数据
download = BashOperator(
task_id='download',
bash_command='curl -o data.csv https://example.com/data'
)
# 处理数据
def process_data():
df = pd.read_csv('data.csv')
# 数据处理逻辑...
df.to_parquet('processed.parquet')
process = PythonOperator(
task_id='process',
python_callable=process_data
)
download >> process
4.2 Sensor 的作用与常见类型
Sensor 是一种特殊 Operator,用于等待特定条件满足:
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.time_delta import TimeDeltaSensor
with DAG(...) as dag:
# 等待文件出现
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/data/output.txt'
)
# 等待时间间隔
wait_10_minutes = TimeDeltaSensor(
task_id='wait_10m',
delta=timedelta(minutes=10)
)
wait_for_file >> wait_10_minutes
4.3 Triggers 的基础概念
Trigger 是 Airflow 2.0 引入的新特性,用于事件驱动任务执行:
from airflow.triggers.base import BaseTrigger
from airflow.operators.empty import EmptyOperator
class CustomTrigger(BaseTrigger):
def serialize(self):
return (self.__class__.__name__, {}, None)
async def run(self):
yield TriggerEvent({'status': 'success'})
with DAG(...) as dag:
trigger_task = EmptyOperator(
task_id='trigger_task',
trigger_rule='all_success',
triggerer=CustomTrigger()
)
第五章:DAG 设计原则与最佳实践
5.1 避免循环依赖
错误示例:
# 错误:形成循环依赖
t1 >> t2
t2 >> t1
正确设计模式:
# 使用子 DAG 拆分
from airflow.utils.dag_processing import list_dags
with DAG(...) as parent_dag:
pre_process = BashOperator(...)
sub_dag = SubDagOperator(
task_id='sub_dag',
subdag=create_sub_dag(parent_dag.dag_id, 'sub_dag', ...)
)
post_process = BashOperator(...)
pre_process >> sub_dag >> post_process
5.2 schedule_interval 与 start_date 配置
# 正确配置示例
with DAG(
dag_id='hourly_dag',
schedule_interval='@hourly',
start_date=datetime(2025, 1, 1),
catchup=False
):
# 任务定义
常见问题:
- 设置
catchup=False可避免历史补跑 start_date应早于当前时间至少 24 小时
5.3 参数化 DAG
from airflow.models import Variable
with DAG(...) as dag:
def process_data(ds, **kwargs):
print(f"Processing date: {ds}")
region = kwargs['params']['region']
print(f"Region: {region}")
task = PythonOperator(
task_id='param_example',
python_callable=process_data,
params={'region': 'us-west'},
dag=dag
)
第六章:性能优化与调试技巧
6.1 使用 .airflowignore 提升效率
创建 .airflowignore 文件:
__pycache__/
*.pyc
*.log
6.2 日志分析与调试
查看日志命令:
airflow tasks log <dag_id> <task_id> <execution_date>
日志级别配置:
[logging]
log_level = INFO
6.3 测试环境与生产环境差异
| 项目 | 测试环境 | 生产环境 |
|---|---|---|
| Executor | LocalExecutor | CeleryExecutor |
| Scheduler | 单实例 | 高可用集群 |
| 存储后端 | SQLite | PostgreSQL/MySQL |
第七章:从入门到进阶的学习路径
7.1 常见问题解答
Q:任务失败后如何重试?
default_args = {
'retries': 3,
'retry_delay': timedelta(minutes=5),
'on_failure_callback': send_alert
}
Q:如何处理跨 DAG 依赖?
使用 ExternalTaskSensor:
from airflow.sensors.external_task import ExternalTaskSensor
wait_for_other_dag = ExternalTaskSensor(
task_id='wait_for_other',
external_dag_id='other_dag',
external_task_id='task_in_other'
)
7.2 进阶学习方向
- 连接器(Hook):实现与数据库、云存储等系统的交互
- XCom:任务间通信机制
- 安全性:RBAC 权限控制、加密存储
- Airflow 2.x 新特性:Triggers、Deferrable Operators
7.3 社区生态与新特性
- 官方文档:https://airflow.apache.org/docs/
- 社区论坛:https://cncf.slack.com
- Airflow 2.6 新特性:
- 支持 KubernetesPodOperator 的自动重试
- 增强的 DAG 参数化功能
术语表
| 术语 | 解释 |
|---|---|
| DAG | 有向无环图,表示任务依赖关系 |
| Task | 工作流中的单个操作单元 |
| Operator | 定义任务行为的类(如 BashOperator) |
| Sensor | 等待特定条件满足的特殊 Operator |
| Trigger | 事件驱动任务执行的机制 |
附录:配套练习任务
- 本地环境搭建:使用 Docker 部署 Airflow 并创建第一个 DAG
- 多依赖任务实现:
- 创建包含 3 个任务的 DAG
- 使用 FileSensor 等待文件
- 使用 XCom 传递数据
- 性能优化实践:
- 配置 .airflowignore 文件
- 分析任务日志并优化执行时间
更多推荐

所有评论(0)