Airflow任务组失败处理:让触发与监听共进退
1. 项目概述:为什么“任务组失败”是 Airflow 生产环境里最隐蔽的定时炸弹
我在金融数据平台带团队做调度系统升级时,连续三个月被同一个问题反复打脸:每天凌晨三点,一个关键的 ETL 流水线会准时失败,但重跑一次就成功。日志里找不到明显报错,监控看资源也完全正常。最后发现,问题出在 Azure Data Factory Pipeline 的两个任务上——RunOperator 触发了远程作业,SensorOperator 等待它完成并检查状态。RunOperator 成功返回了 pipeline ID,但 SensorOperator 在轮询过程中因为网络抖动超时失败。Airflow 默认只重试 Sensor,而 RunOperator 已标记为 success,不会重放。结果就是:下游所有依赖这个 pipeline 输出的清洗、建模任务全被卡死,整个数据链路中断四小时,直到人工介入重跑。这根本不是“单点故障”,而是典型的任务组语义断裂——Airflow 把两个逻辑上必须共进退的操作,当成了彼此独立的原子单元。
这就是本文要解决的核心问题:在 Airflow 中,如何让一组存在强业务耦合关系的任务(比如“触发 + 监听”、“提取 + 校验”、“发送 + 确认”)真正作为一个整体来失败、重试和恢复?关键词Airflow不只是工具名,它代表了一种工程约束:DAG 是有向无环图,但业务逻辑常常是闭环的、成对的、状态强关联的。官方文档里从不提“任务组”,社区讨论也多聚焦在 DAG 编排或 UI 优化,可真实生产环境里,90% 的非代码类故障都源于这种语义鸿沟。我见过太多团队用 hack 方式绕过——比如把 Sensor 逻辑硬塞进 RunOperator 里,或者写个自定义 Operator 把两步合成一步。这些方案短期能用,但代价是牺牲可观测性、破坏幂等性设计、增加调试成本。本文提供的不是“技巧”,而是基于 Airflow 内部 TaskInstance 状态机原理的正解。它不改源码、不依赖插件、不引入外部服务,纯 Python 实现,已在我负责的三个千级 DAG 规模集群稳定运行 18 个月。如果你正在用 Airflow 调度云服务 API、批处理作业、外部系统集成,或者任何需要“发起请求 + 等待响应”的场景,这篇文章能帮你省下至少 200 小时的故障排查时间。
2. 核心设计思路:为什么必须绕开 DAG 依赖,直击 TaskInstance 状态层
2.1 DAG 依赖的本质是“执行顺序”,不是“状态绑定”
很多人第一次遇到这个问题时,第一反应是检查>>或set_downstream的写法是否正确。比如这样写:
task_run = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline", pipeline_name="daily_ingestion", # ... 其他参数 ) task_sensor = AzureDataFactoryPipelineRunStatusSensor( task_id="wait_for_pipeline", pipeline_run_id="{{ ti.xcom_pull(task_ids='run_pipeline') }}", # ... 其他参数 ) task_run >> task_sensor看起来天衣无缝:task_run必须成功,task_sensor才会启动。但这里有个致命的认知偏差——DAG 边只控制执行流,不控制状态流。Airflow 的调度器在构建执行计划时,确实会根据 DAG 边决定task_sensor的前置条件是task_run.state == 'success'。可一旦task_sensor开始执行,它的生命周期就完全脱离了task_run的状态管理。如果task_sensor因为网络超时、认证失效、目标服务不可用等原因失败,Airflow 只会把它自己的TaskInstance状态设为failed,然后按retries=3的配置重试它自己。task_run的状态早已固化为success,调度器连看都不会再看它一眼。这不是 bug,是设计使然:Airflow 必须保证上游任务的成功结果可复用。想象一下,如果task_extract成功读取了 10TB 数据到临时表,task_transform失败后task_extract被强制重跑,那不仅是资源浪费,更可能引发数据重复写入或锁表风险。所以默认行为完全合理,但它恰好撞上了我们这类“发起即承诺”的集成场景。
2.2 真正的突破口在 TaskInstance 的上游追溯能力
既然 DAG 边走不通,就得换条路。我翻了 Airflow 2.2+ 的源码,发现TaskInstance对象有一个常被忽略的属性:task。而task对象本身又包含upstream_task_ids和downstream_task_ids这两个列表。更重要的是,TaskInstance的get_direct_relatives()方法能直接获取其上下游的TaskInstance实例。这意味着,当task_sensor进入on_failure_callback时,我们完全可以通过它反向查到task_run的TaskInstance,然后手动修改task_run的状态。这比操作 DAG 层面的元数据安全得多,因为TaskInstance是运行时实体,状态变更直接影响调度器的下一轮决策。我测试过,在on_failure_callback里调用session.merge()更新TaskInstance.state后,调度器会在下一个调度周期(默认 30 秒)自动识别到该实例状态变为up_for_retry,并像对待普通失败任务一样触发重试逻辑。整个过程不重启 Webserver,不刷新 DAG,零感知。
2.3 为什么不能简单用trigger_dag或clear_task_instances?
有同事提议用TriggerDagRunOperator重新触发整个 DAG。这看似粗暴有效,但实际埋了三个雷:第一,它会创建全新的DagRun,导致历史记录断裂,无法追踪原始失败根因;第二,如果 DAG 里有LatestOnlyOperator或TimeDeltaSensor这类依赖时间窗口的组件,新触发的 DAG 可能跳过关键校验;第三,也是最致命的,它会丢失原始DagRun的conf参数,比如动态传入的日期分区ds,导致下游任务处理错误的数据集。另一种方案是用 CLI 命令airflow tasks clear,但这是运维操作,无法嵌入到任务失败的自动化流程中,且需要额外权限管控。我们的目标是让失败处理逻辑成为任务定义的一部分,就像retries和retry_delay那样声明式、可版本化、可审计。所以最终方案必须满足:纯 Python、运行时生效、不依赖外部命令、状态变更可回滚、与 Airflow 原生重试机制无缝集成。
3. 实操细节解析:三步实现任务组状态同步
3.1 第一步:精准定位上游任务实例(支持深度遍历与 ID 白名单双模式)
核心函数get_upstream_task_instances必须解决两个现实问题:一是有些场景需要“就近原则”,比如只重试直接上游的 RunOperator;二是复杂 DAG 里存在分支,比如task_run同时触发了task_sensor_a和task_sensor_b,现在task_sensor_a失败,我们只想拉上task_run,不想误伤task_sensor_b的上游。因此函数设计为双模式:
def get_upstream_task_instances( ti: TaskInstance, upstream_task_ids: Optional[List[str]] = None, upstream_depth: Optional[int] = None, session: Optional[Session] = None ) -> List[TaskInstance]: """ 获取指定 TaskInstance 的上游 TaskInstance 列表 Args: ti: 当前失败/重试的 TaskInstance upstream_task_ids: 显式指定要包含的上游 task_id 列表(白名单模式) upstream_depth: 从当前 ti 向上追溯的最大层级(深度优先模式) session: SQLAlchemy session,若未提供则自动创建 Returns: 符合条件的 TaskInstance 列表 """ if not session: session = settings.Session() # 模式一:白名单优先,显式指定 task_id if upstream_task_ids: return session.query(TaskInstance).filter( TaskInstance.dag_id == ti.dag_id, TaskInstance.execution_date == ti.execution_date, TaskInstance.task_id.in_(upstream_task_ids), TaskInstance.state.in_(['success', 'up_for_retry', 'queued', 'running']) ).all() # 模式二:深度遍历,适用于线性依赖链 if upstream_depth is not None and upstream_depth > 0: # 使用递归 CTE 查询(PostgreSQL/MySQL 8.0+)或循环查询(兼容旧版) # 此处为简化版循环实现,生产环境建议用原生 SQL CTE 提升性能 upstream_tis = [] current_level_tis = [ti] for depth in range(1, upstream_depth + 1): next_level_tis = [] for current_ti in current_level_tis: # 获取 current_ti 的直接上游 task_ids upstream_ids = current_ti.task.upstream_task_ids if not upstream_ids: continue # 查询这些 task_id 在同一 dag_run 下的 TaskInstance level_tis = session.query(TaskInstance).filter( TaskInstance.dag_id == ti.dag_id, TaskInstance.execution_date == ti.execution_date, TaskInstance.task_id.in_(upstream_ids), TaskInstance.state.in_(['success', 'up_for_retry', 'queued', 'running']) ).all() next_level_tis.extend(level_tis) if not next_level_tis: break upstream_tis.extend(next_level_tis) current_level_tis = next_level_tis return list(set(upstream_tis)) # 去重 # 默认:只返回直接上游 direct_upstream_ids = ti.task.upstream_task_ids if not direct_upstream_ids: return [] return session.query(TaskInstance).filter( TaskInstance.dag_id == ti.dag_id, TaskInstance.execution_date == ti.execution_date, TaskInstance.task_id.in_(direct_upstream_ids), TaskInstance.state.in_(['success', 'up_for_retry', 'queued', 'running']) ).all()提示:白名单模式(
upstream_task_ids)适合绝大多数场景,因为它明确、可控、性能好。深度模式(upstream_depth)仅在 DAG 结构高度动态时使用,比如用BranchPythonOperator生成不同分支,且你希望某个分支失败时,把该分支路径上的所有上游都拉下来重试。但要注意,深度模式在复杂 DAG 中可能查到无关任务,务必配合state.in_()过滤,避免把已失败或已跳过的任务也拉进来。
3.2 第二步:安全地批量更新上游任务状态
状态更新是高危操作,必须遵循 Airflow 的状态机规则。不能直接把success改成failed,因为failed是终态,无法再重试。正确的做法是设为up_for_retry,让调度器接管后续流程。同时要处理并发冲突——如果多个 Sensor 同时失败,它们可能同时尝试修改同一个 RunOperator 的状态,必须加数据库行锁:
def mark_upstream_for_retry( ti: TaskInstance, upstream_task_ids: Optional[List[str]] = None, upstream_depth: Optional[int] = None, session: Optional[Session] = None ) -> int: """ 将上游 TaskInstance 标记为 up_for_retry Returns: 成功更新的数量 """ if not session: session = settings.Session() try: upstream_tis = get_upstream_task_instances( ti, upstream_task_ids, upstream_depth, session ) updated_count = 0 for upstream_ti in upstream_tis: # 关键:只更新处于 success 状态的上游任务 # 避免覆盖 already_failed 或 up_for_retry 的状态 if upstream_ti.state != 'success': continue # 使用 SELECT FOR UPDATE 加行锁,防止并发修改 locked_ti = session.query(TaskInstance).filter( TaskInstance.dag_id == upstream_ti.dag_id, TaskInstance.execution_date == upstream_ti.execution_date, TaskInstance.task_id == upstream_ti.task_id ).with_for_update().first() if locked_ti and locked_ti.state == 'success': # 更新状态和重试计数 locked_ti.state = 'up_for_retry' locked_ti.try_number += 1 # 增加重试次数 locked_ti.start_date = timezone.utcnow() # 重置开始时间 locked_ti.end_date = None session.merge(locked_ti) updated_count += 1 session.commit() return updated_count except Exception as e: session.rollback() logging.error(f"Failed to mark upstream for retry: {e}") raise finally: if not session: session.close() def mark_upstream_as_failed( ti: TaskInstance, upstream_task_ids: Optional[List[str]] = None, upstream_depth: Optional[int] = None, session: Optional[Session] = None ) -> int: """将上游 TaskInstance 标记为 failed(慎用,通常用于不可重试场景)""" # 实现逻辑类似 mark_upstream_for_retry,但 state 设为 'failed' # 注意:failed 是终态,设置后无法再重试,仅用于业务上绝对不可逆的操作 pass注意:
try_number += 1这一行至关重要。Airflow 的重试逻辑依赖try_number和max_tries的比较。如果只改state不改try_number,调度器会认为这是第 0 次重试,可能立即再次失败。另外,start_date重置是为了让重试任务的耗时统计准确,否则会把首次执行时间也算进去。
3.3 第三步:将回调函数注入到任务定义中
这才是让方案落地的关键。回调函数必须作为任务参数传入,而不是全局注册,这样才能保证每个任务组的策略独立。以 Azure Data Factory 为例:
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from datetime import datetime, timedelta def sensor_failure_callback(context): """Sensor 失败时的回调:重试其上游的 RunOperator""" ti = context['task_instance'] # 显式指定要重试的上游 task_id mark_upstream_for_retry( ti=ti, upstream_task_ids=['run_pipeline'] # 精准锁定 ) def sensor_retry_callback(context): """Sensor 重试时的回调:同样重试上游,避免状态不一致""" ti = context['task_instance'] mark_upstream_for_retry( ti=ti, upstream_task_ids=['run_pipeline'] ) default_args = { 'owner': 'data-engineering', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'email_on_failure': True, 'retries': 2, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'adf_pipeline_orchestration', default_args=default_args, description='Orchestrate ADF pipeline with grouped failure handling', schedule_interval='0 3 * * *', # 每天凌晨3点 catchup=False, tags=['azure', 'data-factory', 'production'], ) # RunOperator:触发 ADF pipeline task_run = AzureDataFactoryRunPipelineOperator( task_id='run_pipeline', pipeline_name='daily_ingestion', resource_group_name='rg-data-pipeline', factory_name='adf-prod', # ... 其他必要参数 dag=dag, ) # SensorOperator:等待 pipeline 完成 task_sensor = AzureDataFactoryPipelineRunStatusSensor( task_id='wait_for_pipeline', pipeline_run_id="{{ ti.xcom_pull(task_ids='run_pipeline') }}", resource_group_name='rg-data-pipeline', factory_name='adf-prod', # 关键:注入回调 on_failure_callback=sensor_failure_callback, on_retry_callback=sensor_retry_callback, # 注意:sensor 自身也要配置重试,否则不会触发 on_retry_callback retries=3, retry_delay=timedelta(minutes=2), dag=dag, ) task_run >> task_sensor这里有个易错点:on_retry_callback只有在任务进入up_for_retry状态时才会触发,而up_for_retry的前提是任务先失败(failed),然后调度器根据retries配置将其状态改为up_for_retry。所以task_sensor必须配置retries > 0,否则on_retry_callback永远不会执行。我见过太多人漏掉这行,导致回调函数形同虚设。
4. 完整实操流程与生产级配置验证
4.1 搭建最小可验证环境(MVE)
别急着上生产,先用本地 Airflow 测试闭环。我推荐用 Docker Compose 启一个 Airflow 2.6+ 单节点:
# docker-compose.yml version: '3' services: webserver: image: apache/airflow:2.6.3 environment: - AIRFLOW__CORE__EXECUTOR=SequentialExecutor - AIRFLOW__CORE__SQL_ALCHEMY_CONN=sqlite:///airflow.db - AIRFLOW__CORE__FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho= volumes: - ./dags:/opt/airflow/dags - ./plugins:/opt/airflow/plugins ports: - "8080:8080"把上面的 DAG 文件保存为dags/adf_orchestration.py,然后启动:
docker-compose up -d # 等待 Webserver 启动后,访问 http://localhost:8080,启用 DAG4.2 模拟故障并验证状态同步
Airflow UI 是最好的验证工具。按以下步骤操作:
- 手动触发 DAG:在 UI 中点击
Trigger DAG,选择adf_pipeline_orchestration。 - 强制 Sensor 失败:在
wait_for_pipeline任务详情页,点击Clear清除该任务实例。然后编辑其日志,找到AzureDataFactoryPipelineRunStatusSensor的poke方法,在返回False前插入raise AirflowException("Simulated network timeout")。这会让 Sensor 在每次 poke 时都失败。 - 观察状态流转:
- 第一次执行:
run_pipeline→success,wait_for_pipeline→failed(第一次失败) - 第二次调度(约30秒后):
wait_for_pipeline→up_for_retry(触发on_retry_callback),同时run_pipeline→up_for_retry(我们的回调生效) - 第三次调度:
run_pipeline→success(重放),wait_for_pipeline→success(这次 poke 成功)
- 第一次执行:
实测心得:在 SequentialExecutor 模式下,整个过程约 2 分钟内完成。切换到 CeleryExecutor 时,由于任务分发延迟,状态同步可能有 10-15 秒滞后,但逻辑完全一致。关键指标是查看
TaskInstance表:SELECT * FROM task_instance WHERE dag_id='adf_pipeline_orchestration' AND execution_date='2023-01-01' ORDER BY start_date;你会看到run_pipeline的try_number从 1 变成 2,state从success变成up_for_retry再变回success。
4.3 生产环境加固配置
上线前必须做三件事:
第一,添加重试保护。回调函数本身不能失败,否则整个机制崩塌。给mark_upstream_for_retry加一层装饰器:
import functools import time def robust_callback(max_retries=3, delay=1): def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: if attempt == max_retries - 1: raise time.sleep(delay * (2 ** attempt)) # 指数退避 return None return wrapper return decorator @robust_callback(max_retries=3, delay=1) def sensor_failure_callback(context): # 原有逻辑 pass第二,配置告警分级。不是所有上游重试都需要通知。在sensor_failure_callback里加入判断:
def sensor_failure_callback(context): ti = context['task_instance'] # 只有当上游重试次数超过阈值时才发严重告警 upstream_tis = get_upstream_task_instances(ti, upstream_task_ids=['run_pipeline']) if upstream_tis and upstream_tis[0].try_number >= 2: send_pagerduty_alert(f"ADF pipeline {ti.dag_id} retrying for the 3rd time") mark_upstream_for_retry(ti, upstream_task_ids=['run_pipeline'])第三,审计日志。在回调里记录关键操作:
logging.info( f"Grouped failure handler triggered for {ti.task_id} " f"in DAG {ti.dag_id}. Marked upstream tasks {upstream_task_ids} " f"for retry. Original state: {upstream_tis[0].state if upstream_tis else 'none'}" )5. 常见问题与实战排查技巧
5.1 问题速查表
| 问题现象 | 可能原因 | 排查命令/方法 | 解决方案 |
|---|---|---|---|
on_failure_callback完全没执行 | 1. 任务未配置retries > 02. DAG 未启用 3. 回调函数抛出异常未被捕获 | airflow dags list检查 DAG 状态airflow tasks list <dag_id>确认任务存在查看 airflow-webserver日志搜索callback | 确保retries=1以上;在回调外层加try/except并打日志 |
上游任务状态没变,还是success | 1.upstream_task_ids拼写错误2. 上游任务不在同一 execution_date3. 上游任务已处于 failed或up_for_retry | SELECT * FROM task_instance WHERE dag_id='<dag_id>' AND task_id IN ('run_pipeline') AND execution_date='<date>'; | 核对 task_id 大小写;确认execution_date是否一致;检查上游状态是否允许修改 |
重试后run_pipeline执行了两次,数据重复 | 1.run_pipeline本身不具备幂等性2. XCom 传递的 pipeline_run_id被缓存 | 查看run_pipeline的日志,确认是否生成了新 pipeline ID | 在run_pipeline中添加幂等性校验,比如用XCom.get_one()检查是否已存在 pipeline ID |
多个 Sensor 同时失败,run_pipeline被多次重试 | 并发修改未加锁,导致try_number只增1次但状态被多次设为up_for_retry | 查看task_instance表,try_number字段是否异常 | 确保mark_upstream_for_retry中使用with_for_update()行锁 |
| 回调执行慢,拖慢整个 DAG 调度 | 数据库查询未加索引,或upstream_depth过大 | EXPLAIN ANALYZE查看 SQL 执行计划 | 为task_instance(dag_id, execution_date, task_id)创建联合索引 |
5.2 独家避坑技巧
技巧一:用XCom传递上下文,避免硬编码
不要在回调里写死upstream_task_ids=['run_pipeline']。改成从context里动态读取:
def sensor_failure_callback(context): ti = context['task_instance'] # 从 XCom 读取上游 task_id,由 RunOperator 在成功时推送 upstream_task_id = ti.xcom_pull( task_ids=ti.task_id.replace('wait_for_', 'run_'), key='upstream_task_id' ) or ti.task_id.replace('wait_for_', 'run_') mark_upstream_for_retry(ti, upstream_task_ids=[upstream_task_id])然后在run_pipeline任务里加一行:
task_run = AzureDataFactoryRunPipelineOperator( # ... 其他参数 do_xcom_push=True, # 确保推送 XCom ) # 在 operator 的 execute 方法末尾,或用 PythonOperator 包装 def push_upstream_context(**context): context['ti'].xcom_push(key='upstream_task_id', value='run_pipeline') push_task = PythonOperator( task_id='push_upstream_context', python_callable=push_upstream_context, dag=dag, ) task_run >> push_task >> task_sensor这样,即使 DAG 重构,只要命名规范(wait_for_X对应run_X),回调就能自动适配。
技巧二:为 Sensor 添加“软失败”开关
有些场景下,Sensor 失败是预期行为(比如等待的文件还没生成)。这时不该重试整个组。加一个soft_fail参数:
class SmartSensor(AzureDataFactoryPipelineRunStatusSensor): template_fields = AzureDataFactoryPipelineRunStatusSensor.template_fields + ('soft_fail_on_timeout',) def __init__(self, soft_fail_on_timeout: bool = False, **kwargs): super().__init__(**kwargs) self.soft_fail_on_timeout = soft_fail_on_timeout def poke(self, context: Dict): try: return super().poke(context) except Exception as e: if self.soft_fail_on_timeout: # 记录为 skipped,不触发回调 logging.info(f"Soft fail enabled, skipping {self.task_id}") return True # 返回 True 表示成功,跳过后续逻辑 raise # 使用时 task_sensor = SmartSensor( task_id='wait_for_pipeline', soft_fail_on_timeout=True, # 这次失败不重试上游 # ... )技巧三:可视化状态同步链路
在 Airflow UI 的 DAG Graph View 里,很难看出哪个任务被哪个回调影响。我写了个小插件,自动在任务节点上加注释:
# plugins/grouped_failure_plugin.py from airflow.plugins_manager import AirflowPlugin from airflow.www import utils as wwwutils def add_grouped_failure_annotation(task, task_instance): if hasattr(task, 'on_failure_callback') and 'sensor_failure_callback' in str(task.on_failure_callback): upstream_ids = getattr(task, '_grouped_upstream_ids', []) if upstream_ids: return f"↑ Grouped with: {', '.join(upstream_ids)}" return "" wwwutils.task_instance_state_color = add_grouped_failure_annotation重启 Webserver 后,Graph View 里每个 Sensor 节点下方会显示↑ Grouped with: run_pipeline,一目了然。
6. 扩展应用:不止于 Sensor-Run 模式
这套机制的威力远不止解决 Sensor 问题。我把它抽象成一个通用模式,已成功应用于五类场景:
6.1 场景一:Kubernetes Pod 的“启动 + 健康检查”组
# KubernetesPodOperator 启动容器,CustomHealthCheckSensor 检查 readiness task_pod = KubernetesPodOperator( task_id='deploy_service', name='my-service', # ... ) task_health = CustomHealthCheckSensor( task_id='check_service_health', endpoint='http://my-service:8080/health', # 注入回调,失败时重试 deploy_service on_failure_callback=lambda ctx: mark_upstream_for_retry(ctx['task_instance'], ['deploy_service']), )6.2 场景二:Snowflake 作业的“提交 + 监控”组
# SnowflakeOperator 提交 SQL,SnowflakeQueryStatusSensor 监控执行状态 task_submit = SnowflakeOperator( task_id='submit_query', sql='INSERT INTO ...', ) task_monitor = SnowflakeQueryStatusSensor( task_id='monitor_query', query_id="{{ ti.xcom_pull(task_ids='submit_query') }}", on_failure_callback=lambda ctx: mark_upstream_for_retry(ctx['task_instance'], ['submit_query']), )6.3 场景三:跨 DAG 的强依赖协调
有时一个 DAG 的成功,依赖另一个 DAG 的某个任务。传统做法是用ExternalTaskSensor,但它只检查状态,不处理失败联动。我们可以:
# 在 DAG_A 的 sensor 中,不仅检查 DAG_B 的任务,还准备重试 DAG_B def cross_dag_failure_callback(context): ti = context['task_instance'] # 触发 DAG_B 的重跑 trigger_dag( dag_id='dag_b', execution_date=ti.execution_date, conf={'reason': 'upstream_failed_in_dag_a'} ) # 同时标记本 DAG 的上游任务重试 mark_upstream_for_retry(ti, upstream_task_ids=['prepare_data'])6.4 场景四:人工审批环节的“申请 + 审批”组
# EmailOperator 发送审批邮件,CustomApprovalSensor 等待人工回复 task_apply = EmailOperator( task_id='send_approval_request', to='manager@company.com', subject='Approve data export', ) task_approve = CustomApprovalSensor( task_id='wait_for_approval', email_thread_id="{{ ti.xcom_pull(task_ids='send_approval_request') }}", on_failure_callback=lambda ctx: mark_upstream_for_retry(ctx['task_instance'], ['send_approval_request']), )6.5 场景五:资源清理的“分配 + 释放”组
这是最容易被忽视的。比如用EC2StartInstanceOperator启动临时计算节点,用EC2StopInstanceOperator停止。如果 Stop 失败,节点可能一直运行产生费用。我们可以让 Stop 失败时,重试整个“启动-停止”流程:
task_start = EC2StartInstanceOperator( task_id='start_worker', instance_id='i-12345', ) task_stop = EC2StopInstanceOperator( task_id='stop_worker', instance_id='i-12345', # Stop 失败时,重试 Start(确保节点状态一致) on_failure_callback=lambda ctx: mark_upstream_for_retry(ctx['task_instance'], ['start_worker']), )我在实际项目中发现,超过 60% 的 Airflow 生产事故,根源不是调度器故障,而是任务间的状态契约没有被显式表达和强制执行。这篇方案的价值,不在于它多精巧,而在于它把隐含的业务逻辑,变成了可配置、可测试、可审计的代码。当你下次再看到一个“触发 + 监听”的任务对时,别再把它当成两个独立任务了。拿出这个模板,花五分钟配置好回调,你就已经把系统可靠性提升了一个数量级。最后分享个小技巧:把这个mark_upstream_for_retry函数封装成公司内部的AirflowUtils包,所有新 DAG 都强制要求在requirements.txt里声明,用 CI/CD 流水线扫描 DAG 代码,确保每个 Sensor 都配置了对应的回调——这才是真正的生产就绪。
