正在加载,请稍候…

Apache Airflow 生产环境指南:DAG 设计、动态任务、XCom 与 Celery Executor

Apache Airflow 生产环境实战指南:编写可靠 DAG、动态任务映射、XCom 任务间通信、Celery 执行器配置、告警与性能调优。

Apache Airflow in Production: DAG Design, Dynamic Tasks, XCom, and Celery Execut

Apache Airflow 生产环境指南:DAG 设计、动态任务、XCom 与 Celery Executor

Apache Airflow 是数据管道最广泛采用的工作流编排平台。其 Python 原生 DAG 编写、丰富的算子生态系统和强大的调度器使其适用于从夜间 ETL 作业到复杂 ML 训练管道的各种场景。本指南涵盖 DAG 设计原则、动态任务映射、XCom 通信、Celery 执行器以及生产运维最佳实践。

Airflow 架构

Airflow 由五个主要组件组成:

  • Scheduler:解析 DAG,调度任务实例,并将其提交给执行器
  • Executor:决定任务如何运行(LocalExecutor、CeleryExecutor、KubernetesExecutor)
  • Workers:执行任务实例(在 Celery/Kubernetes 模式下)
  • Metadata Database:存储 DAG 定义、任务状态、XCom 值和日志(推荐 PostgreSQL)
  • Web Server:提供 Airflow UI 用于监控和管理

Apache Airflow in Production: DAG Design, Dynamic Tasks, XCom, and Celery Execut illustration

TaskFlow API(Airflow 2.x)

TaskFlow API 使用 Python 装饰器实现清晰、可读的 DAG 编写:

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    'owner': 'data-engineering',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=60),
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
}

@dag(
    dag_id='ecommerce_daily_etl',
    default_args=default_args,
    schedule_interval='0 2 * * *',
    start_date=days_ago(1),
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'ecommerce'],
)
def ecommerce_daily_etl():

    @task()
    def extract_orders(ds: str) -> list:
        from mylib.db import query_postgres
        return query_postgres(
            "SELECT * FROM orders WHERE date(created_at) = %s", params=[ds])

    @task()
    def transform_orders(raw_orders: list) -> list:
        return [
            {'order_id': r['id'],
             'amount_usd': r['amount'] / 100.0,
             'status': r['status'].lower()}
            for r in raw_orders
        ]

    @task()
    def load_orders(transformed: list) -> int:
        from mylib.warehouse import bulk_upsert
        return bulk_upsert('fct_orders', transformed, key='order_id')

    raw = extract_orders()
    transformed = transform_orders(raw)
    load_orders(transformed)

dag_instance = ecommerce_daily_etl()

DAG 设计原则

幂等性:同一 execution_date 的每次 DAG 运行必须产生相同结果。使用 ds(逻辑日期)限定查询范围,切勿使用 NOW()

原子性:每个任务应只做一件事。避免既提取又加载的巨型任务——它们难以重试和调试。

避免顶层 I/O:模块级别的重导入或数据库连接会在每次调度器解析周期中执行,降低性能。

# 错误——每次解析周期都打开连接
import psycopg2
conn = psycopg2.connect(...)

# 正确——连接在任务函数内部
@task()
def my_task():
    import psycopg2
    conn = psycopg2.connect(...)

避免动态调度时逻辑:不要根据外部状态计算 schedule_interval。调度器需要稳定、可序列化的 DAG 结构。

动态任务映射(Airflow 2.3+)

动态任务映射根据上游数据在运行时创建任务——对于处理可变数量的文件、分区或 API 页面至关重要。

@dag(schedule_interval='@daily', start_date=days_ago(1), catchup=False)
def parallel_file_processing():

    @task()
    def list_files(ds: str) -> list[str]:
        import boto3
        s3 = boto3.client('s3')
        resp = s3.list_objects_v2(Bucket='data-lake', Prefix=f'raw/events/{ds}/')
        return [obj['Key'] for obj in resp.get('Contents', [])]

    @task()
    def process_file(s3_key: str) -> dict:
        rows = do_processing(s3_key)
        return {'key': s3_key, 'rows': rows}

    @task()
    def consolidate(results: list[dict]) -> None:
        total = sum(r['rows'] for r in results)
        print(f"Processed {total} total rows across {len(results)} files")

    files = list_files()
    processed = process_file.expand(s3_key=files)  # 创建 N 个并行任务
    consolidate(processed)

dag_instance = parallel_file_processing()

Apache Airflow in Production: DAG Design, Dynamic Tasks, XCom, and Celery Execut illustration

XCom 任务间通信

XCom(Cross-Communication)允许任务通过元数据库共享小值。

@task()
def get_record_count(ds: str) -> int:
    return query_count(ds)  # 返回整数

@task()
def validate_count(count: int) -> None:
    if count < 1000:
        raise ValueError(f"Expected >= 1000 records, got {count}")
    print(f"Validation passed: {count} records")

# TaskFlow 自动在装饰任务间传递 XCom 值
count = get_record_count()
validate_count(count)

使用传统算子的 XCom push/pull

from airflow.operators.python import PythonOperator

def push_value(**context):
    context['ti'].xcom_push(key='record_count', value=42000)

def pull_value(**context):
    count = context['ti'].xcom_pull(task_ids='push_task', key='record_count')
    print(f"Got {count} records")

重要:XCom 存储在元数据库中。保持值较小(< 1 MB)。对于大数据集,写入对象存储并通过 XCom 传递 S3 路径。

Celery Executor 设置

CeleryExecutor 是分布式、多 worker Airflow 部署的标准选择。

docker-compose 片段

services:
  airflow-scheduler:
    image: apache/airflow:2.9.0
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    depends_on: [postgres, redis]

  airflow-worker:
    image: apache/airflow:2.9.0
    command: celery worker
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
    deploy:
      replicas: 4  # 水平扩展

用于任务路由的 Worker 队列

@task(queue='gpu-workers')
def train_model(config: dict) -> str:
    # 此任务仅在 'gpu-workers' 队列的 worker 上运行
    ...

@task(queue='default')
def lightweight_transform(data: list) -> list:
    ...

启动监听特定队列的 worker:

airflow celery worker --queues gpu-workers

Sensors 和外部触发器

FileSensor——等待文件出现后再继续:

from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_daily_dump',
    filepath='/mnt/data/dumps/{{ ds }}/orders.csv',
    poke_interval=300,   # 每 5 分钟检查一次
    timeout=7200,        # 2 小时后超时失败
    mode='reschedule',   # 等待时释放 worker 槽位
)

ExternalTaskSensor——等待另一个 DAG 的任务完成:

from airflow.sensors.external_task import ExternalTaskSensor

wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_upstream_etl',
    external_dag_id='upstream_etl',
    external_task_id='load_complete',
    execution_delta=timedelta(hours=1),
    mode='reschedule',
    timeout=3600,
)

Apache Airflow in Production: DAG Design, Dynamic Tasks, XCom, and Celery Execut illustration

告警和 SLA 监控

任务级回调

def on_failure_callback(context):
    from mylib.slack import send_alert
    task = context['task_instance']
    send_alert(f"Task {task.task_id} in {task.dag_id} failed on {task.execution_date}")

@task(on_failure_callback=on_failure_callback)
def critical_load() -> None:
    ...

SLA 错过

@dag(
    sla_miss_callback=lambda dag, task_list, blocking_task_list, slas, blocking_tis: notify_sla_breach(slas),
    default_args={'sla': timedelta(hours=3)},
)
def my_dag():
    ...

性能调优

高 DAG 数量环境下的调度器调优

# airflow.cfg
[scheduler]
min_file_process_interval = 30       # 每 30 秒解析每个 DAG 文件
dag_dir_list_interval = 60           # 每 60 秒扫描 DAG 文件夹
max_dagruns_to_create_per_loop = 10
max_callbacks_per_loop = 20

[core]
parallelism = 64                     # 所有 DAG 的最大并发任务数
max_active_tasks_per_dag = 16

数据库优化:在 dag_runtask_instancexcom 表上添加索引。定期归档旧 DAG 运行:

from airflow.utils.db_cleanup import run_cleanup

run_cleanup(
    clean_before_timestamp=pendulum.now().subtract(days=90),
    table_names=['dag_run', 'task_instance', 'xcom', 'log'],
    dry_run=False,
)

生产检查清单

在将 DAG 推送到生产环境之前:

  1. catchup=False,除非有意回填
  2. 有状态管道设置 max_active_runs=1
  3. 所有任务都是幂等的(对同一日期重新运行安全)
  4. DAG 文件中没有顶层 I/O 或重导入
  5. 配置了重试逻辑和指数退避
  6. 设置了失败回调和 SLA 监控
  7. 大数据通过 S3 路径传递,而非直接通过 XCom
  8. Sensors 使用 mode='reschedule' 以释放 worker 槽位

结论

Apache Airflow 生产部署需要严谨的 DAG 设计、幂等任务、适当使用 XCom 传递小值、针对可变工作负载的动态任务映射、适当规模的 Celery executor 集群,以及通过回调和 SLA 错过告警实现的全面监控。这些实践可构建可靠、可观测的管道,并能在规模下安全运行。