当前位置: 首页 > news >正文

Apache Airflow 系列教程 | 第6课:DAG 解析与处理引擎

导读(Introduction)

欢迎来到 Apache Airflow 源码深度解析系列的第六课。

在前一课中,我们深入剖析了 Scheduler 的核心原理——它如何在循环中创建 DagRun、推进任务状态、将任务入队给 Executor。但 Scheduler 能调度 DAG 的前提是:系统必须先"看到"这些 DAG。这就是 DAG 解析与处理引擎所承担的职责。

想象一下:你在dags/目录下新建了一个 Python 文件,定义了一个 DAG。Airflow 是如何发现这个文件、导入其中的 Python 模块、提取 DAG 对象、序列化并写入数据库的?这个从"文件"到"数据库记录"的转换过程,正是本课的核心主题。

Airflow 3.x 对 DAG 解析体系做了重大重构,引入了DAG Bundle概念(源自 AIP-66),将 DAG 文件的组织与加载从"单一文件夹扫描"升级为"多源可插拔的 Bundle 管理"。同时,解析过程被设计为独立的子进程,实现了与 Scheduler 主进程的安全隔离——这意味着即使用户编写的 DAG 代码存在 bug 或安全风险,也不会影响调度器的稳定性。

本课将带你逐层拆解这个精密的解析引擎,从文件发现到子进程解析,从序列化到数据库持久化,完整理解 Airflow 如何将静态的 Python 文件转化为可调度的工作流实体。


学习目标(Learning Objectives)

完成本课学习后,你将能够:

  1. 理解 DAG 文件发现与导入机制——掌握 Airflow 如何扫描目录、过滤文件、检测 DAG 内容
  2. 掌握 DagBag 容器的工作原理——了解 DAG 的内存管理、验证和收集过程
  3. 理解 DAG Bundle 概念——掌握 AIP-66 引入的多源 DAG 文件组织与加载体系
  4. 分析 DagFileProcessorManager 的处理循环——理解并行解析的调度策略和生命周期管理
  5. 掌握子进程解析的通信协议——了解 Manager 与 Parser 子进程之间的 IPC 机制
  6. 理解解析结果的持久化路径——从序列化 DAG 到写入 DagModel、SerializedDagModel 的完整链路

正文内容(Main Content)

1. 整体架构:从文件到数据库的转换链路

DAG 解析引擎的核心使命是将用户编写的 Python DAG 文件转化为数据库中的结构化记录。这个过程可以概括为以下流水线:

DAG 文件 (.py) │ ▼ ┌─────────────────────────────────────────┐ │ DagFileProcessorManager (管理进程) │ │ - 文件发现 & Bundle 刷新 │ │ - 文件队列调度 │ │ - 子进程生命周期管理 │ └────────────────────┬────────────────────┘ │ fork 子进程 ▼ ┌─────────────────────────────────────────┐ │ DagFileProcessorProcess (子进程) │ │ - _parse_file_entrypoint() │ │ - DagBag.process_file() 加载 DAG │ │ - DagSerialization.to_dict() 序列化 │ │ - 通过 IPC 返回 DagFileParsingResult │ └────────────────────┬────────────────────┘ │ 结果回传 ▼ ┌─────────────────────────────────────────┐ │ persist_parsing_result() │ │ - update_dag_parsing_results_in_db() │ │ - 写入 DagModel / SerializedDagModel │ │ - 更新 ParseImportError / DagWarning │ └─────────────────────────────────────────┘

让我们逐层深入了解每个组件的实现细节。


2. DAG Bundle:多源文件组织体系

2.1 Bundle 的设计动机

在传统 Airflow 中,所有 DAG 文件必须放在一个统一的dags_folder中。这在大规模团队协作时会产生问题:

  • 不同团队的 DAG 混在一起,缺乏隔离
  • 无法为不同来源的 DAG 配置不同的刷新频率
  • Git 仓库管理困难,无法支持多仓库结构

Airflow 3.x 通过DAG Bundle(AIP-66)解决了这些问题。Bundle 是 DAG 文件的逻辑分组单位,每个 Bundle 可以有不同的来源(本地目录、Git 仓库等)、版本控制和刷新策略。

2.2 BaseDagBundle 抽象基类

所有 Bundle 实现都继承自BaseDagBundle

# 源码位置:airflow-core/src/airflow/dag_processing/bundles/base.pyclassBaseDagBundle(ABC):""" DAG bundles are used both by the DAG processor and by a worker when running a task. When running a task, we know what version of the bundle we need. The DAG processor uses a bundle to keep the DAGs up to date, always using the latest version. """supports_versioning:bool=Falsedef__init__(self,*,name:str,refresh_interval:int=conf.getint("dag_processor","refresh_interval"),version:str|None=None,view_url_template:str|None=None,)->None:self.name=name self.version=version self.refresh_interval=refresh_interval self.is_initialized:bool=Falseself.base_dir=get_bundle_base_folder(bundle_name=self.name)self.versions_dir=get_bundle_versions_base_folder(bundle_name=self.name)@property@abstractmethoddefpath(self)->Path:"""Airflow will use this path to find/load/execute the DAGs from the bundle."""@abstractmethoddefget_current_version(self)->str|None:"""Retrieve a string that represents the version of the DAG bundle."""@abstractmethoddefrefresh(self)->None:"""Retrieve the latest version of the files in the bundle."""definitialize(self)->None:"""Called before the bundle is used. Safe to call concurrently."""self.is_initialized=True

关键设计要点:

属性/方法说明
supports_versioning是否支持版本控制,影响 Worker 获取特定版本的能力
pathDAG 文件所在目录的路径
refresh_interval多久检查一次是否需要刷新
initialize()延迟初始化,避免昂贵操作过早执行
refresh()拉取最新版本,必须支持并发安全
get_current_version()获取当前版本标识符
2.3 LocalDagBundle:本地目录实现

最简单的 Bundle 实现是本地目录:

# 源码位置:airflow-core/src/airflow/dag_processing/bundles/local.pyclassLocalDagBundle(BaseDagBundle):"""Local DAG bundle - exposes a local directory as a DAG bundle."""supports_versioning=Falsedef__init__(self,*,path:str|None=None,**kwargs)->None:super().__init__(**kwargs)ifpathisNone:path=settings.DAGS_FOLDER self._path=Path(path)defget_current_version(self)->None:returnNonedefrefresh(self)->None:"""Nothing to refresh - it's just a local directory."""@propertydefpath(self)->Path:returnself._path

LocalDagBundle不支持版本控制(supports_versioning = False),刷新操作为空操作。它就是简单地指向一个本地文件夹。

2.4 DagBundlesManager:配置解析与实例化

DagBundlesManager负责读取配置并管理所有 Bundle 实例:

# 源码位置:airflow-core/src/airflow/dag_processing/bundles/manager.pyclassDagBundlesManager(LoggingMixin):"""Manager for DAG bundles."""def__init__(self):self._bundle_config:dict[str,_InternalBundleConfig]={}self.parse_config()defparse_config(self)->None:"""Get all DAG bundle configurations and store in instance variable."""config_list=conf.getjson("dag_processor","dag_bundle_config_list")# ... 解析 JSON 配置列表 ...forbundle_configinbundle_config_list:class_=import_string(bundle_config.classpath)self._bundle_config[bundle_config.name]=_InternalBundleConfig(bundle_class=class_,kwargs=bundle_config.kwargs,team_name=bundle_config.team_name,)defget_bundle(self,name:str,version:str|None=None)->BaseDagBundle:"""Get a DAG bundle by name."""cfg_bundle=self._bundle_config.get(name)returncfg_bundle.bundle_class(name=name,version=version,**cfg_bundle.kwargs)defsync_bundles_to_db(self,*,session:Session=NEW_SESSION)->None:"""Sync configured DAG bundles to the metadata database."""# 同步 DagBundleModel 记录

配置示例(airflow.cfg):

[dag_processor] dag_bundle_config_list = [ { "name": "my_team_dags", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {"path": "/opt/airflow/dags/my_team"}, "team_name": "data_engineering" }, { "name": "ml_pipelines", "classpath": "my_custom.GitDagBundle", "kwargs": {"repo_url": "https://github.com/org/ml-dags.git"} } ]
2.5 Bundle 版本管理与 Worker 协作

Bundle 在 DAG Processor 和 Worker 中的使用模式不同:

┌─────────────────────────────────────────────────────────────┐ │ DAG Processor │ │ - 始终使用最新版本 │ │ - 定期 refresh() 拉取更新 │ │ - 同一时间只有一个版本在使用 │ └─────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────┐ │ Worker │ │ - 使用任务创建时的特定版本 │ │ - 可能同时运行多个版本 │ │ - 通过 BundleVersionLock 锁定版本 │ └─────────────────────────────────────────────────────────────┘

BundleUsageTrackingManager负责清理不再使用的旧版本:

classBundleUsageTrackingManager:"""Utility helper for removing stale bundles."""defremove_stale_bundle_versions(self):"""Remove bundles not in use and not used for some time. Keeps last N used bundles, and bundles used within X time."""bundles=list(DagBundlesManager().get_all_dag_bundles())forbundleinbundles:ifnotbundle.supports_versioning:continueself._remove_stale_bundle_versions_for_bundle(bundle_name=bundle.name)

3. DagBag:DAG 的内存容器

3.1 DagBag 的职责

DagBag是 Airflow 中经典的核心组件,它充当 DAG 对象的内存容器。其核心职责:

  1. 扫描目录:发现可能包含 DAG 的 Python 文件
  2. 导入模块:执行 Python 文件,提取 DAG 对象
  3. 验证 DAG:检测循环依赖、重复 ID、执行集群策略
  4. 收集错误:记录导入错误和警告
# 源码位置:airflow-core/src/airflow/dag_processing/dagbag.pyclassDagBag(LoggingMixin):""" A dagbag is a collection of dags, parsed out of a folder tree. Makes it easier to run distinct environments for production and development, tests, or different teams or security profiles. """def__init__(self,dag_folder:str|Path|None=None,include_examples:bool|ArgNotSet=NOTSET,safe_mode:bool|ArgNotSet=NOTSET,load_op_links:bool=True,collect_dags:bool=True,known_pools:set[str]|None=None,bundle_path:Path|None=None,bundle_name:str|None=None,):self.dag_folder=dag_folderorsettings.DAGS_FOLDER self.dags:dict[str,DAG]={}self.file_last_changed:dict[str,datetime]={}self.import_errors:dict[str,str]={}self.captured_warnings:dict[str,tuple[str,...]]={}self.known_pools=known_pools self.bundle_path=bundle_path
http://www.jsqmd.com/news/773312/

相关文章:

  • 2026年AI多语言能力测评:Gemini3.1Pro中英文差异揭秘
  • 拖拉机PST换挡规律与控制策略GABP神经网络【附代码】
  • 通过 Python 快速将现有应用接入 Taotoken 支持的多模型服务
  • 3个理由告诉你为什么PE-bear是Windows逆向分析的最佳入门工具
  • Netty 系列文章总览:从源码主线到业务架构判断
  • 从单点AI应用到联盟级智能体集群:AISMM模型驱动的7个真实联盟跃迁案例(含金融、能源、医疗闭源数据)
  • 通过审计日志功能追踪团队 API Key 的使用情况
  • Apache Airflow 系列教程 | 第7课:执行器(Executor)体系架构
  • 视频分析终极指南:如何用AI自动理解视频内容
  • 普世素数生成公式:数论重构与战略行动框架【乖乖数学】
  • 在数据清洗场景中利用 Taotoken 多模型能力优化处理流程
  • AITrack:用普通摄像头实现专业级6自由度头部追踪的AI解决方案
  • 第12篇 综合实战——制作一个学生管理系统 仓颉原生中文编程
  • Apache Airflow 系列教程 | 番外篇:通过 REST API 动态创建 DAG
  • 【四级】2025年12月英语四级真题试卷及答案解析电子版PDF(第一、二、三套全)
  • 对比直接使用官方API体验Taotoken在模型切换与成本控制上的便利
  • Obsidian的博客园同步插件配置
  • 特斯拉Model 3/Y CAN总线DBC文件终极指南:从零到精通的完整实战教程
  • iW610-01C‌ 是瑞萨电子(Renesas Electronics)推出的‌智能同步整流控制器‌,专为高效率 AC/DC 电源转换设计,广泛应用于快充适配器、高功率密度电源等场景。
  • 2024长春相机回收服务商深度**:专业、便捷、高价是核心标准 - 2026年企业推荐榜
  • AssetStudio音频提取实战指南:从Unity资源到MP3/WAV的完整解决方案
  • 五级地址解析是什么?为什么比四级多了行政村
  • 2026年度多路数据采集仪厂家怎么选?老品牌JINKO金科6大主流代表型号详解!附10条DAQ专业FAQ问答! - 奋斗者888
  • 如何快速掌握OR-Tools:5个高效优化算法的终极指南
  • Go语言的并发安全
  • 2026年最新松原路灯采购指南:从厂家实力到场景适配的深度解析 - 2026年企业推荐榜
  • 移动物联赋能的多智能农机联合优化协同作业旅行商问题【附代码】
  • Go语言的容器化和部署
  • VirtualRouter:将Windows电脑变身为智能无线共享中心的十年经典
  • 开源量化期权交易框架FlowAlgo:从事件驱动到希腊字母风控