Airflow 入门案例教程

1. 安装 Airflow 环境

1.1安装Airflow环境

环境信息: Python 版本: 3.10.18

  • 环境名称: crawl_py310 (下面所有涉及环境名称命令,改成相应的个人环境名称)
# 环境名称: crawl_py310
# 环境路径: `/Library/anaconda3/envs/crawl_py310`
# 安装命令: `constraints-3.10.txt` 使用约束文件确保所有依赖包的版本兼容性
python -m pip install "apache-airflow==2.7.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt"

1.2检查Airflow环境

# 检查 Airflow 版本
python -c "import airflow; print(airflow.__version__)"

# 检查安装的包
pip list | grep airflow
airflow version

1.3常见安装问题

问题1: 版本冲突

现象: 安装时出现版本冲突错误
原因: 依赖包版本不兼容
解决: 使用约束文件,或清理环境重新安装

问题2: 网络超时

现象: 下载包时网络超时
原因: 网络连接不稳定或防火墙限制
解决: 使用国内镜像源或重试安装

问题3: 权限不足

现象: 安装时提示权限错误
原因: 没有写入权限
解决: 使用 --user 参数或检查环境权限

2. Airflow 项目环境配置

2.1 环境配置流程图

flowchart TD
    subgraph ENV ["1. 环境准备阶段"]
        A[激活 Python 环境<br/>conda activate crawl_py310]
        B[设置环境变量<br/>AIRFLOW_HOME=~/airflow]
        C[创建目录结构<br/>mkdir -p ~/airflow/dags/logs/plugins]
    end
    
    subgraph DB ["2. 数据库初始化阶段"]
        D[初始化数据库<br/>airflow db init]
        E[验证数据库连接<br/>airflow db check]
        F[创建管理员用户<br/>airflow users create]
    end
    
    subgraph SERVICE ["3. 服务启动阶段"]
        G[启动 Web 服务器<br/>airflow webserver]
        H[启动调度器<br/>airflow scheduler]
    end
    
    ENV --> DB
    DB --> SERVICE
    
    style ENV fill:#e1f5fe,stroke:#01579b,stroke-width:3px
    style DB fill:#f3e5f5,stroke:#4a148c,stroke-width:3px
    style SERVICE fill:#e8f5e8,stroke:#1b5e20,stroke-width:3px
    
    style A fill:#ffffff,stroke:#01579b,stroke-width:1px
    style B fill:#ffffff,stroke:#01579b,stroke-width:1px
    style C fill:#ffffff,stroke:#01579b,stroke-width:1px
    style D fill:#ffffff,stroke:#4a148c,stroke-width:1px
    style E fill:#ffffff,stroke:#4a148c,stroke-width:1px
    style F fill:#ffffff,stroke:#4a148c,stroke-width:1px
    style G fill:#ffffff,stroke:#1b5e20,stroke-width:1px
    style H fill:#ffffff,stroke:#1b5e20,stroke-width:1px

流程说明:

  1. 一次性设置阶段(蓝色节点)

    • 激活 Python 环境:确保使用正确的 Python 版本和依赖包
    • 设置环境变量:定义 Airflow 工作目录和配置
    • 创建目录结构:建立必要的文件夹结构
  2. 数据库初始化阶段(紫色节点)

    • 初始化数据库:创建 SQLite 数据库和表结构
    • 验证数据库连接:确认数据库可以正常访问
    • 创建管理员用户:设置登录账户
  3. 服务启动阶段(绿色节点)

    • 启动 Web 服务器:提供 Web UI 界面
    • 启动调度器:执行任务调度和监控

重要说明:

  • 前6个步骤为一次性设置,完成后无需重复
  • 最后2个步骤为每次启动 Airflow 时需要执行
  • 所有步骤必须按顺序执行,确保依赖关系正确

2.2 环境准备阶段

激活 Python 环境

# 切换到 Airflow 环境
conda activate crawl_py310

# 验证环境
which python
python --version

设置环境变量

# 设置 Airflow 主目录
export AIRFLOW_HOME=~/airflow

# 设置时区(可选,默认为 UTC)
export AIRFLOW__CORE__DEFAULT_TIMEZONE=Asia/Shanghai

# 验证环境变量
echo $AIRFLOW_HOME

创建目录结构

# 创建 Airflow 必要的目录结构
mkdir -p ~/airflow/dags ~/airflow/logs ~/airflow/plugins

# 验证目录创建
ls -la ~/airflow/

2.3数据库初始化阶段

2.3.1 数据库初始化
# 切换到 Airflow 环境
conda activate crawl_py310

# 检查数据库连接状态
airflow db check

# 初始化 Airflow 元数据库(默认使用 SQLite)
airflow db init

# 验证数据库初始化
airflow db check

# 数据库重新初始化
airflow dbt reset
# 重新初始化数据库(删除所有数据并重新创建)
airflow db reset --yes

关闭数据库服务

本项目是采用 SQLite 数据库,不需要单独的服务进程,关闭 Airflow 服务时,SQLite 数据库连接会自动关闭。

2.3.2 数据库连接排障

如果 airflow db check 返回错误,可能的原因和解决方案:

错误1: 数据库文件不存在

# 错误信息
[ERROR] Connection failed.
[ERROR] No such file or directory: '/path/to/airflow.db'

# 解决方案
# 1. 检查 AIRFLOW_HOME 环境变量
echo $AIRFLOW_HOME

# 2. 创建 Airflow 目录
mkdir -p ~/airflow/dags ~/airflow/logs ~/airflow/plugins

# 3. 设置环境变量
export AIRFLOW_HOME=~/airflow

# 4. 初始化数据库
airflow db init

错误2: 数据库未初始化

# 错误信息
[ERROR] Connection failed.
[ERROR] no such table: ab_permission

# 解决方案
# 直接初始化数据库
airflow db init

错误3: 权限问题

# 错误信息
[ERROR] Connection failed.
[ERROR] permission denied: '/path/to/airflow.db'

# 解决方案
# 1. 检查文件权限
ls -la ~/airflow/airflow.db

# 2. 修改权限
chmod 644 ~/airflow/airflow.db

# 3. 或者重新创建数据库
rm ~/airflow/airflow.db
airflow db init

错误4: 配置错误

# 错误信息
[ERROR] Connection failed.
[ERROR] Invalid database URL format

# 解决方案
# 1. 检查数据库配置
airflow config get-value database sql_alchemy_conn

# 2. 重置为默认配置
unset AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
export AIRFLOW_HOME=~/airflow

# 3. 重新初始化
airflow db init

使用排障脚本:

# 运行自动排障脚本
./troubleshoot_db.sh

# 脚本会自动检查并修复常见问题
2.3.3 创建和管理用户

查看用户信息

# 列出所有用户
airflow users list

# 查看用户详细信息(通过用户名)
airflow users list | grep <username>

# 查看用户详细信息(通过邮箱)
airflow users list | grep <email>

管理用户

角色 权限范围 适用场景
Public 仅能访问登录页面,无实际操作权限 临时访客(基本无用)
Viewer 查看 DAG、任务日志、任务状态,但不能修改或触发 监控人员/只读审计
User 查看 + 编辑 DAG 代码(可上传/修改),但不能触发任务或修改变量 开发人员(编写DAG)
Op 查看 + 触发任务、清除任务状态、标记成功/失败,但不能修改 DAG 代码 运维人员(日常操作)
Admin 完全控制(包括用户管理、变量/连接编辑、所有 DAG 操作) 系统管理员
# 创建管理员用户
airflow users create --username admin --password admin123 --role Admin --email xxx@xxx.com --firstname xx --lastname xx

# 创建普通用户, 邮箱冲突:Airflow 不允许重复的邮箱地址,使用不同邮箱解决
airflow users create --username xx --password xxx123 --firstname xx --lastname he --role Public --email xxxx@xxx.com
# 授权
airflow users add-role --username xx --role User
airflow users add-role --username xx --role Viewer


# 验证用户创建
airflow users list

删除用户

# 通过用户名删除用户
airflow users delete --username test_user

# 通过邮箱删除用户
airflow users delete --email <email>

更改用户

# 为用户添加角色
airflow users add-role --username <username> --role <role>

# 移除用户角色
airflow users remove-role --username <username> --role <role>

2.4 服务启动阶段

2.4.1 查看 Airflow 服务

查看Airflow进程

# 查看所有 Airflow 相关进程
ps aux | grep airflow

# 查看 8080 端口占用(Airflow Web 服务器默认端口)
lsof -i :8080

# 查看特定 Airflow 进程(更精确的过滤)
ps aux | grep -E "(airflow webserver|airflow scheduler|airflow worker)"

# 查看进程树(显示父子关系)
pstree -p | grep airflow

# 查看进程详细信息
pgrep -f airflow | xargs ps -o pid,ppid,cmd,etime

查看端口占用情况

# 查看 8080 端口占用(Airflow Web 服务器默认端口)
lsof -i :8080

# 查看所有监听端口
lsof -i -P -n | grep LISTEN

# 查看所有 Airflow 相关端口
lsof -i | grep airflow

# 使用 netstat 查看端口
netstat -an | grep :8080

# 使用 ss 命令查看端口(更现代的方式)
ss -tuln | grep :8080

查看 Airflow 服务状态

# 检查 Airflow 数据库连接状态
airflow db check

# 检查 Airflow 配置
airflow config list

# 检查 DAG 列表
airflow dags list

# 检查任务状态
airflow tasks list
2.4.2 启动 Airflow 服务

启动 Web 服务器

# 前台启动 Web 服务器(推荐用于调试)
airflow webserver --port 8080

# 后台启动 Web 服务器(推荐用于生产环境)
nohup airflow webserver --port 8080 > ~/Downloads/airflow_webserver.log 2>&1 &

# 使用其他端口启动(如果 8080 被占用)
airflow webserver --port 8081

# 指定主机地址启动
airflow webserver --host 0.0.0.0 --port 8080

# 检查airflow web是否启动
ps aux | grep airflow-webserver
ps aux | grep airflow
# 检查端口
lsof -i :8080
# 检查 Airflow Web ip
curl -I http://localhost:8080

启动调度器

调度器(Scheduler) 类似 linux crontab,没有 crontab 则不能自动调度任务,但仍可以手动调度

# 前台启动调度器(推荐用于调试)
airflow scheduler

# 后台启动调度器(推荐用于生产环境)
nohup airflow scheduler > ~/Downloads/airflow_scheduler.log 2>&1 &

# 指定配置文件启动
airflow scheduler --config /path/to/airflow.cfg

# 启动调度器并指定日志级别
airflow scheduler --log-level DEBUG

# 检查scheduler启动情况
ps aux | grep -E "(airflow scheduler|airflow worker)"

服务启动验证

# 检查进程是否启动
ps aux | grep airflow

# 检查端口是否监听
lsof -i :8080

# 检查日志文件
tail -f airflow_webserver.log
tail -f airflow_scheduler.log

# 检查服务健康状态
curl -I http://localhost:8080
2.4.3 验证服务功能

Web UI 访问验证

# 检查 Web 服务器是否可访问
curl -I http://localhost:8080

# 检查 API 端点
curl http://localhost:8080/api/v1/health

# 检查版本信息
curl http://localhost:8080/api/v1/version

登录验证

  • Web UI 地址: http://localhost:8080
  • 管理员账户:
    • 用户名: admin
    • 密码: admin123

功能验证命令

# 检查 DAG 列表
airflow dags list

# 检查任务列表
airflow tasks list

# 检查连接配置
airflow connections list

# 检查变量配置
airflow variables list

# 检查池配置
airflow pools list

# 检查插件
airflow plugins

服务状态监控

# 查看调度器状态
airflow jobs check

# 查看任务实例
airflow tasks list <dag_id>

# 查看 DAG 运行历史
airflow dags list-runs

# 查看任务运行历史
airflow tasks list-runs <dag_id> <task_id>

日志查看

# 查看 Web 服务器日志
tail -f ~/airflow/logs/webserver/webserver.log

# 查看调度器日志
tail -f ~/airflow/logs/scheduler/latest/*.log

# 查看 DAG 日志
airflow tasks logs <dag_id> <task_id> <execution_date>

# 查看最新日志
find ~/airflow/logs -name "*.log" -type f -exec ls -lt {} + | head -10

性能检查

# 检查数据库连接
airflow db check

# 检查配置
airflow config list

# 检查版本信息
airflow version

# 检查环境信息
airflow info

常见问题排查

# 如果 Web UI 无法访问
curl -v http://localhost:8080

# 如果调度器不工作
airflow scheduler --dry-run

# 检查端口占用
lsof -i :8080

# 检查进程状态
pgrep -f airflow | xargs ps -o pid,ppid,cmd,etime

3. DAG 开发与实践

3.1 第一个实战 DAG

3.1.1 创建 Hello World DAG

创建 DAG 文件

# 创建 DAG 目录(如果不存在)
mkdir -p ~/airflow/dags

# 创建第一个 DAG 文件
touch ~/airflow/dags/hello_world_dag.py

编写 DAG 代码

# ~/airflow/dags/hello_world_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# 定义默认参数
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# 定义 Python 函数
def print_hello():
    """打印 Hello World"""
    print("Hello World from Python!")
    return "Hello World"

def print_date(**context):
    """打印执行日期"""
    execution_date = context['execution_date']
    print(f"Execution date: {execution_date}")
    return f"Executed on {execution_date}"

# 创建 DAG
with DAG(
    'hello_world_dag',
    default_args=default_args,
    description='A simple Hello World DAG',
    schedule_interval=timedelta(minutes=5),  # 每5分钟执行一次
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=['example', 'hello-world'],
) as dag:

    # 任务1:使用 BashOperator 打印 Hello
    task_hello = BashOperator(
        task_id='print_hello_bash',
        bash_command='echo "Hello World from Bash!"',
    )

    # 任务2:使用 PythonOperator 打印 Hello
    task_hello_python = PythonOperator(
        task_id='print_hello_python',
        python_callable=print_hello,
    )

    # 任务3:打印执行日期
    task_date = PythonOperator(
        task_id='print_date',
        python_callable=print_date,
    )

    # 设置任务依赖关系
    task_hello >> task_hello_python >> task_date

验证 DAG 创建

# 检查 DAG 是否被正确加载
airflow dags list | grep hello_world_dag

# 检查 DAG 语法
airflow dags test hello_world_dag 2023-01-01

# 查看 DAG 任务列表
airflow tasks list hello_world_dag

# 查看任务依赖关系
airflow tasks list hello_world_dag --tree
3.1.2 手动触发 DAG
# 手动触发 DAG 执行
airflow dags trigger hello_world_dag

# 指定执行日期触发
airflow dags trigger hello_world_dag --conf '{"execution_date": "2023-01-01T00:00:00"}'

# 查看 DAG 运行状态
airflow dags list-runs --dag-id hello_world_dag
3.1.3 Web UI 验证
  1. 访问 Web UI: http://localhost:8080
  2. 登录: 使用 admin/admin123 账户
  3. 查看 DAG: 在 DAGs 列表中找到 hello_world_dag
  4. 手动触发: 点击 “Trigger DAG” 按钮
  5. 监控执行: 在 Graph View 中查看任务执行状态

3.2 任务依赖关系实践

3.2.1 线性依赖
# ~/airflow/dags/linear_dependencies_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    'linear_dependencies_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1),
    catchup=False,
) as dag:

    # 创建多个任务
    task_a = BashOperator(
        task_id='task_a',
        bash_command='echo "Task A completed"',
    )

    task_b = BashOperator(
        task_id='task_b',
        bash_command='echo "Task B completed"',
    )

    task_c = BashOperator(
        task_id='task_c',
        bash_command='echo "Task C completed"',
    )

    task_d = BashOperator(
        task_id='task_d',
        bash_command='echo "Task D completed"',
    )

    # 设置线性依赖:A -> B -> C -> D
    task_a >> task_b >> task_c >> task_d
3.2.2 并行依赖
# ~/airflow/dags/parallel_dependencies_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    'parallel_dependencies_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1),
    catchup=False,
) as dag:

    # 起始任务
    start_task = BashOperator(
        task_id='start_task',
        bash_command='echo "Starting parallel tasks"',
    )

    # 并行任务
    task_1 = BashOperator(
        task_id='task_1',
        bash_command='sleep 10 && echo "Task 1 completed"',
    )

    task_2 = BashOperator(
        task_id='task_2',
        bash_command='sleep 15 && echo "Task 2 completed"',
    )

    task_3 = BashOperator(
        task_id='task_3',
        bash_command='sleep 20 && echo "Task 3 completed"',
    )

    # 结束任务
    end_task = BashOperator(
        task_id='end_task',
        bash_command='echo "All parallel tasks completed"',
    )

    # 设置依赖:start -> [task_1, task_2, task_3] -> end
    start_task >> [task_1, task_2, task_3] >> end_task
3.2.3 复杂依赖关系
# ~/airflow/dags/complex_dependencies_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

with DAG(
    'complex_dependencies_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1),
    catchup=False,
) as dag:

    # 起始任务
    start = EmptyOperator(task_id='start')

    # 第一组并行任务
    task_a1 = BashOperator(
        task_id='task_a1',
        bash_command='echo "Task A1 completed"',
    )

    task_a2 = BashOperator(
        task_id='task_a2',
        bash_command='echo "Task A2 completed"',
    )

    # 中间任务
    middle = BashOperator(
        task_id='middle',
        bash_command='echo "Middle task completed"',
    )

    # 第二组并行任务
    task_b1 = BashOperator(
        task_id='task_b1',
        bash_command='echo "Task B1 completed"',
    )

    task_b2 = BashOperator(
        task_id='task_b2',
        bash_command='echo "Task B2 completed"',
    )

    task_b3 = BashOperator(
        task_id='task_b3',
        bash_command='echo "Task B3 completed"',
    )

    # 结束任务
    end = EmptyOperator(task_id='end')

    # 设置复杂依赖关系
    start >> [task_a1, task_a2] >> middle >> [task_b1, task_b2, task_b3] >> end

3.3 常用 Operator 实战

3.3.1 BashOperator 详解
# ~/airflow/dags/bash_operator_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    'bash_operator_demo',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:

    # 基本命令执行
    basic_cmd = BashOperator(
        task_id='basic_cmd',
        bash_command='echo "Current date: $(date)"',
    )

    # 使用模板变量
    template_cmd = BashOperator(
        task_id='template_cmd',
        bash_command='echo "Execution date: {{ ds }}"',
    )

    # 执行脚本文件
    script_cmd = BashOperator(
        task_id='script_cmd',
        bash_command='''
            set -e
            echo "Creating directory..."
            mkdir -p /tmp/airflow_test
            echo "Writing file..."
            echo "Hello from Airflow" > /tmp/airflow_test/test.txt
            echo "File created successfully"
        ''',
    )

    # 使用环境变量
    env_cmd = BashOperator(
        task_id='env_cmd',
        bash_command='echo "User: $USER, Home: $HOME"',
        env={'CUSTOM_VAR': 'custom_value'},
    )

    # 设置依赖
    basic_cmd >> template_cmd >> script_cmd >> env_cmd
3.3.2 PythonOperator 详解
# ~/airflow/dags/python_operator_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

def simple_function():
    """简单函数"""
    print("Hello from Python function!")
    return "Success"

def function_with_args(name, age):
    """带参数的函数"""
    print(f"Name: {name}, Age: {age}")
    return f"Processed {name}"

def function_with_context(**context):
    """使用 Airflow 上下文的函数"""
    execution_date = context['execution_date']
    task_instance = context['task_instance']
    dag = context['dag']
    
    print(f"Execution date: {execution_date}")
    print(f"DAG ID: {dag.dag_id}")
    print(f"Task ID: {task_instance.task_id}")
    
    return "Context processed"

def data_processing_function(**context):
    """数据处理函数"""
    # 模拟数据处理
    data = [1, 2, 3, 4, 5]
    result = sum(data)
    print(f"Data processing result: {result}")
    
    # 将结果推送到 XCom
    context['task_instance'].xcom_push(key='sum_result', value=result)
    
    return result

with DAG(
    'python_operator_demo',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:

    # 简单函数调用
    simple_task = PythonOperator(
        task_id='simple_task',
        python_callable=simple_function,
    )

    # 带参数的函数调用
    args_task = PythonOperator(
        task_id='args_task',
        python_callable=function_with_args,
        op_args=['Alice', 25],  # 位置参数
    )

    # 使用上下文的函数
    context_task = PythonOperator(
        task_id='context_task',
        python_callable=function_with_context,
        provide_context=True,
    )

    # 数据处理函数
    data_task = PythonOperator(
        task_id='data_task',
        python_callable=data_processing_function,
        provide_context=True,
    )

    # 设置依赖
    simple_task >> args_task >> context_task >> data_task
3.3.3 EmailOperator 使用
# ~/airflow/dags/email_operator_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator

def generate_report():
    """生成报告内容"""
    return """
    <h2>Daily Report</h2>
    <p>This is a daily report generated by Airflow.</p>
    <ul>
        <li>Task 1: Completed</li>
        <li>Task 2: Completed</li>
        <li>Task 3: Completed</li>
    </ul>
    <p>Generated on: {{ ds }}</p>
    """

with DAG(
    'email_operator_demo',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:

    # 生成报告
    generate_report_task = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report,
    )

    # 发送邮件
    send_email_task = EmailOperator(
        task_id='send_email',
        to=['recipient@example.com'],
        subject='Daily Report - {{ ds }}',
        html_content="{{ task_instance.xcom_pull(task_ids='generate_report') }}",
    )

    # 设置依赖
    generate_report_task >> send_email_task

3.4 变量和连接实战

3.4.1 使用 Variables

设置变量

# 通过命令行设置变量
airflow variables set "data_path" "/opt/data"
airflow variables set "api_url" "https://api.example.com"
airflow variables set "email_recipients" '["user1@example.com", "user2@example.com"]'

# 查看变量
airflow variables list
airflow variables get data_path

在 DAG 中使用变量

# ~/airflow/dags/variables_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

def use_variables(**context):
    """使用 Airflow 变量"""
    # 获取变量
    data_path = Variable.get("data_path")
    api_url = Variable.get("api_url")
    email_recipients = Variable.get("email_recipients", deserialize_json=True)
    
    print(f"Data path: {data_path}")
    print(f"API URL: {api_url}")
    print(f"Email recipients: {email_recipients}")
    
    return "Variables processed"

def use_template_variables(**context):
    """使用模板变量"""
    # 在函数中访问模板变量
    execution_date = context['ds']
    dag_id = context['dag'].dag_id
    
    print(f"Execution date: {execution_date}")
    print(f"DAG ID: {dag_id}")
    
    return "Template variables processed"

with DAG(
    'variables_demo',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:

    # 使用变量的任务
    variables_task = PythonOperator(
        task_id='use_variables',
        python_callable=use_variables,
        provide_context=True,
    )

    # 使用模板变量的任务
    template_task = PythonOperator(
        task_id='use_template_variables',
        python_callable=use_template_variables,
        provide_context=True,
    )

    # 设置依赖
    variables_task >> template_task
3.4.2 使用 Connections

设置 MySQL 连接

# 通过命令行设置连接
airflow connections add 'mysql_default' \
    --conn-type 'mysql' \
    --conn-host 'localhost' \
    --conn-login 'root' \
    --conn-password 'password' \
    --conn-port '3306' \
    --conn-schema 'test_db'

# 查看连接
airflow connections list
airflow connections get mysql_default

在 DAG 中使用连接

# ~/airflow/dags/connections_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.python import PythonOperator
from airflow.hooks.mysql_hook import MySqlHook

def query_mysql_data(**context):
    """查询 MySQL 数据"""
    # 使用 MySQL Hook
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
    
    # 执行查询
    sql = "SELECT COUNT(*) as count FROM users"
    result = mysql_hook.get_first(sql)
    
    print(f"User count: {result[0]}")
    
    # 推送到 XCom
    context['task_instance'].xcom_push(key='user_count', value=result[0])
    
    return result[0]

with DAG(
    'connections_demo',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:

    # 使用 MySqlOperator
    mysql_task = MySqlOperator(
        task_id='mysql_query',
        mysql_conn_id='mysql_default',
        sql='SELECT * FROM users LIMIT 5',
    )

    # 使用 Python 函数查询
    python_mysql_task = PythonOperator(
        task_id='python_mysql_query',
        python_callable=query_mysql_data,
        provide_context=True,
    )

    # 设置依赖
    mysql_task >> python_mysql_task

3.5 实际项目案例

3.5.1 数据采集 DAG
# ~/airflow/dags/data_collection_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import requests
import json
import os

def fetch_api_data(**context):
    """从 API 获取数据"""
    api_url = "https://jsonplaceholder.typicode.com/posts"
    
    try:
        response = requests.get(api_url)
        response.raise_for_status()
        data = response.json()
        
        # 保存数据到文件
        output_path = "/tmp/api_data.json"
        with open(output_path, 'w') as f:
            json.dump(data, f, indent=2)
        
        print(f"Data fetched successfully, saved to {output_path}")
        print(f"Total records: {len(data)}")
        
        # 推送到 XCom
        context['task_instance'].xcom_push(key='record_count', value=len(data))
        
        return len(data)
    except Exception as e:
        print(f"Error fetching data: {e}")
        raise

def process_data(**context):
    """处理数据"""
    input_path = "/tmp/api_data.json"
    
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"Input file not found: {input_path}")
    
    with open(input_path, 'r') as f:
        data = json.load(f)
    
    # 简单的数据处理:提取标题
    titles = [item['title'] for item in data]
    
    # 保存处理结果
    output_path = "/tmp/processed_data.txt"
    with open(output_path, 'w') as f:
        for title in titles:
            f.write(f"{title}\n")
    
    print(f"Data processed, titles saved to {output_path}")
    return len(titles)

def cleanup_files(**context):
    """清理临时文件"""
    files_to_clean = ["/tmp/api_data.json", "/tmp/processed_data.txt"]
    
    for file_path in files_to_clean:
        if os.path.exists(file_path):
            os.remove(file_path)
            print(f"Cleaned up: {file_path}")
    
    return "Cleanup completed"

with DAG(
    'data_collection_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6),  # 每6小时执行一次
    catchup=False,
    tags=['data-collection', 'api'],
) as dag:

    # 获取数据
    fetch_task = PythonOperator(
        task_id='fetch_api_data',
        python_callable=fetch_api_data,
        provide_context=True,
    )

    # 处理数据
    process_task = PythonOperator(
        task_id='process_data',
        python_callable=process_data,
        provide_context=True,
    )

    # 清理文件
    cleanup_task = PythonOperator(
        task_id='cleanup_files',
        python_callable=cleanup_files,
        provide_context=True,
    )

    # 设置依赖
    fetch_task >> process_task >> cleanup_task
3.5.2 数据处理管道
# ~/airflow/dags/data_pipeline_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import pandas as pd
import numpy as np

def extract_data(**context):
    """提取数据"""
    # 模拟数据提取
    data = {
        'id': range(1, 101),
        'name': [f'User_{i}' for i in range(1, 101)],
        'age': np.random.randint(18, 65, 100),
        'salary': np.random.randint(30000, 100000, 100),
        'department': np.random.choice(['IT', 'HR', 'Finance', 'Marketing'], 100)
    }
    
    df = pd.DataFrame(data)
    
    # 保存原始数据
    df.to_csv('/tmp/raw_data.csv', index=False)
    print(f"Extracted {len(df)} records")
    
    return len(df)

def transform_data(**context):
    """转换数据"""
    # 读取原始数据
    df = pd.read_csv('/tmp/raw_data.csv')
    
    # 数据清洗和转换
    # 1. 处理缺失值
    df = df.dropna()
    
    # 2. 添加计算字段
    df['salary_category'] = df['salary'].apply(
        lambda x: 'High' if x > 70000 else 'Medium' if x > 50000 else 'Low'
    )
    
    # 3. 按部门统计
    dept_stats = df.groupby('department').agg({
        'salary': ['mean', 'count'],
        'age': 'mean'
    }).round(2)
    
    # 保存转换后的数据
    df.to_csv('/tmp/transformed_data.csv', index=False)
    dept_stats.to_csv('/tmp/department_stats.csv')
    
    print(f"Transformed {len(df)} records")
    print("Department statistics:")
    print(dept_stats)
    
    return len(df)

def load_data(**context):
    """加载数据"""
    # 模拟数据加载到数据库
    df = pd.read_csv('/tmp/transformed_data.csv')
    
    # 这里可以添加实际的数据库插入逻辑
    # 例如:使用 SQLAlchemy 或其他数据库连接器
    
    print(f"Loaded {len(df)} records to database")
    
    # 生成报告
    report = f"""
    Data Pipeline Report - {context['ds']}
    ======================================
    Total records processed: {len(df)}
    Records by salary category:
    {df['salary_category'].value_counts().to_dict()}
    """
    
    with open('/tmp/pipeline_report.txt', 'w') as f:
        f.write(report)
    
    print("Pipeline report generated")
    return len(df)

with DAG(
    'data_pipeline_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),  # 每天执行一次
    catchup=False,
    tags=['data-pipeline', 'etl'],
) as dag:

    # 提取数据
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
        provide_context=True,
    )

    # 转换数据
    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        provide_context=True,
    )

    # 加载数据
    load_task = PythonOperator(
        task_id='load_data',
        python_callable=load_data,
        provide_context=True,
    )

    # 设置依赖:ETL 管道
    extract_task >> transform_task >> load_task

3.6 错误处理和监控

3.6.1 任务重试机制
# ~/airflow/dags/retry_demo_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import random

def task_with_retry(**context):
    """可能失败的任务,演示重试机制"""
    # 模拟随机失败
    if random.random() < 0.7:  # 70% 概率失败
        raise Exception("Random failure for demonstration")
    
    print("Task completed successfully!")
    return "Success"

def task_with_custom_retry(**context):
    """自定义重试逻辑的任务"""
    try:
        # 模拟业务逻辑
        result = 10 / 0  # 故意制造错误
        return result
    except ZeroDivisionError:
        print("Caught ZeroDivisionError, will retry...")
        raise

with DAG(
    'retry_demo_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1),
    catchup=False,
    default_args={
        'retries': 3,  # 最多重试3次
        'retry_delay': timedelta(minutes=1),  # 重试间隔1分钟
        'retry_exponential_backoff': True,  # 指数退避
        'max_retry_delay': timedelta(minutes=10),  # 最大重试间隔
    },
) as dag:

    # 基本重试任务
    retry_task = PythonOperator(
        task_id='task_with_retry',
        python_callable=task_with_retry,
        provide_context=True,
    )

    # 自定义重试任务
    custom_retry_task = PythonOperator(
        task_id='task_with_custom_retry',
        python_callable=task_with_custom_retry,
        provide_context=True,
        retries=5,  # 覆盖默认重试次数
        retry_delay=timedelta(seconds=30),  # 覆盖默认重试间隔
    )

    # 设置依赖
    retry_task >> custom_retry_task
3.6.2 监控和告警
# ~/airflow/dags/monitoring_demo_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

def critical_task(**context):
    """关键任务,失败时需要告警"""
    # 模拟关键业务逻辑
    print("Executing critical business logic...")
    
    # 模拟偶尔失败
    import random
    if random.random() < 0.3:  # 30% 概率失败
        raise Exception("Critical task failed!")
    
    print("Critical task completed successfully")
    return "Success"

def generate_monitoring_report(**context):
    """生成监控报告"""
    execution_date = context['ds']
    
    report = f"""
    <h2>DAG Monitoring Report</h2>
    <p><strong>Execution Date:</strong> {execution_date}</p>
    <p><strong>DAG ID:</strong> {context['dag'].dag_id}</p>
    <p><strong>Status:</strong> Running</p>
    
    <h3>Task Summary</h3>
    <ul>
        <li>Critical Task: Completed</li>
        <li>Report Generation: Completed</li>
    </ul>
    
    <p>This is an automated report generated by Airflow.</p>
    """
    
    # 推送到 XCom
    context['task_instance'].xcom_push(key='monitoring_report', value=report)
    
    return "Report generated"

with DAG(
    'monitoring_demo_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=2),
    catchup=False,
    default_args={
        'email_on_failure': True,  # 失败时发送邮件
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    },
) as dag:

    # 关键任务
    critical_task_op = PythonOperator(
        task_id='critical_task',
        python_callable=critical_task,
        provide_context=True,
    )

    # 生成监控报告
    report_task = PythonOperator(
        task_id='generate_monitoring_report',
        python_callable=generate_monitoring_report,
        provide_context=True,
    )

    # 发送监控邮件
    email_task = EmailOperator(
        task_id='send_monitoring_email',
        to=['admin@example.com'],
        subject='DAG Monitoring Report - {{ ds }}',
        html_content="{{ task_instance.xcom_pull(task_ids='generate_monitoring_report', key='monitoring_report') }}",
    )

    # 设置依赖
    critical_task_op >> report_task >> email_task

3.7 最佳实践总结

3.7.1 DAG 开发最佳实践

1. 文件组织

  • 每个 DAG 文件只包含一个 DAG
  • 使用描述性的文件名和 DAG ID
  • 按业务模块组织 DAG 文件

2. 代码质量

  • 添加适当的注释和文档
  • 使用有意义的任务 ID
  • 避免在 DAG 文件中执行耗时操作

3. 错误处理

  • 设置合理的重试策略
  • 使用 try-catch 处理异常
  • 配置适当的告警机制

4. 性能优化

  • 避免不必要的任务依赖
  • 合理设置调度间隔
  • 使用适当的资源限制
3.7.2 测试和验证
# 测试 DAG 语法
airflow dags test <dag_id> <execution_date>

# 验证任务依赖
airflow tasks list <dag_id> --tree

# 检查 DAG 导入错误
airflow dags report

# 手动触发测试
airflow dags trigger <dag_id>
3.7.3 监控和维护

1. 定期检查

  • 监控 DAG 执行状态
  • 检查任务失败率
  • 查看执行时间趋势

2. 日志管理

  • 定期清理日志文件
  • 配置日志轮转
  • 监控日志大小

3. 性能调优

  • 优化任务依赖关系
  • 调整调度参数
  • 监控资源使用情况

通过以上实战练习,您应该能够:

  • 创建和配置各种类型的 DAG
  • 理解和使用不同的 Operator
  • 管理任务依赖关系
  • 使用变量和连接
  • 处理错误和监控任务执行
  • 开发实际的数据处理管道
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐