🌀 Airflow 是什么?

Apache Airflow 是一个 开源的任务调度与工作流编排平台。它最常见的用途是:

  • 定时任务调度(替代 cron)
  • 数据管道管理(ETL、数据清洗、报表生成)
  • 任务依赖编排(复杂流程控制)
  • 可视化监控与重试机制

🌀 Airflow 安装

在 Python 里安装 Airflow 有几个注意点:


1️⃣ 推荐方式:使用 pip 安装

Airflow 的安装包很大,依赖也多,官方推荐指定 extras约束文件(constraints file)

比如安装 最新稳定版 Airflow 2.x

# 安装基础 Airflow
pip install "apache-airflow==2.10.2" \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.9.txt"

这里说明一下:

  • ==2.10.2 → 指定 Airflow 版本(官方推荐固定版本)

  • constraints-3.9.txt → 指定 Python 版本对应的依赖约束文件,防止依赖冲突

    • 如果你是 Python 3.8,用 constraints-3.8.txt
    • 如果你是 Python 3.10,用 constraints-3.10.txt

2️⃣ 带上 extras

Airflow 提供了很多 provider(支持不同数据库/云服务),可以一次性安装需要的:

常见组合:

# Postgres + MySQL + Google + Amazon
pip install "apache-airflow[postgres,mysql,google,amazon]==2.10.2" \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.9.txt"

# 如果只要本地调度 + sqlite
pip install "apache-airflow[celery,sqlite]==2.10.2" \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.9.txt"

3️⃣ 初始化数据库

安装完成后,第一次运行要初始化 Airflow 元数据库:

airflow db init

这会在 ~/airflow/ 下创建配置、数据库文件等。


4️⃣ 启动 Web UI & 调度器

启动 webserver(默认 8080 端口):

airflow webserver --port 8080

启动 scheduler:

airflow scheduler

然后访问 👉 http://localhost:8080 就能看到 Airflow UI。


5️⃣ 用户管理

Airflow 2.x 需要创建用户才能登录 UI:

airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com \
    --password admin

🔑 Airflow 的核心功能

1️⃣ DAG(Directed Acyclic Graph)

  • Airflow 用 DAG(有向无环图) 来定义工作流
  • DAG 里的每个节点是一个 Task
  • 描述任务间的依赖关系

👉 示例:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG("example_dag", start_date=datetime(2025, 8, 19), schedule="@daily") as dag:
    task1 = BashOperator(task_id="say_hello", bash_command="echo 'Hello'")
    task2 = BashOperator(task_id="say_world", bash_command="echo 'World'")
    
    task1 >> task2   # task1 完成后执行 task2

2️⃣ Operators(任务执行器)

Airflow 提供丰富的 Operator 来运行不同类型的任务:

  • BashOperator → 执行 Shell 命令
  • PythonOperator → 执行 Python 函数
  • EmailOperator → 发送邮件
  • SqlOperator(MySQL、Postgres、BigQuery…)→ 执行 SQL
  • HttpOperator → 调用 API
  • DockerOperator / KubernetesPodOperator → 在容器里执行任务

👉 也可以写自定义 Operator。


3️⃣ 调度(Scheduling)

  • 类似 cron 表达式,支持 @hourly, @daily, 0 6 * * *
  • 使用 start_date + schedule_interval 定义
  • DAG 会按调度自动运行,不需要人工干预

4️⃣ 依赖管理

Airflow 可以灵活地描述任务依赖:

task1 >> task2 >> task3
task4.set_upstream(task2)
task5.set_downstream(task3)

也支持 分支(BranchOperator)、条件循环(Dynamic Task Mapping)


5️⃣ 监控 & Web UI

  • 内置 Web UI,可以查看:

    • DAG 执行情况
    • Task 状态(成功 / 失败 / 重试中)
    • 日志
  • 支持 任务重试、失败告警(Email/Slack/Webhook)


6️⃣ 可扩展性

  • Hooks:封装外部服务连接(数据库、S3、Redis…)
  • XCom:任务之间传递数据
  • Plugins:扩展 UI、Operator、Executor

7️⃣ 执行方式

  • LocalExecutor → 单机执行
  • CeleryExecutor → 分布式执行,支持大规模任务
  • KubernetesExecutor → 任务在 K8s Pod 中运行

🚀 典型应用场景

  1. 数据工程:定时抓取数据 → 清洗 → 存入数据库 → 生成报表
  2. 机器学习:训练数据准备 → 模型训练 → 模型部署 → 监控
  3. 业务任务调度:自动发送日报、对账任务、批量任务依赖管理
  4. 事件驱动任务:结合 Sensor(如 FileSensor 监控文件、HttpSensor 等)

✨ 优势

  • 声明式定义任务(Python 代码就是 DAG)
  • 可视化 & 易监控
  • 灵活的依赖管理
  • 扩展性强(各种 Operator/Hook/Executor)
  • 社区活跃,支持云原生(K8s、云存储、大数据平台)

Airflow 实战 DAG 示例

📌 每天定时 → 读取数据库 → 清洗数据 → 存入 CSV → 发送邮件

好 👍 我给你写一个 Airflow 实战 DAG 示例,场景是:

每天定时执行 → 从数据库读取数据 → 清洗数据 → 保存为 CSV → 邮件发送报表


1️⃣ DAG 代码示例

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta
import pandas as pd
import sqlalchemy

# ========== 任务函数 ==========
def extract_data(**context):
    """从数据库提取数据"""
    engine = sqlalchemy.create_engine("mysql+pymysql://user:password@localhost:3306/testdb")
    query = "SELECT id, name, created_at FROM users WHERE created_at >= CURDATE();"
    df = pd.read_sql(query, engine)
    context['ti'].xcom_push(key='raw_data', value=df.to_json())

def transform_data(**context):
    """清洗数据"""
    import json
    df = pd.read_json(context['ti'].xcom_pull(key='raw_data'))
    df['name'] = df['name'].str.upper()   # 示例清洗:名字转大写
    context['ti'].xcom_push(key='clean_data', value=df.to_json())

def save_to_csv(**context):
    """保存为 CSV 文件"""
    import json
    df = pd.read_json(context['ti'].xcom_pull(key='clean_data'))
    file_path = f"/tmp/report_{datetime.now().strftime('%Y%m%d')}.csv"
    df.to_csv(file_path, index=False)
    context['ti'].xcom_push(key='file_path', value=file_path)

# ========== DAG 定义 ==========
default_args = {
    'owner': 'liu',
    'depends_on_past': False,
    'email': ['your_email@example.com'],
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='daily_report_pipeline',
    default_args=default_args,
    description='每日数据报表流水线',
    schedule_interval='0 8 * * *',  # 每天早上 8 点执行
    start_date=datetime(2025, 8, 19),
    catchup=False,
    tags=['report', 'etl'],
) as dag:

    task_extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
        provide_context=True,
    )

    task_transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        provide_context=True,
    )

    task_save = PythonOperator(
        task_id='save_to_csv',
        python_callable=save_to_csv,
        provide_context=True,
    )

    task_email = EmailOperator(
        task_id='send_email',
        to='manager@example.com',
        subject='每日用户报表',
        html_content='请查收附件中的用户报表',
        files=["/tmp/report_{{ ds_nodash }}.csv"]
    )

    # 依赖关系:抽取 → 清洗 → 保存 → 发送邮件
    task_extract >> task_transform >> task_save >> task_email

2️⃣ 运行流程

  1. 每天 08:00 调度器触发 DAG
  2. extract_data → 从数据库提取数据
  3. transform_data → 清洗数据
  4. save_to_csv → 保存为 CSV 文件
  5. send_email → 把 CSV 附件发给管理者

3️⃣ 特点

  • 使用 XCom 在任务间传递数据(提取 → 清洗 → 保存)
  • 使用 EmailOperator 发送邮件,支持附件
  • 可以扩展为更多步骤(比如上传到 S3 / BigQuery)
  • 可在 Web UI 里可视化任务依赖、监控日志

Airflow + Watchdog + Pendulum 的增强版 DAG

既能定时执行,也能监听新文件出现时触发处理,并用 Pendulum 格式化日志。
好 👌 我来给你写一个 Airflow + Watchdog + Pendulum 的增强版 DAG 示例

📌 场景:

  • 每天早上 8 点定时检查数据目录
  • 如果有新文件(CSV),就处理并保存到数据库
  • 使用 Pendulum 格式化日志时间
  • 使用 Watchdog 监听文件夹新文件出现时也能触发

1️⃣ DAG 代码

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.python import PythonSensor
from datetime import timedelta
import pendulum
import os
import pandas as pd
import sqlalchemy

# 配置
WATCH_FOLDER = "/tmp/data_input"
DB_URL = "sqlite:////tmp/test.db"

# ========== 任务函数 ==========
def check_new_file():
    """检测文件夹中是否有新 CSV 文件"""
    files = [f for f in os.listdir(WATCH_FOLDER) if f.endswith(".csv")]
    return len(files) > 0

def process_new_file(**context):
    """处理新文件并存入数据库"""
    files = [f for f in os.listdir(WATCH_FOLDER) if f.endswith(".csv")]
    if not files:
        print("[INFO] 没有新文件")
        return
    
    latest_file = max(files, key=lambda f: os.path.getctime(os.path.join(WATCH_FOLDER, f)))
    file_path = os.path.join(WATCH_FOLDER, latest_file)

    # 用 pendulum 格式化时间日志
    now = pendulum.now("Asia/Shanghai")
    print(f"[{now.to_datetime_string()}] 处理文件: {latest_file}")

    # 读取 CSV 并写入数据库
    df = pd.read_csv(file_path)
    engine = sqlalchemy.create_engine(DB_URL)
    df.to_sql("users", engine, if_exists="append", index=False)
    print(f"[INFO] 已写入数据库: {DB_URL}")

    # 删除文件,避免重复处理
    os.remove(file_path)
    print(f"[INFO] 已删除文件: {file_path}")

# ========== DAG 定义 ==========
default_args = {
    'owner': 'liu',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='file_monitor_pipeline',
    default_args=default_args,
    description='文件监控 + 定时处理 DAG',
    schedule_interval='0 8 * * *',  # 每天 8 点执行
    start_date=pendulum.datetime(2025, 8, 19, tz="Asia/Shanghai"),
    catchup=False,
    tags=['watchdog', 'pendulum', 'etl'],
) as dag:

    # 传感器:检查是否有新文件
    wait_for_file = PythonSensor(
        task_id="wait_for_file",
        python_callable=check_new_file,
        poke_interval=60,   # 每 60 秒检测一次
        timeout=60*60,      # 最多等 1 小时
        mode="poke",        # poke 模式(还有 reschedule 模式)
    )

    # 处理新文件
    task_process = PythonOperator(
        task_id="process_file",
        python_callable=process_new_file,
        provide_context=True,
    )

    wait_for_file >> task_process

2️⃣ 流程说明

  • 每天早上 8 点 定时运行 DAG

  • wait_for_file 会在目录 /tmp/data_input 里检查是否有新 CSV 文件

  • 如果有,就执行 process_file

    • Pendulum 打印当前时间日志
    • 读取 CSV → 写入数据库(SQLite 示例)
    • 删除已处理的文件
  • Watchdog 思路:在实际部署时,可以配合 FileSensor / 自定义 WatchdogSensor 来实时监听文件事件 → 触发 DAG 执行


3️⃣ 优化方向

  • 可以改成 上传到 S3 / HDFS / BigQuery
  • 可以把 Watchdog 做成一个 自定义 Sensor(监听新文件事件 → 触发 DAG)
  • 可以加上 邮件通知,告诉用户“今天的数据已处理完成”

Airflow 与其他任务调度应用对比

工具 类型 主要特点 优势 劣势 适用场景
Airflow 工作流编排 (Workflow Orchestration) DAG(有向无环图)定义任务,强大的 Web UI,丰富的 provider 插件(数据库、云平台) 成熟生态、社区大、可扩展性强、UI 直观、调度灵活 学习曲线相对陡,安装复杂(依赖多),不适合毫秒级调度 数据管道调度、ETL、机器学习训练流水线
Cron 简单调度器 系统级任务定时器,使用 crontab 表达式 轻量、无依赖、内置工具 无依赖管理、无任务状态监控、无可视化 简单周期任务,如日志清理、定时脚本
Luigi 工作流调度 Python 编写任务依赖,注重数据管道构建 易与 Python 脚本结合,适合小型 ETL UI 功能弱,生态比 Airflow 小 中小型 ETL、数据依赖任务
Prefect 新一代工作流编排 强调“任务流编程”,云端和本地都支持,API 直观 API 简单,Pythonic,部署灵活,内置失败恢复 社区比 Airflow 小,部分功能依赖 Prefect Cloud 数据工程、快速原型、云原生任务调度
Dagster 数据工作流平台 强调“数据资产导向”,强类型化 pipeline 强数据可观测性、开发者体验好 学习成本高,生态较新 数据管道、数据质量监控
Oozie Hadoop 调度器 与 Hadoop 生态深度集成,XML 配置 适合大规模 Hadoop 作业调度 配置复杂、UI 差,现代替代方案多 老 Hadoop/Spark 集群调度
Celery 分布式任务队列 专注异步任务分发和执行,非工作流 高并发,实时任务调度 缺少 DAG 编排、监控功能弱 Web 服务后台任务、异步执行
KubeFlow Pipelines 云原生 ML 工作流 基于 Kubernetes,专注 ML pipeline 原生支持 Kubernetes 和 ML 工具 部署复杂,学习曲线陡峭 云原生 ML 工作流编排

🔑 总结

  • Airflow:适合 企业级数据管道调度,优势是生态成熟、UI 完善。
  • Cron:适合简单定时脚本。
  • Luigi:轻量化数据管道。
  • Prefect/Dagster:Airflow 的新型替代,API 更现代化,适合云原生。
  • Celery:做异步任务,不适合复杂依赖。
  • Oozie:Hadoop 时代遗产。
  • KubeFlow:如果你做机器学习工作流,适合云原生部署。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐