在构建现代化数据平台时,我们有了“搬运工”(Glue)、“仓库”(S3)和“计算引擎”(Athena)。但随着业务复杂度增加(比如涉及 5 组业务的依赖关系),我们需要一个总指挥官来告诉这些人:什么时候开工?出错了怎么办?谁先谁后?

这个指挥官就是 Apache Airflow(在 AWS 上称为 MWAA: Managed Workflows for Apache Airflow)。


一、 什么是 Airflow?(基本理解)

Airflow 是一个**“工作流编排平台”**(Workflow Orchestration Platform)。

  • 核心理念Configuration as Code (代码即配置)。你不需要在网页上拖拽连线,而是通过编写 Python 代码 来定义整个业务流程。

  • 它的角色:它是**“大脑”**,不是“肌肉”。

    • 不处理数据(不搬运、不计算)。

    • 只负责调度(触发 Glue 搬运、触发 Athena 计算、触发 Java 接口回调)。

  • AWS MWAA:这是 AWS 托管的 Airflow 服务。你不需要自己维护服务器,只需要上传 Python 代码到 S3,AWS 会自动运行它。


二、 核心作用:为什么要用它?

如果 Glue 自带 Workflow,为什么还要 Airflow?

  1. 跨服务编排:Glue 的工作流只能管 Glue 自己的事。但 Airflow 可以编排 AWS 服务(Glue, Athena, Lambda)与 非 AWS 服务(你的 EKS Java 应用、第三方 API、Slack 通知)之间的混合流程。

  2. 复杂逻辑控制

    • “如果周五,跑全量;如果周一,跑增量。”

    • “如果 Glue 失败了,自动重试 3 次,每次间隔 5 分钟;如果还失败,发报警给运维。”

  3. 可视化监控:Airflow 提供了一个强大的 Web UI,你可以一眼看到哪个任务(Task)变红了(失败),并在界面上查看日志。


三、 它是怎么工作的?(使用原理)

Airflow 的核心单位是 DAG (Directed Acyclic Graph,有向无环图)。一个 DAG 就是一个 Python 脚本,代表一个完整的业务流程。

1. 编写流程

  1. 本地开发:在 IDE 中编写 Python 脚本。

  2. 定义任务 (Operators):使用预置的“插件”来定义动作。

    • GlueJobOperator: 触发 Glue。

    • AthenaOperator: 触发 Athena。

    • SimpleHttpOperator: 发送 HTTP 请求(调用你的 Java 应用)。

  3. 定义顺序:使用 >> 符号连接任务(如 task_A >> task_B)。

2. 部署流程

  1. 将写好的 .py 文件上传到 AWS 指定的 S3 桶dags/ 文件夹)。

  2. MWAA 后台自动扫描 S3,解析代码,并在 Web 界面生成流程图。

  3. 根据你设定的时间(如 schedule_interval='@daily')自动触发。


四、 相互调用与权限配置(连接机制)

Airflow 作为一个中心节点,需要访问周边所有服务。它的“连接”分为两类:

1. 调用 AWS 内部服务 (Glue / Athena)

机制IAM Role (身份认证)

  • 原理:MWAA 实例本身绑定了一个 IAM Role(执行角色)。

  • 配置:你不需要在代码里写账号密码。只要给这个 IAM Role 赋予 glue:StartJobRunathena:StartQueryExecution 权限,Airflow 就能直接指挥它们。

2. 调用你的 EKS 微服务 (Java / JDK)

机制Connection + Token (暗号认证)

  • 原理:通过 HTTP 请求调用 Java 暴露的 REST API。

  • 配置

    1. Airflow 端:在 UI 的 Connections 菜单里,配置 Host(Java 服务的内网地址)和 Header(存入 Token,如 X-Api-Key: secret-123)。当然对于database,S3,glue的连接也可以写在这里面

    2. Java 端:编写 Interceptor(拦截器),校验请求头里的 Token 是否匹配。

    3. 网络层:确保 VPC 安全组允许 Airflow 的 IP 访问 EKS 的端口。


五、 实战案例:电商年度账单生成系统

假设你有 5 组业务(订单、支付等),我们需要每天凌晨生成报表并通知 Java 后端。

1. 业务流程图

  1. 等待:检查 S3 上昨天的日志文件是否到位(Sensor)。

  2. 清洗:启动 Glue Job 把 JSON 转为 Parquet。

  3. 计算:启动 Athena 统计各分类消费总额。

  4. 通知:调用 Java 接口,告诉 App 端数据准备好了。

2. 代码实现 (Python DAG)

Python

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from datetime import datetime

# 定义 DAG:每天凌晨 2 点执行
with DAG(
    dag_id='daily_ecommerce_report',
    schedule_interval='0 2 * * *', 
    start_date=datetime(2026, 1, 1),
    catchup=False
) as dag:

    # 步骤 1: 感应器。一直等到 S3 出现标志文件才继续
    wait_for_data = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='my-raw-data-bucket',
        bucket_key='ready_flags/{{ ds }}.flag', # 检查当天的 flag 文件
        timeout=60 * 60, # 最多等1小时
        poke_interval=60 # 每60秒检查一次
    )

    # 步骤 2: 触发 Glue 进行清洗 (IAM Role 授权)
    clean_data = GlueJobOperator(
        task_id='trigger_glue_etl',
        job_name='ecommerce_clean_job_v1',
        script_args={'--day': '{{ ds }}'} # 把日期传给 Glue
    )

    # 步骤 3: 触发 Athena 进行聚合计算 (IAM Role 授权)
    calc_report = AthenaOperator(
        task_id='run_athena_sql',
        query='SELECT category, sum(amount) FROM orders WHERE day="{{ ds }}" GROUP BY category',
        database='ecommerce_db',
        output_location='s3://my-reports/{{ ds }}/'
    )

    # 步骤 4: 通知 Java 后端 (HTTP Connection + Token 授权)
    notify_backend = SimpleHttpOperator(
        task_id='notify_java_app',
        http_conn_id='my_eks_connection', # 对应 UI 里配好的连接
        endpoint='/api/v1/callbacks/report-ready',
        method='POST',
        data='{"status": "success", "date": "{{ ds }}"}',
        headers={"Content-Type": "application/json"}
    )

    # 定义依赖关系 (流程编排)
    wait_for_data >> clean_data >> calc_report >> notify_backend

六、 总结

对于后端开发者来说,Airflow 是连接 大数据世界(Glue/Athena)微服务世界(Spring Boot) 的桥梁。

  • Glue 负责卖力气(搬砖)。

  • Airflow 负责看时间、发号施令(工头)。

  • Java 负责处理复杂的业务逻辑和用户交互(前台)。

通过这套组合,你不仅实现了系统的解耦,还保证了数据处理流程的可观测性高可靠性

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐