"""Manually triggered DAG that runs one DataX job: ods_platform_order_detail_di."""
from datetime import datetime, timedelta

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.email import send_email

# DataX installation directory and the job definition this DAG runs.
DATAX_HOME = "/opt/module/datax"
JOB_PATH = "/opt/module/datax/job/test/ods_platform_order_detail_di.json"

# Beijing time zone.
local_tz = pendulum.timezone("Asia/Shanghai")

# Notification recipients for both failure and success mail.
# TODO: replace the placeholder with the real recipient address(es).
ALERT_EMAILS = ['你的收件邮箱@example.com']


def _email_on_success(context):
    """Email ALERT_EMAILS that the task described by *context* succeeded.

    BashOperator has no ``email_on_success`` argument (Airflow only offers
    ``email_on_failure`` / ``email_on_retry``); passing one makes Airflow 2
    reject the task at DAG-parse time. Success mail is therefore sent from an
    ``on_success_callback`` instead.

    Args:
        context: the task context dict Airflow passes to callbacks.
    """
    ti = context['task_instance']
    send_email(
        to=ALERT_EMAILS,
        subject=f"Airflow 任务成功通知: {ti.dag_id}",
        html_content=(
            "<h3>任务运行成功!</h3>"
            f"<p><b>DAG ID:</b> {ti.dag_id}</p>"
            f"<p><b>Task ID:</b> {ti.task_id}</p>"
        ),
    )


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # Start date expressed in Beijing time.
    'start_date': datetime(2024, 1, 1, tzinfo=local_tz),
    # --- mail settings ---
    'email': ALERT_EMAILS,
    'email_on_failure': True,
    'email_on_retry': False,
    # --- end of mail settings ---
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'ods_platform_order_detail_di',
    default_args=default_args,
    schedule_interval=None,  # manual trigger only
    catchup=False,
    tags=['datax', 'ods'],
) as dag:
    # Operators created inside the ``with DAG(...)`` block are attached to the
    # DAG automatically; no trailing reference to the task is required.
    run_datax = BashOperator(
        task_id='run_datax_job',
        bash_command=f'python3.9 {DATAX_HOME}/bin/datax.py {JOB_PATH}',
        env={'PYTHONPATH': DATAX_HOME},
        # Without append_env, ``env`` *replaces* the worker's environment
        # (PATH, JAVA_HOME, ...) instead of extending it; the DataX launcher
        # presumably still needs those. Requires Airflow >= 2.3.
        append_env=True,
        # Also send mail when the job succeeds.
        on_success_callback=_email_on_success,
    )
"""Demo DAG with nested TaskGroups that emails a custom notice from its final node."""
from datetime import datetime, timedelta

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.email import send_email  # required by the success callback

# Beijing time zone: used for start_date and for formatting times in the mail.
local_tz = pendulum.timezone("Asia/Shanghai")

# Single source of truth for notification recipients (failure and success mail).
ALERT_EMAILS = ['707924553@qq.com']


# 1. Success callback
def send_success_email(context):
    """Send an HTML "task succeeded" email to ALERT_EMAILS.

    Args:
        context: the task context dict Airflow passes to callbacks.
    """
    ti = context['task_instance']
    dag_id = ti.dag_id
    task_id = ti.task_id
    # ``execution_date`` is deprecated since Airflow 2.2 in favour of
    # ``logical_date``; fall back to it on versions that lack the new key.
    run_date = context.get('logical_date') or context['execution_date']
    exec_date = run_date.astimezone(local_tz)

    subject = f"Airflow 任务成功通知: {dag_id}"
    # Build the HTML body.
    html_content = (
        "<h3>任务运行成功!</h3>"
        f"<p><b>DAG ID:</b> {dag_id}</p>"
        f"<p><b>Task ID:</b> {task_id}</p>"
        f"<p><b>执行时间:</b> {exec_date.format('YYYY-MM-DD HH:mm:ss')}</p>"
        f"<p><b>主机名:</b> {ti.hostname}</p>"
        "<br><p>请登录 Airflow UI 查看详细日志。</p>"
    )
    send_email(to=ALERT_EMAILS, subject=subject, html_content=html_content)


# 2. Default arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1, tzinfo=local_tz),
    'email': ALERT_EMAILS,
    'email_on_failure': True,  # failures still use Airflow's built-in alert mail
    # No 'email_on_success' key: it is not an operator argument, so Airflow
    # silently ignores it in default_args. Success mail comes from the
    # send_success_email callback bound to end_node below.
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# 3. DAG definition
with DAG(
    'datax_complex_sync_v2',
    default_args=default_args,
    schedule_interval=None,  # manual trigger only
    catchup=False,
) as dag:
    start = BashOperator(task_id='start_node', bash_command='echo "Start"')

    with TaskGroup("sync_group") as sync_group:
        with TaskGroup("doris_ops") as doris_ops:
            t1 = BashOperator(task_id='pre_check', bash_command='echo "Check"')
            t2 = BashOperator(task_id='datax_run', bash_command='echo "Run"')
            t1 >> t2

        with TaskGroup("logging_ops") as logging_ops:
            t3 = BashOperator(task_id='log_start', bash_command='echo "Log Start"')
            t4 = BashOperator(task_id='log_end', bash_command='echo "Log End"')
            t3 >> t4

        doris_ops >> logging_ops

    # The callback is bound to the last node only. Under the default
    # all_success trigger rule, end_node runs only after every upstream task
    # succeeded.
    end = BashOperator(
        task_id='end_node',
        bash_command='echo "All Success"',
        on_success_callback=send_success_email,
    )

    start >> sync_group >> end
