
Apache Airflow 生产环境指南:DAG 设计、动态任务、XCom 与 Celery Executor
Apache Airflow 是数据管道最广泛采用的工作流编排平台。其 Python 原生 DAG 编写、丰富的算子生态系统和强大的调度器使其适用于从夜间 ETL 作业到复杂 ML 训练管道的各种场景。本指南涵盖 DAG 设计原则、动态任务映射、XCom 通信、Celery 执行器以及生产运维最佳实践。
Airflow 架构
Airflow 由五个主要组件组成:
- Scheduler:解析 DAG,调度任务实例,并将其提交给执行器
- Executor:决定任务如何运行(LocalExecutor、CeleryExecutor、KubernetesExecutor)
- Workers:执行任务实例(在 Celery/Kubernetes 模式下)
- Metadata Database:存储 DAG 定义、任务状态、XCom 值和日志(推荐 PostgreSQL)
- Web Server:提供 Airflow UI 用于监控和管理
TaskFlow API(Airflow 2.x)
TaskFlow API 使用 Python 装饰器实现清晰、可读的 DAG 编写:
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from datetime import timedelta
default_args = {
'owner': 'data-engineering',
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'max_retry_delay': timedelta(minutes=60),
'email_on_failure': True,
'email': ['data-alerts@company.com'],
}
@dag(
dag_id='ecommerce_daily_etl',
default_args=default_args,
schedule_interval='0 2 * * *',
start_date=days_ago(1),
catchup=False,
max_active_runs=1,
tags=['etl', 'ecommerce'],
)
def ecommerce_daily_etl():
@task()
def extract_orders(ds: str) -> list:
from mylib.db import query_postgres
return query_postgres(
"SELECT * FROM orders WHERE date(created_at) = %s", params=[ds])
@task()
def transform_orders(raw_orders: list) -> list:
return [
{'order_id': r['id'],
'amount_usd': r['amount'] / 100.0,
'status': r['status'].lower()}
for r in raw_orders
]
@task()
def load_orders(transformed: list) -> int:
from mylib.warehouse import bulk_upsert
return bulk_upsert('fct_orders', transformed, key='order_id')
raw = extract_orders()
transformed = transform_orders(raw)
load_orders(transformed)
dag_instance = ecommerce_daily_etl()
DAG 设计原则
幂等性:同一 execution_date 的每次 DAG 运行必须产生相同结果。使用 ds(逻辑日期)限定查询范围,切勿使用 NOW()。
原子性:每个任务应只做一件事。避免既提取又加载的巨型任务——它们难以重试和调试。
避免顶层 I/O:模块级别的重导入或数据库连接会在每次调度器解析周期中执行,降低性能。
# 错误——每次解析周期都打开连接
import psycopg2
conn = psycopg2.connect(...)
# 正确——连接在任务函数内部
@task()
def my_task():
import psycopg2
conn = psycopg2.connect(...)
避免动态调度时逻辑:不要根据外部状态计算 schedule_interval。调度器需要稳定、可序列化的 DAG 结构。
动态任务映射(Airflow 2.3+)
动态任务映射根据上游数据在运行时创建任务——对于处理可变数量的文件、分区或 API 页面至关重要。
@dag(schedule_interval='@daily', start_date=days_ago(1), catchup=False)
def parallel_file_processing():
@task()
def list_files(ds: str) -> list[str]:
import boto3
s3 = boto3.client('s3')
resp = s3.list_objects_v2(Bucket='data-lake', Prefix=f'raw/events/{ds}/')
return [obj['Key'] for obj in resp.get('Contents', [])]
@task()
def process_file(s3_key: str) -> dict:
rows = do_processing(s3_key)
return {'key': s3_key, 'rows': rows}
@task()
def consolidate(results: list[dict]) -> None:
total = sum(r['rows'] for r in results)
print(f"Processed {total} total rows across {len(results)} files")
files = list_files()
processed = process_file.expand(s3_key=files) # 创建 N 个并行任务
consolidate(processed)
dag_instance = parallel_file_processing()
XCom 任务间通信
XCom(Cross-Communication)允许任务通过元数据库共享小值。
@task()
def get_record_count(ds: str) -> int:
return query_count(ds) # 返回整数
@task()
def validate_count(count: int) -> None:
if count < 1000:
raise ValueError(f"Expected >= 1000 records, got {count}")
print(f"Validation passed: {count} records")
# TaskFlow 自动在装饰任务间传递 XCom 值
count = get_record_count()
validate_count(count)
使用传统算子的 XCom push/pull:
from airflow.operators.python import PythonOperator
def push_value(**context):
context['ti'].xcom_push(key='record_count', value=42000)
def pull_value(**context):
count = context['ti'].xcom_pull(task_ids='push_task', key='record_count')
print(f"Got {count} records")
重要:XCom 存储在元数据库中。保持值较小(< 1 MB)。对于大数据集,写入对象存储并通过 XCom 传递 S3 路径。
Celery Executor 设置
CeleryExecutor 是分布式、多 worker Airflow 部署的标准选择。
docker-compose 片段:
services:
airflow-scheduler:
image: apache/airflow:2.9.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
depends_on: [postgres, redis]
airflow-worker:
image: apache/airflow:2.9.0
command: celery worker
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
deploy:
replicas: 4 # 水平扩展
用于任务路由的 Worker 队列:
@task(queue='gpu-workers')
def train_model(config: dict) -> str:
# 此任务仅在 'gpu-workers' 队列的 worker 上运行
...
@task(queue='default')
def lightweight_transform(data: list) -> list:
...
启动监听特定队列的 worker:
airflow celery worker --queues gpu-workers
Sensors 和外部触发器
FileSensor——等待文件出现后再继续:
from airflow.sensors.filesystem import FileSensor
wait_for_file = FileSensor(
task_id='wait_for_daily_dump',
filepath='/mnt/data/dumps/{{ ds }}/orders.csv',
poke_interval=300, # 每 5 分钟检查一次
timeout=7200, # 2 小时后超时失败
mode='reschedule', # 等待时释放 worker 槽位
)
ExternalTaskSensor——等待另一个 DAG 的任务完成:
from airflow.sensors.external_task import ExternalTaskSensor
wait_for_upstream = ExternalTaskSensor(
task_id='wait_for_upstream_etl',
external_dag_id='upstream_etl',
external_task_id='load_complete',
execution_delta=timedelta(hours=1),
mode='reschedule',
timeout=3600,
)
告警和 SLA 监控
任务级回调:
def on_failure_callback(context):
from mylib.slack import send_alert
task = context['task_instance']
send_alert(f"Task {task.task_id} in {task.dag_id} failed on {task.execution_date}")
@task(on_failure_callback=on_failure_callback)
def critical_load() -> None:
...
SLA 错过:
@dag(
sla_miss_callback=lambda dag, task_list, blocking_task_list, slas, blocking_tis: notify_sla_breach(slas),
default_args={'sla': timedelta(hours=3)},
)
def my_dag():
...
性能调优
高 DAG 数量环境下的调度器调优:
# airflow.cfg
[scheduler]
min_file_process_interval = 30 # 每 30 秒解析每个 DAG 文件
dag_dir_list_interval = 60 # 每 60 秒扫描 DAG 文件夹
max_dagruns_to_create_per_loop = 10
max_callbacks_per_loop = 20
[core]
parallelism = 64 # 所有 DAG 的最大并发任务数
max_active_tasks_per_dag = 16
数据库优化:在 dag_run、task_instance 和 xcom 表上添加索引。定期归档旧 DAG 运行:
from airflow.utils.db_cleanup import run_cleanup
run_cleanup(
clean_before_timestamp=pendulum.now().subtract(days=90),
table_names=['dag_run', 'task_instance', 'xcom', 'log'],
dry_run=False,
)
生产检查清单
在将 DAG 推送到生产环境之前:
catchup=False,除非有意回填- 有状态管道设置
max_active_runs=1 - 所有任务都是幂等的(对同一日期重新运行安全)
- DAG 文件中没有顶层 I/O 或重导入
- 配置了重试逻辑和指数退避
- 设置了失败回调和 SLA 监控
- 大数据通过 S3 路径传递,而非直接通过 XCom
- Sensors 使用
mode='reschedule'以释放 worker 槽位
结论
Apache Airflow 生产部署需要严谨的 DAG 设计、幂等任务、适当使用 XCom 传递小值、针对可变工作负载的动态任务映射、适当规模的 Celery executor 集群,以及通过回调和 SLA 错过告警实现的全面监控。这些实践可构建可靠、可观测的管道,并能在规模下安全运行。