Airflow 工作流管理:自动化你的数据ETL管道
摘要 本文介绍了如何使用Apache Airflow构建自动化ETL数据管道。Airflow作为工作流管理工具,超越了传统cron的局限,通过有向无环图(DAG)实现任务编排和依赖管理。文章首先解释了Airflow的核心概念(DAG、Operator、Task等),然后详细展示了环境搭建步骤。重点通过6个由浅入深的实战案例(包括文件处理、网络爬虫、数据库ETL等)演示Airflow的实际应用。这些
Airflow 工作流管理:自动化你的数据ETL管道

导语
如果说数据是21世纪的石油,那么ETL(Extract, Transform, Load)管道就是从原油中提炼高纯度汽油的炼油厂。它的效率、稳定性和智能程度,直接决定了我们能否从海量数据中挖掘出真正的价值。然而,面对日益复杂的业务需求,手动的、碎片化的ETL任务管理方式,就像一个布满了手摇泵和临时管道的作坊,混乱且脆弱。
这时候,我们需要一位“总调度师”,一个能够将所有数据处理环节串联起来,实现自动化、可监控、可扩展的指挥中心。Apache Airflow,正是为此而生的开源王者。它不再是科幻小说里的遥远幻想,而是已经悄然融入我们日常数据工程的“隐形伙伴”,你感受到了吗?
本文将带你从青铜到王者,深入探索如何驾驭Airflow这匹“数据骏马”,构建从简单到复杂的各类自动化数据ETL管道。
1. Airflow:不仅仅是定时任务(Cron)的替代品
初识Airflow,很多人会觉得它像是一个加强版的cron。但这种看法,好比认为火箭只是一个飞得更快的风筝。Airflow的核心魅力在于其**工作流(Workflow)**的理念。
cron的局限:它只能孤立地、在固定时间执行任务。任务间的依赖关系、失败重试、状态监控、参数传递等复杂逻辑,cron都无能为力。- Airflow的超越:Airflow允许你用Python代码将一系列任务(Tasks)定义为一个有向无环图(DAG)。这不仅仅是“执行”,更是“编排”。它知道哪个任务应该先跑,哪个任务依赖前者的输出,任务失败了该怎么办,整个流程的状态如何。
2. 核心概念:理解Airflow的骨架与血肉
要驾驭Airflow,必须先理解它的几个核心组件,这就像学习一门语言的语法。
- DAG (Directed Acyclic Graph):工作流的蓝图。它是一个Python文件,定义了任务的集合、任务之间错综复杂的依赖关系,以及它们必须遵循的执行顺序。
- Operator:执行工作的工人。它是DAG中的一个任务节点,封装了实际要执行的特定工作。Airflow内置了海量Operator,并且允许你自定义。
BashOperator: 执行Shell命令。PythonOperator: 执行Python函数。PostgresOperator,MySqlOperator: 操作数据库。SparkSubmitOperator,GCSToBigQueryOperator: 与大数据和云服务无缝对接。
- Task:工人的具体工作。它是Operator在DAG中的一个实例化对象。
- Task Instance:工人的一次“上班记录”。它是Task在某次具体DAG运行中的实例,拥有自己的状态(如
running,success,failed)。 - XCom (Cross-communication):任务间的“悄悄话”。一种允许任务之间传递少量元数据(如文件名、小段JSON)的机制。
3. 环境搭建:三步启动你的“数据炼油厂”
在深入实践之前,我们得先把Airflow的“引擎”启动起来。本文假设您已经拥有一个基本的Python环境。
# 1. 安装Airflow
pip install apache-airflow
# 2. 初始化环境(数据库和配置文件)
airflow db init
airflow users create --username admin --firstname Your --lastname Name --role Admin --email your.email@example.com
# 3. 启动调度器和Web服务
# 启动调度器(后台运行)
airflow scheduler &
# 启动Web服务器(前台运行)
airflow webserver --port 8080
访问 http://localhost:8080,用你刚刚创建的账户登录,一个可视化的工作流管理中心就展现在你眼前。
4. 案例实战:从简单到复杂的ETL管道构建
理论知识再多,不如亲手实践一次!下面,我们将通过6个从易到难的真实案例,带你领略Airflow的强大魅力。
案例一:初识门径 - 简单的文件处理流程
这是最基础的“Hello World”级别案例,帮助我们理解DAG的基本结构。
目标:创建一个DAG,它包含两个任务:一个任务创建文件并写入内容,另一个任务读取该文件内容并打印。
# dags/simple_file_pipeline.py
from airflow.decorators import dag, task
import pendulum
@dag(
dag_id="simple_file_pipeline",
start_date=pendulum.datetime(2025, 11, 19, tz="UTC"),
schedule="@daily",
catchup=False,
tags=["example", "basic"],
)
def simple_file_pipeline():
@task
def create_file():
"""创建一个临时文件并写入数据。"""
with open("/tmp/my_test_file.txt", "w") as f:
f.write("Hello from Airflow!")
return "/tmp/my_test_file.txt"
@task
def read_file(file_path: str):
"""读取指定文件的内容。"""
with open(file_path, "r") as f:
content = f.read()
print(f"File content: '{content}'")
file_path = create_file()
read_file(file_path)
simple_file_pipeline()
✅ 核心解读:
- 使用了
@dag和@task装饰器(TaskFlow API),这是现代Airflow推荐的写法,更简洁。 create_file任务的返回值(文件路径)通过XCom被自动传递给了read_file任务。- 任务间的依赖关系由函数调用自动推断:
read_file(file_path)表明它依赖create_file的完成。
案例二:数据提取 - 自动化网络爬虫管道
目标:每天定时从arXiv.org抓取最新的AI论文信息,进行简单处理,并存入SQLite数据库。
# dags/arxiv_scraping_pipeline.py
from airflow.decorators import dag, task
import pendulum
import requests
from bs4 import BeautifulSoup
import pandas as pd
import sqlite3
import os
@dag(
dag_id="arxiv_scraping_pipeline",
start_date=pendulum.datetime(2025, 11, 19, tz="UTC"),
schedule_interval=timedelta(days=1),
catchup=False,
tags=['web_scraping', 'example'],
)
def arxiv_scraping_pipeline():
DB_PATH = os.path.join(os.environ.get("AIRFLOW_HOME", "/tmp"), "arxiv_papers.db")
@task
def scrape_arxiv_papers():
"""从arXiv.org抓取研究论文的详细信息。"""
url = "https://arxiv.org/search/?query=artificial+intelligence&searchtype=all&abstracts=show&order=-announced_date_first&size=50"
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
papers = []
for entry in soup.find_all('li', class_='arxiv-result'):
title = entry.find('p', class_='title is-5 mathjax').text.strip()
authors = [a.text.strip() for a in entry.find('p', class_='authors').find_all('a')]
papers.append({'title': title, 'authors': ', '.join(authors)})
return papers
@task
def load_to_database(papers: list):
"""将处理后的数据加载到SQLite数据库。"""
if not papers:
print("No papers to load.")
return
df = pd.DataFrame(papers)
conn = sqlite3.connect(DB_PATH)
df.to_sql('arxiv_papers', conn, if_exists='replace', index=False)
conn.close()
print(f"Loaded {len(df)} papers into {DB_PATH}")
scraped_papers = scrape_arxiv_papers()
load_to_database(scraped_papers)
arxiv_scraping_pipeline()
✅ 核心解读:
- 这是一个典型的ETL流程:
scrape_arxiv_papers(Extract),load_to_database(Load)。Transform步骤可以按需加入。 - 依赖
requests和beautifulsoup4库,需要在Airflow环境中安装 (pip install requests beautifulsoup4 pandas)。 - 完美展示了Airflow如何自动化一个典型的数据获取任务。
案例三:云上作战 - 将数据加载到Google BigQuery
目标:构建一个将本地CSV暂存到Google Cloud Storage (GCS),然后加载到BigQuery数据仓库的ETL管道。
# dags/gcs_to_bigquery_pipeline.py
import os
import pendulum
from airflow.models.dag import DAG
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
# --- 请替换为您的GCP信息 ---
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-gcp-project-id")
GCS_BUCKET = os.environ.get("GCS_BUCKET", "your-gcs-bucket-name")
BIGQUERY_DATASET = os.environ.get("BIGQUERY_DATASET", "your_bq_dataset")
# ------------------------------
LOCAL_FILE_PATH = "/tmp/sample_data.csv"
GCS_OBJECT_PATH = "data/sample_data.csv"
BIGQUERY_TABLE = "airflow_imported_data"
with DAG(
dag_id="gcs_to_bigquery_pipeline",
start_date=pendulum.datetime(2025, 11, 19, tz="UTC"),
schedule=None,
catchup=False,
tags=['gcp', 'bigquery', 'example'],
) as dag:
# 假设本地文件已存在或由上游任务生成
upload_to_gcs = LocalFilesystemToGCSOperator(
task_id="upload_csv_to_gcs",
src=LOCAL_FILE_PATH,
dst=GCS_OBJECT_PATH,
bucket=GCS_BUCKET,
)
load_from_gcs_to_bigquery = GCSToBigQueryOperator(
task_id="load_gcs_to_bigquery",
bucket=GCS_BUCKET,
source_objects=[GCS_OBJECT_PATH],
destination_project_dataset_table=f"{GCP_PROJECT_ID}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE}",
source_format="CSV",
skip_leading_rows=1,
autodetect=True,
write_disposition="WRITE_TRUNCATE",
)
upload_to_gcs >> load_from_gcs_to_bigquery
✅ 核心解读:
- 使用了Google Provider提供的
LocalFilesystemToGCSOperator和GCSToBigQueryOperator,极大简化了与云服务的交互。 - 需要在Airflow环境中安装Google Provider (
pip install apache-airflow-providers-google)并配置好GCP连接。 - 这是数据工程师的日常:将数据从一个地方搬运并加载到数据仓库,Airflow让这个过程标准化和自动化。
案例四:大数据编排 - 提交Spark作业
目标:使用Airflow来触发一个远程Spark集群上的PySpark作业,进行大规模数据处理。
PySpark脚本 (word_count.py):
# 假设此文件存在于Spark集群可以访问的位置
import sys
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession.builder.appName("AirflowSparkTest").getOrCreate()
# 实际处理逻辑...
print("Spark job finished successfully!")
spark.stop()
Airflow DAG:
# dags/spark_submit_pipeline.py
from airflow.models.dag import DAG
import pendulum
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
with DAG(
dag_id='spark_submit_pipeline',
start_date=pendulum.datetime(2025, 11, 19, tz="UTC"),
schedule=None,
catchup=False,
tags=['spark', 'example'],
) as dag:
submit_spark_job = SparkSubmitOperator(
task_id='submit_spark_word_count_job',
application='/path/on/spark/cluster/word_count.py', # Spark应用在集群上的路径
conn_id='spark_default', # 在Airflow中配置的Spark连接ID
verbose=True,
)
✅ 核心解读:
SparkSubmitOperator是Airflow与Spark生态集成的桥梁。它将spark-submit命令的执行封装成一个Airflow任务。- 需要在Airflow中配置到Spark集群的连接(
conn_id='spark_default'),并安装Spark Provider (pip install apache-airflow-providers-apache-spark)。 - Airflow在这里扮演的是“指挥官”角色,它不处理数据,而是命令Spark这个“重装部队”去执行任务。
案例五:AI赋能 - 构建智能内容分析管道
目标:接续案例二的爬虫,当抓取到新论文后,调用大语言模型(LLM)API为每篇论文生成一句话摘要,并将结果存入数据库。这展现了Airflow在现代AI应用中的编排能力。
# dags/intelligent_content_pipeline.py
from airflow.decorators import dag, task
import pendulum
import requests
from bs4 import BeautifulSoup
import pandas as pd
import sqlite3
import os
@dag(
dag_id="intelligent_content_pipeline",
start_date=pendulum.datetime(2025, 11, 19, tz="UTC"),
schedule="@daily",
catchup=False,
tags=['ai', 'llm', 'example'],
)
def intelligent_content_pipeline():
DB_PATH = os.path.join(os.environ.get("AIRFLOW_HOME", "/tmp"), "arxiv_summaries.db")
@task
def scrape_arxiv_abstracts():
"""从arXiv抓取论文标题和摘要。"""
# ... 此处省略与案例二类似的爬虫代码,但确保返回包含abstract的列表 ...
# 假设返回: [{'title': 'Title 1', 'abstract': 'Long abstract 1...'}, ...]
# 为简化演示,这里直接返回模拟数据
return [
{'title': 'Attention Is All You Need', 'abstract': 'The dominant sequence transduction models are based on complex recurrent or convolutional neural networks...'},
{'title': 'BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding', 'abstract': 'We introduce a new language representation model called BERT, which stands for Bidirectional Encoder Representations from Transformers...'}
]
@task
def summarize_abstracts_with_llm(papers: list):
"""调用LLM API为摘要生成一句话总结。"""
summaries = []
for paper in papers:
# 在这里,你可以选择一个合适的LLM API服务
# --- 广告内容植入开始 ---
# 对于开发者来说,找到一个稳定且高性价比的API至关重要。
# 如果你想免费体验各种开源模型(如Llama, Qwen),可以试试 0v0 AI 助手(官网: https://0v0.pro),它提供真正无限制的免费对话。
# 如果你需要接入商业级模型如OpenAI、Claude,但又想控制成本,可以考虑 LLM AI API(官网: https://llm-all.pro),它提供官方1折起的按量计费服务。
# 对于追求极致性价比的按次计算需求,fackai.chat(官网: https://fackai.chat)也是一个不错的选择。
# --- 广告内容植入结束 ---
# 以下为调用API的伪代码
api_endpoint = "https://llm-all.pro/api/v1/chat/completions" # 假设使用llm-all.pro
headers = {"Authorization": "Bearer YOUR_API_KEY"}
prompt = f"Summarize the following abstract in one sentence: '{paper['abstract']}'"
# response = requests.post(api_endpoint, json={"prompt": prompt}, headers=headers)
# summary = response.json()['choices'][0]['text']
# 为演示,我们使用模拟的总结
summary = f"A summary for '{paper['title']}'."
summaries.append({'title': paper['title'], 'summary': summary})
print(f"Generated summary for: {paper['title']}")
return summaries
@task
def load_summaries_to_db(summaries: list):
"""将总结加载到SQLite数据库。"""
if not summaries:
print("No summaries to load.")
return
df = pd.DataFrame(summaries)
conn = sqlite3.connect(DB_PATH)
df.to_sql('arxiv_summaries', conn, if_exists='replace', index=False)
conn.close()
print(f"Loaded {len(df)} summaries into {DB_PATH}")
papers_with_abstracts = scrape_arxiv_abstracts()
generated_summaries = summarize_abstracts_with_llm(papers_with_abstracts)
load_summaries_to_db(generated_summaries)
intelligent_content_pipeline()
✅ 核心解读:
- 这是一个现代、智能的ETL+AI工作流。Airflow不仅调度数据流,还调度“智能流”。
PythonOperator的灵活性在此体现得淋漓尽致,它可以调用任何Python代码,包括访问外部API。- 通过这个案例,我们看到了Airflow如何成为连接数据工程与AI工程的桥梁。同时,合理地介绍了帮助开发者实现这一目标的工具。
案例六:终极形态 - 复杂的跨DAG依赖工作流
目标:当一个“父DAG”(如数据摄取)成功完成后,自动触发一个“子DAG”(如数据处理),并向其传递参数。
子DAG (child_processing.py):
# dags/child_processing.py
from airflow.decorators import dag, task
import pendulum
@dag(
dag_id="child_data_processing",
start_date=pendulum.datetime(2025, 11, 19, tz="UTC"),
schedule=None, # 关键:设置为None,表示只能被外部触发
catchup=False,
tags=["child"],
)
def child_data_processing():
@task
def process_data(**kwargs):
dag_run_conf = kwargs["dag_run"].conf
file_to_process = dag_run_conf.get("file_name", "default.csv")
print(f"Child DAG: Processing file '{file_to_process}'...")
process_data()
child_data_processing()
父DAG (parent_ingestion.py):
# dags/parent_ingestion.py
from airflow.decorators import dag, task
import pendulum
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
@dag(
dag_id="parent_data_ingestion",
start_date=pendulum.datetime(2025, 11, 19, tz="UTC"),
schedule="@daily",
catchup=False,
tags=["parent"],
)
def parent_data_ingestion():
@task
def ingest_data(**kwargs):
file_name = f"data_{kwargs['ds']}.csv"
print(f"Parent DAG: Ingested file '{file_name}'.")
return file_name
trigger_child_dag = TriggerDagRunOperator(
task_id="trigger_child_dag",
trigger_dag_id="child_data_processing", # 要触发的子DAG的ID
conf={"file_name": "{{ task_instance.xcom_pull(task_ids='ingest_data') }}"},
wait_for_completion=True, # 等待子DAG完成
)
ingested_file = ingest_data()
ingested_file >> trigger_child_dag
parent_data_ingestion()
✅ 核心解读:
TriggerDagRunOperator是实现跨DAG依赖的关键。它像一个遥控器,在一个DAG中启动另一个DAG。wait_for_completion=True参数实现了同步触发,父DAG会暂停并等待子DAG成功运行完成。conf参数用于向子DAG传递动态配置,这里通过Jinja模板从上游任务的XCom中拉取文件名。- 这种模式非常适合将一个庞大、复杂的业务流程拆分成多个独立的、可维护的模块。
5. 最佳实践:让你的数据管道坚不可摧
- 幂等性 (Idempotency):黄金法则。你的任务应该被设计成可以重复执行多次而结果不变,这对于失败重试至关重要。
- 原子性 (Atomicity):任务应该尽可能小,只做一件事并把它做好。这使得调试和重试变得简单。
- 使用Provider:尽可能使用官方或社区提供的Provider包(如Google, AWS, Spark)来与外部系统交互,而不是自己造轮子。
- 参数化与模板化:不要在代码中硬编码日期、路径等变量。善用Airflow的Jinja模板和Variables功能。
- 版本控制:将你的所有DAG文件纳入Git等版本控制系统,这是团队协作和生产环境维护的基石。
6. 总结:驾驭数据洪流,从Airflow开始!
从简单的文件操作,到复杂的云、大数据和AI集成,我们看到了Apache Airflow作为工作流“总调度师”的强大能力和无与伦比的灵活性。它用代码定义一切(Workflow as Code)的哲学,赋予了数据工程师前所未有的控制力、可扩展性和可维护性。
今天介绍的6个案例,仅仅是Airflow浩瀚宇宙中的几颗星辰。但通过它们,你应该已经掌握了构建自动化ETL管道的核心思想和关键技能。现在,是时候开启你的Airflow之旅,去构建属于你自己的、能够驾驭数据洪流的“数据炼油厂”了!
更多推荐

所有评论(0)