Airflow 入门案例教程
Apache Airflow 从零到实战的完整学习指南,涵盖环境搭建、服务配置、DAG 开发与最佳实践。适用人群: Airflow 初学者、数据工程师、运维人员学习成果: 能够独立搭建 Airflow 环境并开发实际的数据处理工作流
Airflow 入门案例教程
1. 安装 Airflow 环境
1.1安装Airflow环境
环境信息: Python 版本: 3.10.18
- 环境名称: crawl_py310 (下面所有涉及环境名称命令,改成相应的个人环境名称)
# 环境名称: crawl_py310
# 环境路径: `/Library/anaconda3/envs/crawl_py310`
# 安装命令: `constraints-3.10.txt` 使用约束文件确保所有依赖包的版本兼容性
python -m pip install "apache-airflow==2.7.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt"
1.2检查Airflow环境
# 检查 Airflow 版本
python -c "import airflow; print(airflow.__version__)"
# 检查安装的包
pip list | grep airflow
airflow version
1.3常见安装问题
问题1: 版本冲突
现象: 安装时出现版本冲突错误
原因: 依赖包版本不兼容
解决: 使用约束文件,或清理环境重新安装
问题2: 网络超时
现象: 下载包时网络超时
原因: 网络连接不稳定或防火墙限制
解决: 使用国内镜像源或重试安装
问题3: 权限不足
现象: 安装时提示权限错误
原因: 没有写入权限
解决: 使用 --user
参数或检查环境权限
2. Airflow 项目环境配置
2.1 环境配置流程图
flowchart TD
subgraph ENV ["1. 环境准备阶段"]
A[激活 Python 环境<br/>conda activate crawl_py310]
B[设置环境变量<br/>AIRFLOW_HOME=~/airflow]
C[创建目录结构<br/>mkdir -p ~/airflow/dags/logs/plugins]
end
subgraph DB ["2. 数据库初始化阶段"]
D[初始化数据库<br/>airflow db init]
E[验证数据库连接<br/>airflow db check]
F[创建管理员用户<br/>airflow users create]
end
subgraph SERVICE ["3. 服务启动阶段"]
G[启动 Web 服务器<br/>airflow webserver]
H[启动调度器<br/>airflow scheduler]
end
ENV --> DB
DB --> SERVICE
style ENV fill:#e1f5fe,stroke:#01579b,stroke-width:3px
style DB fill:#f3e5f5,stroke:#4a148c,stroke-width:3px
style SERVICE fill:#e8f5e8,stroke:#1b5e20,stroke-width:3px
style A fill:#ffffff,stroke:#01579b,stroke-width:1px
style B fill:#ffffff,stroke:#01579b,stroke-width:1px
style C fill:#ffffff,stroke:#01579b,stroke-width:1px
style D fill:#ffffff,stroke:#4a148c,stroke-width:1px
style E fill:#ffffff,stroke:#4a148c,stroke-width:1px
style F fill:#ffffff,stroke:#4a148c,stroke-width:1px
style G fill:#ffffff,stroke:#1b5e20,stroke-width:1px
style H fill:#ffffff,stroke:#1b5e20,stroke-width:1px
流程说明:
-
一次性设置阶段(蓝色节点):
- 激活 Python 环境:确保使用正确的 Python 版本和依赖包
- 设置环境变量:定义 Airflow 工作目录和配置
- 创建目录结构:建立必要的文件夹结构
-
数据库初始化阶段(紫色节点):
- 初始化数据库:创建 SQLite 数据库和表结构
- 验证数据库连接:确认数据库可以正常访问
- 创建管理员用户:设置登录账户
-
服务启动阶段(绿色节点):
- 启动 Web 服务器:提供 Web UI 界面
- 启动调度器:执行任务调度和监控
重要说明:
- 前6个步骤为一次性设置,完成后无需重复
- 最后2个步骤为每次启动 Airflow 时需要执行
- 所有步骤必须按顺序执行,确保依赖关系正确
2.2 环境准备阶段
激活 Python 环境
# 切换到 Airflow 环境
conda activate crawl_py310
# 验证环境
which python
python --version
设置环境变量
# 设置 Airflow 主目录
export AIRFLOW_HOME=~/airflow
# 设置时区(可选,默认为 UTC)
export AIRFLOW__CORE__DEFAULT_TIMEZONE=Asia/Shanghai
# 验证环境变量
echo $AIRFLOW_HOME
创建目录结构
# 创建 Airflow 必要的目录结构
mkdir -p ~/airflow/dags ~/airflow/logs ~/airflow/plugins
# 验证目录创建
ls -la ~/airflow/
2.3数据库初始化阶段
2.3.1 数据库初始化
# 切换到 Airflow 环境
conda activate crawl_py310
# 检查数据库连接状态
airflow db check
# 初始化 Airflow 元数据库(默认使用 SQLite)
airflow db init
# 验证数据库初始化
airflow db check
# 数据库重新初始化
airflow dbt reset
# 重新初始化数据库(删除所有数据并重新创建)
airflow db reset --yes
关闭数据库服务
本项目是采用 SQLite 数据库,不需要单独的服务进程,关闭 Airflow 服务时,SQLite 数据库连接会自动关闭。
2.3.2 数据库连接排障
如果 airflow db check
返回错误,可能的原因和解决方案:
错误1: 数据库文件不存在
# 错误信息
[ERROR] Connection failed.
[ERROR] No such file or directory: '/path/to/airflow.db'
# 解决方案
# 1. 检查 AIRFLOW_HOME 环境变量
echo $AIRFLOW_HOME
# 2. 创建 Airflow 目录
mkdir -p ~/airflow/dags ~/airflow/logs ~/airflow/plugins
# 3. 设置环境变量
export AIRFLOW_HOME=~/airflow
# 4. 初始化数据库
airflow db init
错误2: 数据库未初始化
# 错误信息
[ERROR] Connection failed.
[ERROR] no such table: ab_permission
# 解决方案
# 直接初始化数据库
airflow db init
错误3: 权限问题
# 错误信息
[ERROR] Connection failed.
[ERROR] permission denied: '/path/to/airflow.db'
# 解决方案
# 1. 检查文件权限
ls -la ~/airflow/airflow.db
# 2. 修改权限
chmod 644 ~/airflow/airflow.db
# 3. 或者重新创建数据库
rm ~/airflow/airflow.db
airflow db init
错误4: 配置错误
# 错误信息
[ERROR] Connection failed.
[ERROR] Invalid database URL format
# 解决方案
# 1. 检查数据库配置
airflow config get-value database sql_alchemy_conn
# 2. 重置为默认配置
unset AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
export AIRFLOW_HOME=~/airflow
# 3. 重新初始化
airflow db init
使用排障脚本:
# 运行自动排障脚本
./troubleshoot_db.sh
# 脚本会自动检查并修复常见问题
2.3.3 创建和管理用户
查看用户信息
# 列出所有用户
airflow users list
# 查看用户详细信息(通过用户名)
airflow users list | grep <username>
# 查看用户详细信息(通过邮箱)
airflow users list | grep <email>
管理用户
角色 | 权限范围 | 适用场景 |
---|---|---|
Public | 仅能访问登录页面,无实际操作权限 | 临时访客(基本无用) |
Viewer | 查看 DAG、任务日志、任务状态,但不能修改或触发 | 监控人员/只读审计 |
User | 查看 + 编辑 DAG 代码(可上传/修改),但不能触发任务或修改变量 | 开发人员(编写DAG) |
Op | 查看 + 触发任务、清除任务状态、标记成功/失败,但不能修改 DAG 代码 | 运维人员(日常操作) |
Admin | 完全控制(包括用户管理、变量/连接编辑、所有 DAG 操作) | 系统管理员 |
# 创建管理员用户
airflow users create --username admin --password admin123 --role Admin --email xxx@xxx.com --firstname xx --lastname xx
# 创建普通用户, 邮箱冲突:Airflow 不允许重复的邮箱地址,使用不同邮箱解决
airflow users create --username xx --password xxx123 --firstname xx --lastname he --role Public --email xxxx@xxx.com
# 授权
airflow users add-role --username xx --role User
airflow users add-role --username xx --role Viewer
# 验证用户创建
airflow users list
删除用户
# 通过用户名删除用户
airflow users delete --username test_user
# 通过邮箱删除用户
airflow users delete --email <email>
更改用户
# 为用户添加角色
airflow users add-role --username <username> --role <role>
# 移除用户角色
airflow users remove-role --username <username> --role <role>
2.4 服务启动阶段
2.4.1 查看 Airflow 服务
查看Airflow进程
# 查看所有 Airflow 相关进程
ps aux | grep airflow
# 查看 8080 端口占用(Airflow Web 服务器默认端口)
lsof -i :8080
# 查看特定 Airflow 进程(更精确的过滤)
ps aux | grep -E "(airflow webserver|airflow scheduler|airflow worker)"
# 查看进程树(显示父子关系)
pstree -p | grep airflow
# 查看进程详细信息
pgrep -f airflow | xargs ps -o pid,ppid,cmd,etime
查看端口占用情况
# 查看 8080 端口占用(Airflow Web 服务器默认端口)
lsof -i :8080
# 查看所有监听端口
lsof -i -P -n | grep LISTEN
# 查看所有 Airflow 相关端口
lsof -i | grep airflow
# 使用 netstat 查看端口
netstat -an | grep :8080
# 使用 ss 命令查看端口(更现代的方式)
ss -tuln | grep :8080
查看 Airflow 服务状态
# 检查 Airflow 数据库连接状态
airflow db check
# 检查 Airflow 配置
airflow config list
# 检查 DAG 列表
airflow dags list
# 检查任务状态
airflow tasks list
2.4.2 启动 Airflow 服务
启动 Web 服务器
# 前台启动 Web 服务器(推荐用于调试)
airflow webserver --port 8080
# 后台启动 Web 服务器(推荐用于生产环境)
nohup airflow webserver --port 8080 > ~/Downloads/airflow_webserver.log 2>&1 &
# 使用其他端口启动(如果 8080 被占用)
airflow webserver --port 8081
# 指定主机地址启动
airflow webserver --host 0.0.0.0 --port 8080
# 检查airflow web是否启动
ps aux | grep airflow-webserver
ps aux | grep airflow
# 检查端口
lsof -i :8080
# 检查 Airflow Web ip
curl -I http://localhost:8080
启动调度器
调度器(Scheduler) 类似 linux crontab,没有 crontab 则不能自动调度任务,但仍可以手动调度
# 前台启动调度器(推荐用于调试)
airflow scheduler
# 后台启动调度器(推荐用于生产环境)
nohup airflow scheduler > ~/Downloads/airflow_scheduler.log 2>&1 &
# 指定配置文件启动
airflow scheduler --config /path/to/airflow.cfg
# 启动调度器并指定日志级别
airflow scheduler --log-level DEBUG
# 检查scheduler启动情况
ps aux | grep -E "(airflow scheduler|airflow worker)"
服务启动验证
# 检查进程是否启动
ps aux | grep airflow
# 检查端口是否监听
lsof -i :8080
# 检查日志文件
tail -f airflow_webserver.log
tail -f airflow_scheduler.log
# 检查服务健康状态
curl -I http://localhost:8080
2.4.3 验证服务功能
Web UI 访问验证
# 检查 Web 服务器是否可访问
curl -I http://localhost:8080
# 检查 API 端点
curl http://localhost:8080/api/v1/health
# 检查版本信息
curl http://localhost:8080/api/v1/version
登录验证
- Web UI 地址: http://localhost:8080
- 管理员账户:
- 用户名: admin
- 密码: admin123
功能验证命令
# 检查 DAG 列表
airflow dags list
# 检查任务列表
airflow tasks list
# 检查连接配置
airflow connections list
# 检查变量配置
airflow variables list
# 检查池配置
airflow pools list
# 检查插件
airflow plugins
服务状态监控
# 查看调度器状态
airflow jobs check
# 查看任务实例
airflow tasks list <dag_id>
# 查看 DAG 运行历史
airflow dags list-runs
# 查看任务运行历史
airflow tasks list-runs <dag_id> <task_id>
日志查看
# 查看 Web 服务器日志
tail -f ~/airflow/logs/webserver/webserver.log
# 查看调度器日志
tail -f ~/airflow/logs/scheduler/latest/*.log
# 查看 DAG 日志
airflow tasks logs <dag_id> <task_id> <execution_date>
# 查看最新日志
find ~/airflow/logs -name "*.log" -type f -exec ls -lt {} + | head -10
性能检查
# 检查数据库连接
airflow db check
# 检查配置
airflow config list
# 检查版本信息
airflow version
# 检查环境信息
airflow info
常见问题排查
# 如果 Web UI 无法访问
curl -v http://localhost:8080
# 如果调度器不工作
airflow scheduler --dry-run
# 检查端口占用
lsof -i :8080
# 检查进程状态
pgrep -f airflow | xargs ps -o pid,ppid,cmd,etime
3. DAG 开发与实践
3.1 第一个实战 DAG
3.1.1 创建 Hello World DAG
创建 DAG 文件
# 创建 DAG 目录(如果不存在)
mkdir -p ~/airflow/dags
# 创建第一个 DAG 文件
touch ~/airflow/dags/hello_world_dag.py
编写 DAG 代码
# ~/airflow/dags/hello_world_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# 定义默认参数
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# 定义 Python 函数
def print_hello():
"""打印 Hello World"""
print("Hello World from Python!")
return "Hello World"
def print_date(**context):
"""打印执行日期"""
execution_date = context['execution_date']
print(f"Execution date: {execution_date}")
return f"Executed on {execution_date}"
# 创建 DAG
with DAG(
'hello_world_dag',
default_args=default_args,
description='A simple Hello World DAG',
schedule_interval=timedelta(minutes=5), # 每5分钟执行一次
start_date=datetime(2023, 1, 1),
catchup=False,
tags=['example', 'hello-world'],
) as dag:
# 任务1:使用 BashOperator 打印 Hello
task_hello = BashOperator(
task_id='print_hello_bash',
bash_command='echo "Hello World from Bash!"',
)
# 任务2:使用 PythonOperator 打印 Hello
task_hello_python = PythonOperator(
task_id='print_hello_python',
python_callable=print_hello,
)
# 任务3:打印执行日期
task_date = PythonOperator(
task_id='print_date',
python_callable=print_date,
)
# 设置任务依赖关系
task_hello >> task_hello_python >> task_date
验证 DAG 创建
# 检查 DAG 是否被正确加载
airflow dags list | grep hello_world_dag
# 检查 DAG 语法
airflow dags test hello_world_dag 2023-01-01
# 查看 DAG 任务列表
airflow tasks list hello_world_dag
# 查看任务依赖关系
airflow tasks list hello_world_dag --tree
3.1.2 手动触发 DAG
# 手动触发 DAG 执行
airflow dags trigger hello_world_dag
# 指定执行日期触发
airflow dags trigger hello_world_dag --conf '{"execution_date": "2023-01-01T00:00:00"}'
# 查看 DAG 运行状态
airflow dags list-runs --dag-id hello_world_dag
3.1.3 Web UI 验证
- 访问 Web UI: http://localhost:8080
- 登录: 使用 admin/admin123 账户
- 查看 DAG: 在 DAGs 列表中找到
hello_world_dag
- 手动触发: 点击 “Trigger DAG” 按钮
- 监控执行: 在 Graph View 中查看任务执行状态
3.2 任务依赖关系实践
3.2.1 线性依赖
# ~/airflow/dags/linear_dependencies_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
'linear_dependencies_dag',
start_date=datetime(2023, 1, 1),
schedule_interval=timedelta(hours=1),
catchup=False,
) as dag:
# 创建多个任务
task_a = BashOperator(
task_id='task_a',
bash_command='echo "Task A completed"',
)
task_b = BashOperator(
task_id='task_b',
bash_command='echo "Task B completed"',
)
task_c = BashOperator(
task_id='task_c',
bash_command='echo "Task C completed"',
)
task_d = BashOperator(
task_id='task_d',
bash_command='echo "Task D completed"',
)
# 设置线性依赖:A -> B -> C -> D
task_a >> task_b >> task_c >> task_d
3.2.2 并行依赖
# ~/airflow/dags/parallel_dependencies_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
'parallel_dependencies_dag',
start_date=datetime(2023, 1, 1),
schedule_interval=timedelta(hours=1),
catchup=False,
) as dag:
# 起始任务
start_task = BashOperator(
task_id='start_task',
bash_command='echo "Starting parallel tasks"',
)
# 并行任务
task_1 = BashOperator(
task_id='task_1',
bash_command='sleep 10 && echo "Task 1 completed"',
)
task_2 = BashOperator(
task_id='task_2',
bash_command='sleep 15 && echo "Task 2 completed"',
)
task_3 = BashOperator(
task_id='task_3',
bash_command='sleep 20 && echo "Task 3 completed"',
)
# 结束任务
end_task = BashOperator(
task_id='end_task',
bash_command='echo "All parallel tasks completed"',
)
# 设置依赖:start -> [task_1, task_2, task_3] -> end
start_task >> [task_1, task_2, task_3] >> end_task
3.2.3 复杂依赖关系
# ~/airflow/dags/complex_dependencies_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
with DAG(
'complex_dependencies_dag',
start_date=datetime(2023, 1, 1),
schedule_interval=timedelta(hours=1),
catchup=False,
) as dag:
# 起始任务
start = EmptyOperator(task_id='start')
# 第一组并行任务
task_a1 = BashOperator(
task_id='task_a1',
bash_command='echo "Task A1 completed"',
)
task_a2 = BashOperator(
task_id='task_a2',
bash_command='echo "Task A2 completed"',
)
# 中间任务
middle = BashOperator(
task_id='middle',
bash_command='echo "Middle task completed"',
)
# 第二组并行任务
task_b1 = BashOperator(
task_id='task_b1',
bash_command='echo "Task B1 completed"',
)
task_b2 = BashOperator(
task_id='task_b2',
bash_command='echo "Task B2 completed"',
)
task_b3 = BashOperator(
task_id='task_b3',
bash_command='echo "Task B3 completed"',
)
# 结束任务
end = EmptyOperator(task_id='end')
# 设置复杂依赖关系
start >> [task_a1, task_a2] >> middle >> [task_b1, task_b2, task_b3] >> end
3.3 常用 Operator 实战
3.3.1 BashOperator 详解
# ~/airflow/dags/bash_operator_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
'bash_operator_demo',
start_date=datetime(2023, 1, 1),
schedule_interval=timedelta(days=1),
catchup=False,
) as dag:
# 基本命令执行
basic_cmd = BashOperator(
task_id='basic_cmd',
bash_command='echo "Current date: $(date)"',
)
# 使用模板变量
template_cmd = BashOperator(
task_id='template_cmd',
bash_command='echo "Execution date: {{ ds }}"',
)
# 执行脚本文件
script_cmd = BashOperator(
task_id='script_cmd',
bash_command='''
set -e
echo "Creating directory..."
mkdir -p /tmp/airflow_test
echo "Writing file..."
echo "Hello from Airflow" > /tmp/airflow_test/test.txt
echo "File created successfully"
''',
)
# 使用环境变量
env_cmd = BashOperator(
task_id='env_cmd',
bash_command='echo "User: $USER, Home: $HOME"',
env={'CUSTOM_VAR': 'custom_value'},
)
# 设置依赖
basic_cmd >> template_cmd >> script_cmd >> env_cmd
3.3.2 PythonOperator 详解
# ~/airflow/dags/python_operator_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
def simple_function():
"""简单函数"""
print("Hello from Python function!")
return "Success"
def function_with_args(name, age):
"""带参数的函数"""
print(f"Name: {name}, Age: {age}")
return f"Processed {name}"
def function_with_context(**context):
"""使用 Airflow 上下文的函数"""
execution_date = context['execution_date']
task_instance = context['task_instance']
dag = context['dag']
print(f"Execution date: {execution_date}")
print(f"DAG ID: {dag.dag_id}")
print(f"Task ID: {task_instance.task_id}")
return "Context processed"
def data_processing_function(**context):
"""数据处理函数"""
# 模拟数据处理
data = [1, 2, 3, 4, 5]
result = sum(data)
print(f"Data processing result: {result}")
# 将结果推送到 XCom
context['task_instance'].xcom_push(key='sum_result', value=result)
return result
with DAG(
'python_operator_demo',
start_date=datetime(2023, 1, 1),
schedule_interval=timedelta(days=1),
catchup=False,
) as dag:
# 简单函数调用
simple_task = PythonOperator(
task_id='simple_task',
python_callable=simple_function,
)
# 带参数的函数调用
args_task = PythonOperator(
task_id='args_task',
python_callable=function_with_args,
op_args=['Alice', 25], # 位置参数
)
# 使用上下文的函数
context_task = PythonOperator(
task_id='context_task',
python_callable=function_with_context,
provide_context=True,
)
# 数据处理函数
data_task = PythonOperator(
task_id='data_task',
python_callable=data_processing_function,
provide_context=True,
)
# 设置依赖
simple_task >> args_task >> context_task >> data_task
3.3.3 EmailOperator 使用
# ~/airflow/dags/email_operator_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator
def generate_report():
"""生成报告内容"""
return """
<h2>Daily Report</h2>
<p>This is a daily report generated by Airflow.</p>
<ul>
<li>Task 1: Completed</li>
<li>Task 2: Completed</li>
<li>Task 3: Completed</li>
</ul>
<p>Generated on: {{ ds }}</p>
"""
with DAG(
'email_operator_demo',
start_date=datetime(2023, 1, 1),
schedule_interval=timedelta(days=1),
catchup=False,
) as dag:
# 生成报告
generate_report_task = PythonOperator(
task_id='generate_report',
python_callable=generate_report,
)
# 发送邮件
send_email_task = EmailOperator(
task_id='send_email',
to=['recipient@example.com'],
subject='Daily Report - {{ ds }}',
html_content="{{ task_instance.xcom_pull(task_ids='generate_report') }}",
)
# 设置依赖
generate_report_task >> send_email_task
3.4 变量和连接实战
3.4.1 使用 Variables
设置变量
# 通过命令行设置变量
airflow variables set "data_path" "/opt/data"
airflow variables set "api_url" "https://api.example.com"
airflow variables set "email_recipients" '["user1@example.com", "user2@example.com"]'
# 查看变量
airflow variables list
airflow variables get data_path
在 DAG 中使用变量
# ~/airflow/dags/variables_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
def use_variables(**context):
"""使用 Airflow 变量"""
# 获取变量
data_path = Variable.get("data_path")
api_url = Variable.get("api_url")
email_recipients = Variable.get("email_recipients", deserialize_json=True)
print(f"Data path: {data_path}")
print(f"API URL: {api_url}")
print(f"Email recipients: {email_recipients}")
return "Variables processed"
def use_template_variables(**context):
"""使用模板变量"""
# 在函数中访问模板变量
execution_date = context['ds']
dag_id = context['dag'].dag_id
print(f"Execution date: {execution_date}")
print(f"DAG ID: {dag_id}")
return "Template variables processed"
with DAG(
'variables_demo',
start_date=datetime(2023, 1, 1),
schedule_interval=timedelta(days=1),
catchup=False,
) as dag:
# 使用变量的任务
variables_task = PythonOperator(
task_id='use_variables',
python_callable=use_variables,
provide_context=True,
)
# 使用模板变量的任务
template_task = PythonOperator(
task_id='use_template_variables',
python_callable=use_template_variables,
provide_context=True,
)
# 设置依赖
variables_task >> template_task
3.4.2 使用 Connections
设置 MySQL 连接
# 通过命令行设置连接
airflow connections add 'mysql_default' \
--conn-type 'mysql' \
--conn-host 'localhost' \
--conn-login 'root' \
--conn-password 'password' \
--conn-port '3306' \
--conn-schema 'test_db'
# 查看连接
airflow connections list
airflow connections get mysql_default
在 DAG 中使用连接
# ~/airflow/dags/connections_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.python import PythonOperator
from airflow.hooks.mysql_hook import MySqlHook
def query_mysql_data(**context):
"""查询 MySQL 数据"""
# 使用 MySQL Hook
mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
# 执行查询
sql = "SELECT COUNT(*) as count FROM users"
result = mysql_hook.get_first(sql)
print(f"User count: {result[0]}")
# 推送到 XCom
context['task_instance'].xcom_push(key='user_count', value=result[0])
return result[0]
with DAG(
'connections_demo',
start_date=datetime(2023, 1, 1),
schedule_interval=timedelta(days=1),
catchup=False,
) as dag:
# 使用 MySqlOperator
mysql_task = MySqlOperator(
task_id='mysql_query',
mysql_conn_id='mysql_default',
sql='SELECT * FROM users LIMIT 5',
)
# 使用 Python 函数查询
python_mysql_task = PythonOperator(
task_id='python_mysql_query',
python_callable=query_mysql_data,
provide_context=True,
)
# 设置依赖
mysql_task >> python_mysql_task
3.5 实际项目案例
3.5.1 数据采集 DAG
# ~/airflow/dags/data_collection_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import requests
import json
import os
def fetch_api_data(**context):
"""从 API 获取数据"""
api_url = "https://jsonplaceholder.typicode.com/posts"
try:
response = requests.get(api_url)
response.raise_for_status()
data = response.json()
# 保存数据到文件
output_path = "/tmp/api_data.json"
with open(output_path, 'w') as f:
json.dump(data, f, indent=2)
print(f"Data fetched successfully, saved to {output_path}")
print(f"Total records: {len(data)}")
# 推送到 XCom
context['task_instance'].xcom_push(key='record_count', value=len(data))
return len(data)
except Exception as e:
print(f"Error fetching data: {e}")
raise
def process_data(**context):
"""处理数据"""
input_path = "/tmp/api_data.json"
if not os.path.exists(input_path):
raise FileNotFoundError(f"Input file not found: {input_path}")
with open(input_path, 'r') as f:
data = json.load(f)
# 简单的数据处理:提取标题
titles = [item['title'] for item in data]
# 保存处理结果
output_path = "/tmp/processed_data.txt"
with open(output_path, 'w') as f:
for title in titles:
f.write(f"{title}\n")
print(f"Data processed, titles saved to {output_path}")
return len(titles)
def cleanup_files(**context):
"""清理临时文件"""
files_to_clean = ["/tmp/api_data.json", "/tmp/processed_data.txt"]
for file_path in files_to_clean:
if os.path.exists(file_path):
os.remove(file_path)
print(f"Cleaned up: {file_path}")
return "Cleanup completed"
with DAG(
'data_collection_dag',
start_date=datetime(2023, 1, 1),
schedule_interval=timedelta(hours=6), # 每6小时执行一次
catchup=False,
tags=['data-collection', 'api'],
) as dag:
# 获取数据
fetch_task = PythonOperator(
task_id='fetch_api_data',
python_callable=fetch_api_data,
provide_context=True,
)
# 处理数据
process_task = PythonOperator(
task_id='process_data',
python_callable=process_data,
provide_context=True,
)
# 清理文件
cleanup_task = PythonOperator(
task_id='cleanup_files',
python_callable=cleanup_files,
provide_context=True,
)
# 设置依赖
fetch_task >> process_task >> cleanup_task
3.5.2 数据处理管道
# ~/airflow/dags/data_pipeline_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import pandas as pd
import numpy as np
def extract_data(**context):
"""提取数据"""
# 模拟数据提取
data = {
'id': range(1, 101),
'name': [f'User_{i}' for i in range(1, 101)],
'age': np.random.randint(18, 65, 100),
'salary': np.random.randint(30000, 100000, 100),
'department': np.random.choice(['IT', 'HR', 'Finance', 'Marketing'], 100)
}
df = pd.DataFrame(data)
# 保存原始数据
df.to_csv('/tmp/raw_data.csv', index=False)
print(f"Extracted {len(df)} records")
return len(df)
def transform_data(**context):
"""转换数据"""
# 读取原始数据
df = pd.read_csv('/tmp/raw_data.csv')
# 数据清洗和转换
# 1. 处理缺失值
df = df.dropna()
# 2. 添加计算字段
df['salary_category'] = df['salary'].apply(
lambda x: 'High' if x > 70000 else 'Medium' if x > 50000 else 'Low'
)
# 3. 按部门统计
dept_stats = df.groupby('department').agg({
'salary': ['mean', 'count'],
'age': 'mean'
}).round(2)
# 保存转换后的数据
df.to_csv('/tmp/transformed_data.csv', index=False)
dept_stats.to_csv('/tmp/department_stats.csv')
print(f"Transformed {len(df)} records")
print("Department statistics:")
print(dept_stats)
return len(df)
def load_data(**context):
"""加载数据"""
# 模拟数据加载到数据库
df = pd.read_csv('/tmp/transformed_data.csv')
# 这里可以添加实际的数据库插入逻辑
# 例如:使用 SQLAlchemy 或其他数据库连接器
print(f"Loaded {len(df)} records to database")
# 生成报告
report = f"""
Data Pipeline Report - {context['ds']}
======================================
Total records processed: {len(df)}
Records by salary category:
{df['salary_category'].value_counts().to_dict()}
"""
with open('/tmp/pipeline_report.txt', 'w') as f:
f.write(report)
print("Pipeline report generated")
return len(df)
with DAG(
'data_pipeline_dag',
start_date=datetime(2023, 1, 1),
schedule_interval=timedelta(days=1), # 每天执行一次
catchup=False,
tags=['data-pipeline', 'etl'],
) as dag:
# 提取数据
extract_task = PythonOperator(
task_id='extract_data',
python_callable=extract_data,
provide_context=True,
)
# 转换数据
transform_task = PythonOperator(
task_id='transform_data',
python_callable=transform_data,
provide_context=True,
)
# 加载数据
load_task = PythonOperator(
task_id='load_data',
python_callable=load_data,
provide_context=True,
)
# 设置依赖:ETL 管道
extract_task >> transform_task >> load_task
3.6 错误处理和监控
3.6.1 任务重试机制
# ~/airflow/dags/retry_demo_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import random
def task_with_retry(**context):
"""可能失败的任务,演示重试机制"""
# 模拟随机失败
if random.random() < 0.7: # 70% 概率失败
raise Exception("Random failure for demonstration")
print("Task completed successfully!")
return "Success"
def task_with_custom_retry(**context):
"""自定义重试逻辑的任务"""
try:
# 模拟业务逻辑
result = 10 / 0 # 故意制造错误
return result
except ZeroDivisionError:
print("Caught ZeroDivisionError, will retry...")
raise
with DAG(
'retry_demo_dag',
start_date=datetime(2023, 1, 1),
schedule_interval=timedelta(hours=1),
catchup=False,
default_args={
'retries': 3, # 最多重试3次
'retry_delay': timedelta(minutes=1), # 重试间隔1分钟
'retry_exponential_backoff': True, # 指数退避
'max_retry_delay': timedelta(minutes=10), # 最大重试间隔
},
) as dag:
# 基本重试任务
retry_task = PythonOperator(
task_id='task_with_retry',
python_callable=task_with_retry,
provide_context=True,
)
# 自定义重试任务
custom_retry_task = PythonOperator(
task_id='task_with_custom_retry',
python_callable=task_with_custom_retry,
provide_context=True,
retries=5, # 覆盖默认重试次数
retry_delay=timedelta(seconds=30), # 覆盖默认重试间隔
)
# 设置依赖
retry_task >> custom_retry_task
3.6.2 监控和告警
# ~/airflow/dags/monitoring_demo_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
def critical_task(**context):
"""关键任务,失败时需要告警"""
# 模拟关键业务逻辑
print("Executing critical business logic...")
# 模拟偶尔失败
import random
if random.random() < 0.3: # 30% 概率失败
raise Exception("Critical task failed!")
print("Critical task completed successfully")
return "Success"
def generate_monitoring_report(**context):
"""生成监控报告"""
execution_date = context['ds']
report = f"""
<h2>DAG Monitoring Report</h2>
<p><strong>Execution Date:</strong> {execution_date}</p>
<p><strong>DAG ID:</strong> {context['dag'].dag_id}</p>
<p><strong>Status:</strong> Running</p>
<h3>Task Summary</h3>
<ul>
<li>Critical Task: Completed</li>
<li>Report Generation: Completed</li>
</ul>
<p>This is an automated report generated by Airflow.</p>
"""
# 推送到 XCom
context['task_instance'].xcom_push(key='monitoring_report', value=report)
return "Report generated"
with DAG(
'monitoring_demo_dag',
start_date=datetime(2023, 1, 1),
schedule_interval=timedelta(hours=2),
catchup=False,
default_args={
'email_on_failure': True, # 失败时发送邮件
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
},
) as dag:
# 关键任务
critical_task_op = PythonOperator(
task_id='critical_task',
python_callable=critical_task,
provide_context=True,
)
# 生成监控报告
report_task = PythonOperator(
task_id='generate_monitoring_report',
python_callable=generate_monitoring_report,
provide_context=True,
)
# 发送监控邮件
email_task = EmailOperator(
task_id='send_monitoring_email',
to=['admin@example.com'],
subject='DAG Monitoring Report - {{ ds }}',
html_content="{{ task_instance.xcom_pull(task_ids='generate_monitoring_report', key='monitoring_report') }}",
)
# 设置依赖
critical_task_op >> report_task >> email_task
3.7 最佳实践总结
3.7.1 DAG 开发最佳实践
1. 文件组织
- 每个 DAG 文件只包含一个 DAG
- 使用描述性的文件名和 DAG ID
- 按业务模块组织 DAG 文件
2. 代码质量
- 添加适当的注释和文档
- 使用有意义的任务 ID
- 避免在 DAG 文件中执行耗时操作
3. 错误处理
- 设置合理的重试策略
- 使用 try-catch 处理异常
- 配置适当的告警机制
4. 性能优化
- 避免不必要的任务依赖
- 合理设置调度间隔
- 使用适当的资源限制
3.7.2 测试和验证
# 测试 DAG 语法
airflow dags test <dag_id> <execution_date>
# 验证任务依赖
airflow tasks list <dag_id> --tree
# 检查 DAG 导入错误
airflow dags report
# 手动触发测试
airflow dags trigger <dag_id>
3.7.3 监控和维护
1. 定期检查
- 监控 DAG 执行状态
- 检查任务失败率
- 查看执行时间趋势
2. 日志管理
- 定期清理日志文件
- 配置日志轮转
- 监控日志大小
3. 性能调优
- 优化任务依赖关系
- 调整调度参数
- 监控资源使用情况
通过以上实战练习,您应该能够:
- 创建和配置各种类型的 DAG
- 理解和使用不同的 Operator
- 管理任务依赖关系
- 使用变量和连接
- 处理错误和监控任务执行
- 开发实际的数据处理管道
更多推荐
所有评论(0)