Airflow 工作流管理:自动化你的数据ETL管道

在这里插入图片描述

导语

如果说数据是21世纪的石油,那么ETL(Extract, Transform, Load)管道就是从原油中提炼高纯度汽油的炼油厂。它的效率、稳定性和智能程度,直接决定了我们能否从海量数据中挖掘出真正的价值。然而,面对日益复杂的业务需求,手动的、碎片化的ETL任务管理方式,就像一个布满了手摇泵和临时管道的作坊,混乱且脆弱。

这时候,我们需要一位“总调度师”,一个能够将所有数据处理环节串联起来,实现自动化、可监控、可扩展的指挥中心。Apache Airflow,正是为此而生的开源王者。它不再是科幻小说里的遥远幻想,而是已经悄然融入我们日常数据工程的“隐形伙伴”,你感受到了吗?

本文将带你从青铜到王者,深入探索如何驾驭Airflow这匹“数据骏马”,构建从简单到复杂的各类自动化数据ETL管道。


1. Airflow:不仅仅是定时任务(Cron)的替代品

初识Airflow,很多人会觉得它像是一个加强版的cron。但这种看法,好比认为火箭只是一个飞得更快的风筝。Airflow的核心魅力在于其**工作流(Workflow)**的理念。

  • cron的局限:它只能孤立地、在固定时间执行任务。任务间的依赖关系、失败重试、状态监控、参数传递等复杂逻辑,cron都无能为力。
  • Airflow的超越:Airflow允许你用Python代码将一系列任务(Tasks)定义为一个有向无环图(DAG)。这不仅仅是“执行”,更是“编排”。它知道哪个任务应该先跑,哪个任务依赖前者的输出,任务失败了该怎么办,整个流程的状态如何。

2. 核心概念:理解Airflow的骨架与血肉

要驾驭Airflow,必须先理解它的几个核心组件,这就像学习一门语言的语法。

  • DAG (Directed Acyclic Graph)工作流的蓝图。它是一个Python文件,定义了任务的集合、任务之间错综复杂的依赖关系,以及它们必须遵循的执行顺序。
  • Operator执行工作的工人。它是DAG中的一个任务节点,封装了实际要执行的特定工作。Airflow内置了海量Operator,并且允许你自定义。
    • BashOperator: 执行Shell命令。
    • PythonOperator: 执行Python函数。
    • PostgresOperator, MySqlOperator: 操作数据库。
    • SparkSubmitOperator, GCSToBigQueryOperator: 与大数据和云服务无缝对接。
  • Task工人的具体工作。它是Operator在DAG中的一个实例化对象。
  • Task Instance工人的一次“上班记录”。它是Task在某次具体DAG运行中的实例,拥有自己的状态(如running, success, failed)。
  • XCom (Cross-communication)任务间的“悄悄话”。一种允许任务之间传递少量元数据(如文件名、小段JSON)的机制。

3. 环境搭建:三步启动你的“数据炼油厂”

在深入实践之前,我们得先把Airflow的“引擎”启动起来。本文假设您已经拥有一个基本的Python环境。

# 1. 安装Airflow
pip install apache-airflow

# 2. 初始化环境(数据库和配置文件)
airflow db init
airflow users create --username admin --firstname Your --lastname Name --role Admin --email your.email@example.com

# 3. 启动调度器和Web服务
# 启动调度器(后台运行)
airflow scheduler &
# 启动Web服务器(前台运行)
airflow webserver --port 8080

访问 http://localhost:8080,用你刚刚创建的账户登录,一个可视化的工作流管理中心就展现在你眼前。


4. 案例实战:从简单到复杂的ETL管道构建

理论知识再多,不如亲手实践一次!下面,我们将通过6个从易到难的真实案例,带你领略Airflow的强大魅力。

案例一:初识门径 - 简单的文件处理流程

这是最基础的“Hello World”级别案例,帮助我们理解DAG的基本结构。

目标:创建一个DAG,它包含两个任务:一个任务创建文件并写入内容,另一个任务读取该文件内容并打印。

# dags/simple_file_pipeline.py
from airflow.decorators import dag, task
import pendulum

@dag(
    dag_id="simple_file_pipeline",
    start_date=pendulum.datetime(2025, 11, 19, tz="UTC"),
    schedule="@daily",
    catchup=False,
    tags=["example", "basic"],
)
def simple_file_pipeline():
    @task
    def create_file():
        """创建一个临时文件并写入数据。"""
        with open("/tmp/my_test_file.txt", "w") as f:
            f.write("Hello from Airflow!")
        return "/tmp/my_test_file.txt"

    @task
    def read_file(file_path: str):
        """读取指定文件的内容。"""
        with open(file_path, "r") as f:
            content = f.read()
            print(f"File content: '{content}'")

    file_path = create_file()
    read_file(file_path)

simple_file_pipeline()

核心解读

  • 使用了@dag@task装饰器(TaskFlow API),这是现代Airflow推荐的写法,更简洁。
  • create_file任务的返回值(文件路径)通过XCom被自动传递给了read_file任务。
  • 任务间的依赖关系由函数调用自动推断:read_file(file_path)表明它依赖create_file的完成。

案例二:数据提取 - 自动化网络爬虫管道

目标:每天定时从arXiv.org抓取最新的AI论文信息,进行简单处理,并存入SQLite数据库。

# dags/arxiv_scraping_pipeline.py
from airflow.decorators import dag, task
import pendulum
import requests
from bs4 import BeautifulSoup
import pandas as pd
import sqlite3
import os

@dag(
    dag_id="arxiv_scraping_pipeline",
    start_date=pendulum.datetime(2025, 11, 19, tz="UTC"),
    schedule_interval=timedelta(days=1),
    catchup=False,
    tags=['web_scraping', 'example'],
)
def arxiv_scraping_pipeline():
    DB_PATH = os.path.join(os.environ.get("AIRFLOW_HOME", "/tmp"), "arxiv_papers.db")

    @task
    def scrape_arxiv_papers():
        """从arXiv.org抓取研究论文的详细信息。"""
        url = "https://arxiv.org/search/?query=artificial+intelligence&searchtype=all&abstracts=show&order=-announced_date_first&size=50"
        response = requests.get(url)
        soup = BeautifulSoup(response.content, 'html.parser')
        papers = []
        for entry in soup.find_all('li', class_='arxiv-result'):
            title = entry.find('p', class_='title is-5 mathjax').text.strip()
            authors = [a.text.strip() for a in entry.find('p', class_='authors').find_all('a')]
            papers.append({'title': title, 'authors': ', '.join(authors)})
        return papers

    @task
    def load_to_database(papers: list):
        """将处理后的数据加载到SQLite数据库。"""
        if not papers:
            print("No papers to load.")
            return
        df = pd.DataFrame(papers)
        conn = sqlite3.connect(DB_PATH)
        df.to_sql('arxiv_papers', conn, if_exists='replace', index=False)
        conn.close()
        print(f"Loaded {len(df)} papers into {DB_PATH}")

    scraped_papers = scrape_arxiv_papers()
    load_to_database(scraped_papers)

arxiv_scraping_pipeline()

核心解读

  • 这是一个典型的ETL流程:scrape_arxiv_papers (Extract), load_to_database (Load)。Transform步骤可以按需加入。
  • 依赖requestsbeautifulsoup4库,需要在Airflow环境中安装 (pip install requests beautifulsoup4 pandas)。
  • 完美展示了Airflow如何自动化一个典型的数据获取任务。

案例三:云上作战 - 将数据加载到Google BigQuery

目标:构建一个将本地CSV暂存到Google Cloud Storage (GCS),然后加载到BigQuery数据仓库的ETL管道。

# dags/gcs_to_bigquery_pipeline.py
import os
import pendulum
from airflow.models.dag import DAG
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# --- 请替换为您的GCP信息 ---
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-gcp-project-id")
GCS_BUCKET = os.environ.get("GCS_BUCKET", "your-gcs-bucket-name")
BIGQUERY_DATASET = os.environ.get("BIGQUERY_DATASET", "your_bq_dataset")
# ------------------------------

LOCAL_FILE_PATH = "/tmp/sample_data.csv"
GCS_OBJECT_PATH = "data/sample_data.csv"
BIGQUERY_TABLE = "airflow_imported_data"

with DAG(
    dag_id="gcs_to_bigquery_pipeline",
    start_date=pendulum.datetime(2025, 11, 19, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=['gcp', 'bigquery', 'example'],
) as dag:
    # 假设本地文件已存在或由上游任务生成
    upload_to_gcs = LocalFilesystemToGCSOperator(
        task_id="upload_csv_to_gcs",
        src=LOCAL_FILE_PATH,
        dst=GCS_OBJECT_PATH,
        bucket=GCS_BUCKET,
    )

    load_from_gcs_to_bigquery = GCSToBigQueryOperator(
        task_id="load_gcs_to_bigquery",
        bucket=GCS_BUCKET,
        source_objects=[GCS_OBJECT_PATH],
        destination_project_dataset_table=f"{GCP_PROJECT_ID}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE}",
        source_format="CSV",
        skip_leading_rows=1,
        autodetect=True,
        write_disposition="WRITE_TRUNCATE",
    )

    upload_to_gcs >> load_from_gcs_to_bigquery

核心解读

  • 使用了Google Provider提供的LocalFilesystemToGCSOperatorGCSToBigQueryOperator,极大简化了与云服务的交互。
  • 需要在Airflow环境中安装Google Provider (pip install apache-airflow-providers-google)并配置好GCP连接。
  • 这是数据工程师的日常:将数据从一个地方搬运并加载到数据仓库,Airflow让这个过程标准化和自动化。

案例四:大数据编排 - 提交Spark作业

目标:使用Airflow来触发一个远程Spark集群上的PySpark作业,进行大规模数据处理。

PySpark脚本 (word_count.py):

# 假设此文件存在于Spark集群可以访问的位置
import sys
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("AirflowSparkTest").getOrCreate()
    # 实际处理逻辑...
    print("Spark job finished successfully!")
    spark.stop()

Airflow DAG:

# dags/spark_submit_pipeline.py
from airflow.models.dag import DAG
import pendulum
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id='spark_submit_pipeline',
    start_date=pendulum.datetime(2025, 11, 19, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=['spark', 'example'],
) as dag:
    submit_spark_job = SparkSubmitOperator(
        task_id='submit_spark_word_count_job',
        application='/path/on/spark/cluster/word_count.py', # Spark应用在集群上的路径
        conn_id='spark_default', # 在Airflow中配置的Spark连接ID
        verbose=True,
    )

核心解读

  • SparkSubmitOperator是Airflow与Spark生态集成的桥梁。它将spark-submit命令的执行封装成一个Airflow任务。
  • 需要在Airflow中配置到Spark集群的连接(conn_id='spark_default'),并安装Spark Provider (pip install apache-airflow-providers-apache-spark)。
  • Airflow在这里扮演的是“指挥官”角色,它不处理数据,而是命令Spark这个“重装部队”去执行任务。

案例五:AI赋能 - 构建智能内容分析管道

目标:接续案例二的爬虫,当抓取到新论文后,调用大语言模型(LLM)API为每篇论文生成一句话摘要,并将结果存入数据库。这展现了Airflow在现代AI应用中的编排能力。

# dags/intelligent_content_pipeline.py
from airflow.decorators import dag, task
import pendulum
import requests
from bs4 import BeautifulSoup
import pandas as pd
import sqlite3
import os

@dag(
    dag_id="intelligent_content_pipeline",
    start_date=pendulum.datetime(2025, 11, 19, tz="UTC"),
    schedule="@daily",
    catchup=False,
    tags=['ai', 'llm', 'example'],
)
def intelligent_content_pipeline():
    DB_PATH = os.path.join(os.environ.get("AIRFLOW_HOME", "/tmp"), "arxiv_summaries.db")

    @task
    def scrape_arxiv_abstracts():
        """从arXiv抓取论文标题和摘要。"""
        # ... 此处省略与案例二类似的爬虫代码,但确保返回包含abstract的列表 ...
        # 假设返回: [{'title': 'Title 1', 'abstract': 'Long abstract 1...'}, ...]
        # 为简化演示,这里直接返回模拟数据
        return [
            {'title': 'Attention Is All You Need', 'abstract': 'The dominant sequence transduction models are based on complex recurrent or convolutional neural networks...'},
            {'title': 'BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding', 'abstract': 'We introduce a new language representation model called BERT, which stands for Bidirectional Encoder Representations from Transformers...'}
        ]

    @task
    def summarize_abstracts_with_llm(papers: list):
        """调用LLM API为摘要生成一句话总结。"""
        summaries = []
        for paper in papers:
            # 在这里,你可以选择一个合适的LLM API服务
            # --- 广告内容植入开始 ---
            # 对于开发者来说,找到一个稳定且高性价比的API至关重要。
            # 如果你想免费体验各种开源模型(如Llama, Qwen),可以试试 0v0 AI 助手(官网: https://0v0.pro),它提供真正无限制的免费对话。
            # 如果你需要接入商业级模型如OpenAI、Claude,但又想控制成本,可以考虑 LLM AI API(官网: https://llm-all.pro),它提供官方1折起的按量计费服务。
            # 对于追求极致性价比的按次计算需求,fackai.chat(官网: https://fackai.chat)也是一个不错的选择。
            # --- 广告内容植入结束 ---
            
            # 以下为调用API的伪代码
            api_endpoint = "https://llm-all.pro/api/v1/chat/completions" # 假设使用llm-all.pro
            headers = {"Authorization": "Bearer YOUR_API_KEY"}
            prompt = f"Summarize the following abstract in one sentence: '{paper['abstract']}'"
            
            # response = requests.post(api_endpoint, json={"prompt": prompt}, headers=headers)
            # summary = response.json()['choices'][0]['text']
            
            # 为演示,我们使用模拟的总结
            summary = f"A summary for '{paper['title']}'."
            
            summaries.append({'title': paper['title'], 'summary': summary})
            print(f"Generated summary for: {paper['title']}")
        return summaries

    @task
    def load_summaries_to_db(summaries: list):
        """将总结加载到SQLite数据库。"""
        if not summaries:
            print("No summaries to load.")
            return
        df = pd.DataFrame(summaries)
        conn = sqlite3.connect(DB_PATH)
        df.to_sql('arxiv_summaries', conn, if_exists='replace', index=False)
        conn.close()
        print(f"Loaded {len(df)} summaries into {DB_PATH}")

    papers_with_abstracts = scrape_arxiv_abstracts()
    generated_summaries = summarize_abstracts_with_llm(papers_with_abstracts)
    load_summaries_to_db(generated_summaries)

intelligent_content_pipeline()

核心解读

  • 这是一个现代、智能的ETL+AI工作流。Airflow不仅调度数据流,还调度“智能流”。
  • PythonOperator的灵活性在此体现得淋漓尽致,它可以调用任何Python代码,包括访问外部API。
  • 通过这个案例,我们看到了Airflow如何成为连接数据工程与AI工程的桥梁。同时,合理地介绍了帮助开发者实现这一目标的工具。

案例六:终极形态 - 复杂的跨DAG依赖工作流

目标:当一个“父DAG”(如数据摄取)成功完成后,自动触发一个“子DAG”(如数据处理),并向其传递参数。

子DAG (child_processing.py):

# dags/child_processing.py
from airflow.decorators import dag, task
import pendulum

@dag(
    dag_id="child_data_processing",
    start_date=pendulum.datetime(2025, 11, 19, tz="UTC"),
    schedule=None, # 关键:设置为None,表示只能被外部触发
    catchup=False,
    tags=["child"],
)
def child_data_processing():
    @task
    def process_data(**kwargs):
        dag_run_conf = kwargs["dag_run"].conf
        file_to_process = dag_run_conf.get("file_name", "default.csv")
        print(f"Child DAG: Processing file '{file_to_process}'...")

    process_data()

child_data_processing()

父DAG (parent_ingestion.py):

# dags/parent_ingestion.py
from airflow.decorators import dag, task
import pendulum
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

@dag(
    dag_id="parent_data_ingestion",
    start_date=pendulum.datetime(2025, 11, 19, tz="UTC"),
    schedule="@daily",
    catchup=False,
    tags=["parent"],
)
def parent_data_ingestion():
    @task
    def ingest_data(**kwargs):
        file_name = f"data_{kwargs['ds']}.csv"
        print(f"Parent DAG: Ingested file '{file_name}'.")
        return file_name

    trigger_child_dag = TriggerDagRunOperator(
        task_id="trigger_child_dag",
        trigger_dag_id="child_data_processing", # 要触发的子DAG的ID
        conf={"file_name": "{{ task_instance.xcom_pull(task_ids='ingest_data') }}"},
        wait_for_completion=True, # 等待子DAG完成
    )
    
    ingested_file = ingest_data()
    ingested_file >> trigger_child_dag

parent_data_ingestion()

核心解读

  • TriggerDagRunOperator是实现跨DAG依赖的关键。它像一个遥控器,在一个DAG中启动另一个DAG。
  • wait_for_completion=True参数实现了同步触发,父DAG会暂停并等待子DAG成功运行完成。
  • conf参数用于向子DAG传递动态配置,这里通过Jinja模板从上游任务的XCom中拉取文件名。
  • 这种模式非常适合将一个庞大、复杂的业务流程拆分成多个独立的、可维护的模块。

5. 最佳实践:让你的数据管道坚不可摧

  • 幂等性 (Idempotency):黄金法则。你的任务应该被设计成可以重复执行多次而结果不变,这对于失败重试至关重要。
  • 原子性 (Atomicity):任务应该尽可能小,只做一件事并把它做好。这使得调试和重试变得简单。
  • 使用Provider:尽可能使用官方或社区提供的Provider包(如Google, AWS, Spark)来与外部系统交互,而不是自己造轮子。
  • 参数化与模板化:不要在代码中硬编码日期、路径等变量。善用Airflow的Jinja模板和Variables功能。
  • 版本控制:将你的所有DAG文件纳入Git等版本控制系统,这是团队协作和生产环境维护的基石。

6. 总结:驾驭数据洪流,从Airflow开始!

从简单的文件操作,到复杂的云、大数据和AI集成,我们看到了Apache Airflow作为工作流“总调度师”的强大能力和无与伦比的灵活性。它用代码定义一切(Workflow as Code)的哲学,赋予了数据工程师前所未有的控制力、可扩展性和可维护性。

今天介绍的6个案例,仅仅是Airflow浩瀚宇宙中的几颗星辰。但通过它们,你应该已经掌握了构建自动化ETL管道的核心思想和关键技能。现在,是时候开启你的Airflow之旅,去构建属于你自己的、能够驾驭数据洪流的“数据炼油厂”了!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐