深入浅出:近距离观察 4 个 Airflow 内部机制
无论您是 Airflow 的新手还是经验丰富的工程师,总有学习新技术的机会,这些新技术可能甚至没有在文档中(明确地)提到。💡如果您觉得它有用,请确保关注我并订阅我的 Medium 订阅新闻,以便在下一篇文章发布时得到通知。
通过 DALL-E 生成的图像
我已经与 Airflow 合作超过三年了,总的来说,我对它相当自信。它是一个强大的编排器,帮助我快速且可扩展地构建数据管道,而对于我想要实现的大多数事情,它都自带电池。
最近,在准备获得 Airflow 认证的过程中,我遇到了许多我实际上毫无头绪的事情。这本质上是我写这篇文章并分享一些让我完全震惊的 Airflow 内部知识的动机!
1. 调度器只解析包含特定关键词的文件
Airflow 调度器只会解析包含 airflow 或 dag 的文件! 是的,您听得很对!如果 DAG 文件夹下的文件不包含这两个关键词中的至少一个,它将不会被调度器解析。
如果您想修改这个规则,使其不再成为调度器的必要条件,您可以简单地设置 DAG_DISCOVERY_SAFE_MODE 配置设置为 False。在这种情况下,调度器将解析您 DAG 文件夹(/dags)下的所有文件。
我不推荐禁用这个检查,因为这样做实际上并没有什么意义。一个合适的 DAG 文件将包含 Airflow 导入和 DAG 定义,这意味着解析该文件的要求已经得到满足,但了解这条规则是值得的。
2. 命名中包含特定关键词的变量其值会被隐藏
我们知道默认情况下,Airflow 会隐藏存储在连接(特别是 password 字段)中的敏感信息,但变量呢?
嗯,这确实可能,而且令人震惊的是,Airflow 可以自动为您完成这项工作。如果一个变量包含某些可能表示敏感信息的关键词,那么它的值将自动被隐藏。
这里有一份关键词列表,这些关键词将使变量符合存储敏感信息值的条件:
access_token
api_key
apikey
authorization
passphrase
passwd
password
private_key
secret
token
keyfile_dict
service_account
这意味着如果您的变量名包含这些关键词中的任何一个,Airflow 将相应地处理其值。
让我们看看这个功能在实际中的应用。首先,让我们从 UI 中创建一个变量,其名称不包含任何使其成为敏感信息的关键词。如图下所示,变量 my_var 的值在用户界面上是可见的。
不作为敏感信息的 Airflow 变量 – 来源:作者
现在,让我们创建另一个变量,其名称中包含一个关键词,并看看会发生什么。从 UI 中,我们进入管理 -> 变量 -> + 创建一个名为 my_api_key 的新变量:
通过用户界面创建新的 Airflow 变量 – 来源:作者
现在,如果我们转到用户界面的变量部分,我们可以看到我们新创建的变量,但你现在应该注意到其实际值已被隐藏,并替换为星号。
包含特定关键词的 Airflow 变量将隐藏其值以供 UI 查看 – 来源:作者
个人而言,我更喜欢将 Airflow 变量存储在秘密存储库中,例如 HashiCorp Vault,因此我并不知道 Airflow 是以这种方式处理变量的。实际上,这是一个非常棒且实用的功能。然而,我期望它能够更加灵活。与其有一组预定义的关键词,不如如果我们能够指定变量是否包含敏感信息,而不是限制自己在变量命名时使用特定的关键词,那就更方便了。让我们希望这将在未来的 Airflow 版本中得到实现。
目前,你仍然可以通过配置 Airflow 来扩展敏感关键词列表,无论是使用 airflow.cfg 中的 sensitive_var_conn_names(在 [core] 部分),还是通过导出 AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES 环境变量。
sensitive_var_conn_names
- 新增于版本 2.1.0。
在变量名称或连接的额外 JSON 中查找的逗号分隔的额外敏感关键词列表。
类型:
string默认:''环境变量:AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES`
3. 两个或更多 DAG 实际上可以具有相同的 DAG ID(但 UI 不喜欢)
我总是确保为我的所有 DAG 使用唯一的 ID。实际上,我是通过根据它们的 Python 文件名来命名它们来做到这一点的(因为这种方法可以保证不会意外地创建重复的文件名——以及 DAG ID)。直到现在,我一直有一种印象,即不可能有两个或更多具有相同 ID 的 DAG,但令人惊讶的是,这远非事实!
两个或更多 DAG 实际上可以具有相同的 DAG ID,这意味着你将看不到任何错误。然而,这实际上是一种不良做法,必须避免。即使调度器不会对具有相同 ID 的 DAG 提出异议,在用户界面中,一个 DAG 将显示出来。实际上,如果你过一段时间后刷新 UI,之前显示的可能会消失,而具有相同 ID 的 DAG 之一这次会显示出来。
虽然这不仅仅是关于在用户界面中渲染 DAG,但当您想要引用一个 ID 被其他 DAG(s) 使用的 DAG 时,事情可能会变得极其糟糕。例如,考虑一下 TriggerDagRunOperator,它可以在 DAG 中用来触发另一个 DAG。在这种情况下,错误的 DAG 可能会被触发。用户甚至报告说,在 DAG 的 UI 上点击“运行”,导致了显示在 UI 上的 DAG 同 ID 的执行。所以请确保避免有重复的 DAG ID。
💡 小贴士:您可以使用 Path(__filename__).stem 动态地为 DAG 命名,而不是硬编码 DAG 的 ID:
from pathlib import Path
from airflow import DAG
with DAG(
dag_id=Path(__file__).stem,
...
):
...
4. Airflow 支持 “ignore file”
我很确定你对 .gitignore 文件很熟悉,但个人来说,我并不知道 Airflow 也支持相同的结构。
您可以通过在 DAG 文件夹本身下创建一个 .airflowignore 文件来指示调度器忽略 DAG 文件夹中的某些文件(即 /dags/.airflowignore)。总体上,它就像一个 .gitignore 文件。您可以使用它来指定 DAG 文件夹中的目录或文件。.airflowignore 文件中的每一行实际上指定了一个正则表达式模式,这意味着任何名称与指定模式匹配的目录或文件都将被忽略。
例如,假设我们已经创建了一个包含以下内容的 .airflowignore 文件。
# .airflowignore
helper
dbt_[d]
类似的文件
-
helper_functions.py -
utils_HeLpEr.py -
dbt_1.py -
dbt_2.py -
helper/utils.py
将会被完全忽略。但请注意,如果一个目录的名称与任何模式匹配,那么这个目录及其所有子目录将不会被 Airflow 扫描。
如果您的 DAG 文件夹中有大量文件,而您的调度器不应该关心这些文件,这个功能可以帮助您加快 DAG 解析的速度。
最后的想法…
无论您是 Airflow 的新手还是经验丰富的工程师,总有学习新技术的机会,这些新技术可能甚至没有在文档中(明确地)提到。
💡 感谢您花时间阅读这篇文章。 如果您觉得它有用,请确保关注我并订阅我的 Medium 订阅新闻,以便在下一篇文章发布时得到通知。
更多推荐



所有评论(0)