
使用 Apache Airflow 构建数据管道
核心概念
DAG(有向无环图):你的管道定义
Tasks(任务):工作单元(操作符实例化后即成为任务)
Operators(操作符):任务类型的模板
Dependencies(依赖关系):任务执行顺序
Connections(连接):外部系统的存储凭据
Variables(变量):存储在 Airflow 中的配置
XComs(跨任务通信):在任务之间传递数据

基本 DAG
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
# The canonical home of SQLExecuteQueryOperator is the common.sql provider.
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
# PostgresOperator is deprecated in favour of SQLExecuteQueryOperator; the
# import is kept so existing references to the name keep resolving.
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Arguments handed to the DAG as ``default_args``.
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# NOTE(review): extract_user_events, transform_events and validate_row_counts
# are not defined in this snippet; they are assumed to be importable callables.
with DAG(
    'user_activity_pipeline',
    default_args=default_args,
    description='Process daily user activity',
    schedule='0 2 * * *',  # every day at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,  # do not backfill missed runs
    tags=['user-activity', 'daily'],
    max_active_runs=1,  # only one run at a time
) as dag:
    extract = PythonOperator(
        task_id='extract_events',
        python_callable=extract_user_events,
        op_kwargs={'date': '{{ ds }}'},  # Jinja template
    )
    transform = PythonOperator(
        task_id='transform_events',
        python_callable=transform_events,
    )
    # SQLExecuteQueryOperator renders Jinja in ``parameters`` as well as in
    # ``sql``, so the '{{ ds }}' value below is templated before the query runs.
    load = SQLExecuteQueryOperator(
        task_id='load_to_warehouse',
        conn_id='postgres_warehouse',
        sql='sql/load_user_activity.sql',
        parameters={'date': '{{ ds }}'},
    )
    validate = PythonOperator(
        task_id='validate_results',
        python_callable=validate_row_counts,
    )

    # Define the dependencies: each task runs after the one to its left.
    extract >> transform >> load >> validate
动态 DAG
from airflow.models import Variable
import json
# Generate one DAG per entry of the JSON stored in the ``pipeline_configs``
# Airflow Variable.
configs = json.loads(Variable.get('pipeline_configs'))
for config in configs:
    # ``...`` is kept from the original; presumably a placeholder for the
    # remaining DAG arguments.
    with DAG(f"pipeline_{config['name']}", ...) as dag:
        # One PythonOperator per configured step, in the order listed.
        tasks = [
            PythonOperator(
                task_id=step['id'],
                python_callable=get_operator(step['type']),
                op_kwargs=step.get('params', {}),
            )
            for step in config['steps']
        ]
        # Chain the tasks so each one runs after its predecessor.
        for upstream, downstream in zip(tasks, tasks[1:]):
            upstream >> downstream

XComs(跨任务通信)
def extract_data(ti):
    """Fetch data from the API and publish its row count through XCom.

    Args:
        ti: The TaskInstance; used here to call ``xcom_push``.

    Returns:
        The fetched data. A task's return value is pushed to XCom
        automatically as well.
    """
    records = fetch_from_api()
    row_total = len(records)
    # Explicit push under a named key.
    ti.xcom_push(key='row_count', value=row_total)
    return records
def validate_data(ti):
    """Fail the task when the extract step reported too few rows.

    Args:
        ti: The TaskInstance; used to pull the ``row_count`` XCom pushed by
            the ``extract_data`` task.

    Raises:
        ValueError: If no row count is available, or it is below 1000.
    """
    row_count = ti.xcom_pull(task_ids='extract_data', key='row_count')
    # A missing XCom comes back as None; report that explicitly instead of
    # failing with a TypeError on the numeric comparison below.
    if row_count is None:
        raise ValueError("No 'row_count' XCom found for task 'extract_data'")
    if row_count < 1000:
        raise ValueError(f"Expected >1000 rows, got {row_count}")


# In a Jinja template:
# {{ ti.xcom_pull(task_ids='extract_data') }}
错误处理与告警
from airflow.utils.email import send_email
def on_failure_callback(context):
    """Email the data team when a task fails.

    Args:
        context: The callback context mapping. ``task_instance`` and ``dag``
            are required; ``exception`` and the run date are read
            defensively so the alert itself cannot fail on a missing key.
    """
    from html import escape

    ti = context['task_instance']
    dag = context['dag']
    # The exception may be absent from the context; fall back to None rather
    # than raising KeyError inside the alerting path.
    exception = context.get('exception')
    # Prefer ``logical_date``; ``execution_date`` is the deprecated name and
    # is only consulted when the newer key is unavailable.
    run_date = context.get('logical_date')
    if run_date is None:
        run_date = context.get('execution_date')

    send_email(
        to='data-team@company.com',
        subject=f"Airflow FAILURE: {dag.dag_id}.{ti.task_id}",
        html_content=(
            "\n<h2>Task Failed</h2>\n"
            f"<p>DAG: {dag.dag_id}</p>\n"
            f"<p>Task: {ti.task_id}</p>\n"
            f"<p>Run: {run_date}</p>\n"
            # Exception text can contain '<', '>' or '&'; escape it so the
            # message is shown verbatim instead of being parsed as markup.
            f"<p>Error: {escape(str(exception))}</p>\n"
        ),
    )
# Callback hooks collected in a dict named default_args (presumably passed to
# a DAG as ``default_args``).
# NOTE(review): on_retry_callback is not defined in the visible snippet; it
# must be defined before this dict is built or this raises NameError.
default_args = {
'on_failure_callback': on_failure_callback,
'on_retry_callback': on_retry_callback,
}

Kubernetes Executor(注:下方示例实际是本地开发用的 docker-compose 配置,使用的是 LocalExecutor)
# docker-compose.yaml for local dev

# Settings shared by every Airflow container. The executor runs inside the
# scheduler process, so the scheduler needs the same executor and metadata
# database settings as the webserver.
# NOTE(review): the connection string targets a host named "postgres"; no such
# service is defined in this snippet and it must be provided separately.
x-airflow-env: &airflow-env
  AIRFLOW__CORE__EXECUTOR: LocalExecutor
  AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow

services:
  airflow-webserver:
    image: apache/airflow:2.8.0
    command: webserver
    ports:
      - "8080:8080"
    environment: *airflow-env
  airflow-scheduler:
    image: apache/airflow:2.8.0
    command: scheduler
    depends_on:
      - airflow-webserver
    environment: *airflow-env
数据质量检查
from great_expectations.dataset import PandasDataset
def validate_data_quality(ti):
    """Run Great Expectations checks on the output of ``transform_data``.

    Args:
        ti: The TaskInstance; used to pull the ``transform_data`` task's
            XCom value, which is wrapped in a ``PandasDataset``.

    Raises:
        ValueError: If validation does not succeed; the message lists the
            expectation results that failed.
    """
    frame = ti.xcom_pull(task_ids='transform_data')
    checked = PandasDataset(frame)

    # Expectations
    checked.expect_column_values_to_not_be_null('user_id')
    checked.expect_column_values_to_be_between('amount', 0, 100000)
    checked.expect_column_values_to_be_unique('transaction_id')

    outcome = checked.validate()
    if outcome['success']:
        return

    # Keep only the expectation results that did not pass.
    failed = [item for item in outcome['results'] if not item['success']]
    raise ValueError(f"Data quality failed: {failed}")
Airflow 是工作流编排的事实标准——从简单开始,根据需要增加复杂性。