正在加载,请稍候…

使用 Apache Airflow 构建数据管道:DAG、操作符与生产实践

学习使用 Apache Airflow 构建和管理数据管道,涵盖 DAG 设计、操作符、依赖关系、错误处理、监控以及使用 Celery 和 Kubernetes

使用 Apache Airflow 构建数据管道:DAG、操作符与生产实践

使用 Apache Airflow 构建数据管道

核心概念

DAG(有向无环图):你的管道定义
Tasks(任务):工作单元(操作符)
Operators(操作符):任务类型的模板
Dependencies(依赖关系):任务执行顺序
Connections(连接):外部系统的存储凭据
Variables(变量):存储在 Airflow 中的配置
XComs(跨任务通信):在任务之间传递数据

使用 Apache Airflow 构建数据管道:DAG、操作符与生产实践 插图

基本 DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.sql import SQLExecuteQueryOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'user_activity_pipeline',
    default_args=default_args,
    description='Process daily user activity',
    schedule='0 2 * * *',     # 每天凌晨 2 点
    start_date=datetime(2024, 1, 1),
    catchup=False,             # 不回溯错过的运行
    tags=['user-activity', 'daily'],
    max_active_runs=1,         # 一次只运行一个实例
) as dag:

    extract = PythonOperator(
        task_id='extract_events',
        python_callable=extract_user_events,
        op_kwargs={'date': '{{ ds }}'},  # Jinja 模板
    )

    transform = PythonOperator(
        task_id='transform_events',
        python_callable=transform_events,
    )

    load = PostgresOperator(
        task_id='load_to_warehouse',
        postgres_conn_id='postgres_warehouse',
        sql='sql/load_user_activity.sql',
        parameters={'date': '{{ ds }}'},
    )

    validate = PythonOperator(
        task_id='validate_results',
        python_callable=validate_row_counts,
    )

    # 定义依赖关系
    extract >> transform >> load >> validate

动态 DAG

from airflow.models import Variable
import json

# 从配置生成 DAG
configs = json.loads(Variable.get('pipeline_configs'))

for config in configs:
    with DAG(f"pipeline_{config['name']}", ...) as dag:
        tasks = []
        for step in config['steps']:
            task = PythonOperator(
                task_id=step['id'],
                python_callable=get_operator(step['type']),
                op_kwargs=step.get('params', {}),
            )
            tasks.append(task)

        # 链式任务
        for i in range(len(tasks) - 1):
            tasks[i] >> tasks[i + 1]

使用 Apache Airflow 构建数据管道:DAG、操作符与生产实践 插图

XComs(跨任务通信)

def extract_data(ti):  # ti = TaskInstance
    data = fetch_from_api()
    # 推送到 XCom
    ti.xcom_push(key='row_count', value=len(data))
    return data  # 返回值也会自动推送

def validate_data(ti):
    row_count = ti.xcom_pull(task_ids='extract_data', key='row_count')
    if row_count < 1000:
        raise ValueError(f"Expected >1000 rows, got {row_count}")

# 在 Jinja 模板中:
# {{ ti.xcom_pull(task_ids='extract_data') }}

错误处理与告警

from airflow.utils.email import send_email

def on_failure_callback(context):
    task = context['task_instance']
    dag = context['dag']
    exception = context['exception']

    send_email(
        to='data-team@company.com',
        subject=f"Airflow FAILURE: {dag.dag_id}.{task.task_id}",
        html_content=f"""
        <h2>Task Failed</h2>
        <p>DAG: {dag.dag_id}</p>
        <p>Task: {task.task_id}</p>
        <p>Run: {context['execution_date']}</p>
        <p>Error: {str(exception)}</p>
        """
    )

default_args = {
    'on_failure_callback': on_failure_callback,
    'on_retry_callback': on_retry_callback,
}

使用 Apache Airflow 构建数据管道:DAG、操作符与生产实践 插图

Kubernetes Executor

# docker-compose.yaml for local dev
services:
  airflow-webserver:
    image: apache/airflow:2.8.0
    command: webserver
    ports:
      - "8080:8080"
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow

  airflow-scheduler:
    image: apache/airflow:2.8.0
    command: scheduler
    depends_on:
      - airflow-webserver

数据质量检查

from great_expectations.dataset import PandasDataset

def validate_data_quality(ti):
    df = ti.xcom_pull(task_ids='transform_data')
    dataset = PandasDataset(df)

    # 期望
    dataset.expect_column_values_to_not_be_null('user_id')
    dataset.expect_column_values_to_be_between('amount', 0, 100000)
    dataset.expect_column_values_to_be_unique('transaction_id')

    results = dataset.validate()
    if not results['success']:
        failed = [r for r in results['results'] if not r['success']]
        raise ValueError(f"Data quality failed: {failed}")

Airflow 是工作流编排的事实标准——从简单开始,根据需要增加复杂性。