Apache Airflow 入门与基础实践指南


第一章:Apache Airflow 概述

1.1 什么是 Apache Airflow?

Apache Airflow 是一个开源的工作流调度与任务编排平台,由 Airbnb 于 2014 年开发并捐赠给 Apache 基金会。它通过 Python 代码定义任务依赖关系(称为 DAG),实现复杂数据流水线的自动化管理。与传统脚本调度工具不同,Airflow 提供了可视化界面、动态依赖解析和丰富的生态系统集成能力。

# 最简单的 DAG 示例
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    'hello_airflow',
    description='入门示例',
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False
)

task1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag
)

1.2 核心功能与适用场景

Airflow 的核心能力体现在:

  • 任务依赖管理:通过 >>set_downstream() 明确上下游关系
  • 动态工作流:支持运行时生成 DAG 结构
  • 故障恢复机制:自动重试、失败告警、任务依赖回填
  • 可视化监控:Web UI 提供 DAG 图、任务状态、日志追踪

典型应用场景包括:

  • ETL 数据管道调度
  • 机器学习模型训练流水线
  • 定时数据归档与清理
  • 跨系统微服务编排

1.3 Airflow 的设计哲学

Airflow 遵循三个核心设计原则:

  1. 代码即配置(Infrastructure as Code):将工作流定义为 Python 文件,实现版本控制与团队协作
  2. 声明式任务定义:通过 DAG 对象显式声明任务关系
  3. 可扩展架构:支持自定义 Operator、Executor 和 Backend
# 声明式任务依赖示例
def task1():
    print("执行第一个任务")

def task2():
    print("执行第二个任务")

with DAG(...) as dag:
    t1 = PythonOperator(task_id='t1', python_callable=task1)
    t2 = PythonOperator(task_id='t2', python_callable=task2)
    t1 >> t2  # 显式声明依赖关系

第二章:Airflow 与传统调度工具的对比

2.1 Cron 的局限性

传统 Unix 调度工具存在以下问题:

  • 缺乏可视化界面(需通过 crontab -l 查看)
  • 无法表达复杂依赖关系
  • 错误处理能力有限
  • 日志管理分散
# Cron 示例(无法处理依赖)
# 每天凌晨执行数据备份
0 0 * * * /path/to/backup.sh
# 每小时执行日志清理
0 * * * * /path/to/clean.sh

2.2 Airflow 的差异化优势

特性 Cron Airflow
依赖管理 不支持 支持复杂 DAG
可视化 Web UI
错误处理 有限 自动重试/告警/任务重跑
可扩展性 插件系统/自定义 Operator
任务粒度控制 粗粒度 支持细粒度任务划分

2.3 场景对比示例

定时任务场景

# Cron 实现简单定时任务
* * * * * echo "Hello Airflow"

复杂工作流场景

# Airflow 实现多任务依赖
with DAG(...) as dag:
    t1 = BashOperator(task_id='download_data', bash_command='curl -O http://data.example.com/file')
    t2 = BashOperator(task_id='process_data', bash_command='python process.py')
    t3 = BashOperator(task_id='upload_result', bash_command='aws s3 cp result.txt s3://bucket/')
    
    t1 >> t2 >> t3

第三章:5 分钟快速上手 Airflow

3.1 安装方式详解

1. 使用 pip 安装
pip install apache-airflow
airflow db init
airflow users create \
    --username admin \
    --password admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com
airflow webserver --port 8080
2. Docker 部署(推荐)
# docker-compose.yaml
version: '3'
services:
  airflow:
    image: apache/airflow:2.6.3
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
    ports:
      - "8080:8080"
    volumes:
      - ./dags:/opt/airflow/dags
3. Kubernetes 部署
# 使用 Helm Chart
helm repo add apache https://airflow.apache.org
helm install airflow apache/airflow

3.2 初始化配置与 Web UI

核心配置文件 airflow.cfg 包含:

[core]
dags_folder = /opt/airflow/dags
executor = LocalExecutor

[webserver]
base_url = http://localhost:8080

启动 Web 服务后访问:

http://localhost:8080

3.3 创建第一个 DAG

完整示例代码(保存为 dags/hello_airflow.py):

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def greet():
    return "Hello from PythonOperator!"

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='hello_airflow',
    default_args=default_args,
    description='入门示例',
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date',
    )
    
    greet_task = PythonOperator(
        task_id='greet',
        python_callable=greet,
        do_xcom_push=True
    )
    
    print_date >> greet_task

第四章:Airflow 核心组件深度解析

4.1 Operator 分类与使用场景

Operator 类型 使用场景示例
BashOperator 执行 shell 命令
PythonOperator 调用 Python 函数
EmailOperator 发送邮件通知
DockerOperator 在容器中运行任务
KubernetesOperator 在 Kubernetes 集群中运行任务
# 多类型 Operator 组合示例
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

with DAG(...) as dag:
    # 下载数据
    download = BashOperator(
        task_id='download',
        bash_command='curl -o data.csv https://example.com/data'
    )
    
    # 处理数据
    def process_data():
        df = pd.read_csv('data.csv')
        # 数据处理逻辑...
        df.to_parquet('processed.parquet')
    
    process = PythonOperator(
        task_id='process',
        python_callable=process_data
    )
    
    download >> process

4.2 Sensor 的作用与常见类型

Sensor 是一种特殊 Operator,用于等待特定条件满足:

from airflow.sensors.filesystem import FileSensor
from airflow.sensors.time_delta import TimeDeltaSensor

with DAG(...) as dag:
    # 等待文件出现
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='/data/output.txt'
    )
    
    # 等待时间间隔
    wait_10_minutes = TimeDeltaSensor(
        task_id='wait_10m',
        delta=timedelta(minutes=10)
    )
    
    wait_for_file >> wait_10_minutes

4.3 Triggers 的基础概念

Trigger 是 Airflow 2.0 引入的新特性,用于事件驱动任务执行:

from airflow.triggers.base import BaseTrigger
from airflow.operators.empty import EmptyOperator

class CustomTrigger(BaseTrigger):
    def serialize(self):
        return (self.__class__.__name__, {}, None)

    async def run(self):
        yield TriggerEvent({'status': 'success'})

with DAG(...) as dag:
    trigger_task = EmptyOperator(
        task_id='trigger_task',
        trigger_rule='all_success',
        triggerer=CustomTrigger()
    )

第五章:DAG 设计原则与最佳实践

5.1 避免循环依赖

错误示例:

# 错误:形成循环依赖
t1 >> t2
t2 >> t1

正确设计模式:

# 使用子 DAG 拆分
from airflow.utils.dag_processing import list_dags

with DAG(...) as parent_dag:
    pre_process = BashOperator(...)
    sub_dag = SubDagOperator(
        task_id='sub_dag',
        subdag=create_sub_dag(parent_dag.dag_id, 'sub_dag', ...)
    )
    post_process = BashOperator(...)
    
    pre_process >> sub_dag >> post_process

5.2 schedule_interval 与 start_date 配置

# 正确配置示例
with DAG(
    dag_id='hourly_dag',
    schedule_interval='@hourly',
    start_date=datetime(2025, 1, 1),
    catchup=False
):
    # 任务定义

常见问题:

  • 设置 catchup=False 可避免历史补跑
  • start_date 应早于当前时间至少 24 小时

5.3 参数化 DAG

from airflow.models import Variable

with DAG(...) as dag:
    def process_data(ds, **kwargs):
        print(f"Processing date: {ds}")
        region = kwargs['params']['region']
        print(f"Region: {region}")
    
    task = PythonOperator(
        task_id='param_example',
        python_callable=process_data,
        params={'region': 'us-west'},
        dag=dag
    )

第六章:性能优化与调试技巧

6.1 使用 .airflowignore 提升效率

创建 .airflowignore 文件:

__pycache__/
*.pyc
*.log

6.2 日志分析与调试

查看日志命令:

airflow tasks log <dag_id> <task_id> <execution_date>

日志级别配置:

[logging]
log_level = INFO

6.3 测试环境与生产环境差异

项目 测试环境 生产环境
Executor LocalExecutor CeleryExecutor
Scheduler 单实例 高可用集群
存储后端 SQLite PostgreSQL/MySQL

第七章:从入门到进阶的学习路径

7.1 常见问题解答

Q:任务失败后如何重试?

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': send_alert
}

Q:如何处理跨 DAG 依赖?
使用 ExternalTaskSensor

from airflow.sensors.external_task import ExternalTaskSensor

wait_for_other_dag = ExternalTaskSensor(
    task_id='wait_for_other',
    external_dag_id='other_dag',
    external_task_id='task_in_other'
)

7.2 进阶学习方向

  • 连接器(Hook):实现与数据库、云存储等系统的交互
  • XCom:任务间通信机制
  • 安全性:RBAC 权限控制、加密存储
  • Airflow 2.x 新特性:Triggers、Deferrable Operators

7.3 社区生态与新特性

  • 官方文档:https://airflow.apache.org/docs/
  • 社区论坛:https://cncf.slack.com
  • Airflow 2.6 新特性
    • 支持 KubernetesPodOperator 的自动重试
    • 增强的 DAG 参数化功能

术语表

术语 解释
DAG 有向无环图,表示任务依赖关系
Task 工作流中的单个操作单元
Operator 定义任务行为的类(如 BashOperator)
Sensor 等待特定条件满足的特殊 Operator
Trigger 事件驱动任务执行的机制

附录:配套练习任务

  1. 本地环境搭建:使用 Docker 部署 Airflow 并创建第一个 DAG
  2. 多依赖任务实现
    • 创建包含 3 个任务的 DAG
    • 使用 FileSensor 等待文件
    • 使用 XCom 传递数据
  3. 性能优化实践
    • 配置 .airflowignore 文件
    • 分析任务日志并优化执行时间

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐