正在加载,请稍候…

数据管道测试策略:质量检查、Great Expectations、契约测试与CDC测试

全面的数据管道测试方法:转换逻辑单元测试、Spark DataFrame测试、Great Expectations数据剖析、模式契约测试、CDC事件验证以及Te

数据管道测试策略:质量检查、Great Expectations、契约测试与CDC测试

数据管道测试策略:质量检查、Great Expectations 与 CDC 测试

测试数据管道与测试应用程序代码有根本区别。数据可能在语法上正确但语义上错误;分布会悄然偏移;模式变更会在没有警告的情况下破坏下游消费者。本指南涵盖了一个全面的策略:转换逻辑的单元测试、Spark DataFrame 测试、用于自动化数据剖析的 Great Expectations、模式契约测试、CDC 事件验证以及 Testcontainers 集成。

数据测试金字塔

  1. 单元测试:隔离测试单个转换函数
  2. 集成测试:针对真实容器化数据库测试管道组件
  3. 数据质量测试:断言静态或动态数据的实际属性
  4. 契约测试:验证数据生产者满足消费者模式期望
  5. 端到端测试:在代表性样本上运行完整管道

数据管道测试策略:质量检查、Great Expectations、契约测试与CDC测试 插图

转换逻辑的单元测试

# transformations.py
import re

def normalize_phone(phone: str):
    digits = re.sub(r'\D', '', phone or '')
    if len(digits) == 10:                        return f'+1{digits}'
    if len(digits) == 11 and digits[0] == '1':  return f'+{digits}'
    return None

def calculate_ltv(orders: list) -> float:
    return sum(o['amount'] for o in orders if o['status'] == 'completed')
# test_transformations.py
import pytest
from transformations import normalize_phone, calculate_ltv

class TestNormalizePhone:
    def test_10_digit(self):       assert normalize_phone('5551234567')   == '+15551234567'
    def test_11_digit(self):       assert normalize_phone('15551234567')  == '+15551234567'
    def test_formatted(self):      assert normalize_phone('(555) 123-4567') == '+15551234567'
    def test_invalid_none(self):   assert normalize_phone('555') is None
    def test_none_input(self):     assert normalize_phone(None) is None

class TestCalculateLTV:
    def test_only_completed(self):
        orders = [{'status':'completed','amount':100.0},{'status':'cancelled','amount':50.0}]
        assert calculate_ltv(orders) == 100.0
    def test_empty(self):          assert calculate_ltv([]) == 0.0

Spark 转换的单元测试

使用 pytest 配合本地 SparkSession。

# conftest.py
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope='session')
def spark():
    return SparkSession.builder.master('local[2]') \
        .config('spark.sql.shuffle.partitions', '2').getOrCreate()

# test_spark_transforms.py
from pyspark.sql import functions as F
from my_transforms import deduplicate_events, enrich_with_user_tier

def test_deduplicate_events(spark):
    data = [('e1','u1','2026-01-01'), ('e1','u1','2026-01-01'), ('e2','u2','2026-01-02')]
    df = spark.createDataFrame(data, ['event_id','user_id','date'])
    result = deduplicate_events(df)
    assert result.count() == 2
    assert result.filter(F.col('event_id') == 'e1').count() == 1

def test_enrich_with_user_tier(spark):
    events = spark.createDataFrame([('e1','u1',100.0)], ['event_id','user_id','amount'])
    users  = spark.createDataFrame([('u1','gold')],      ['user_id','tier'])
    result = enrich_with_user_tier(events, users)
    assert result.filter(F.col('user_id')=='u1').first()['tier'] == 'gold'

数据管道测试策略:质量检查、Great Expectations、契约测试与CDC测试 插图

Great Expectations 用于数据剖析

Great Expectations (GX) 提供了一个定义和自动化数据质量检查的框架。

import great_expectations as gx

context = gx.get_context()
datasource = context.sources.add_pandas_filesystem(
    name='orders_datasource', base_directory='./data/orders/')
asset = datasource.add_csv_asset(
    name='orders', batching_regex=r'orders_(?P<date>\d{8})\.csv')

suite = context.add_expectation_suite('orders_quality_suite')
validator = context.get_validator(
    batch_request=asset.build_batch_request(), expectation_suite=suite)

validator.expect_column_to_exist('order_id')
validator.expect_column_values_to_not_be_null('order_id')
validator.expect_column_values_to_be_unique('order_id')
validator.expect_column_values_to_be_in_set(
    'status', {'pending', 'completed', 'cancelled', 'refunded'})
validator.expect_column_values_to_be_between('amount', min_value=0, max_value=100000)
validator.expect_column_median_to_be_between('amount', min_value=20, max_value=500)
validator.save_expectation_suite()

checkpoint = context.add_or_update_checkpoint(name='orders_cp', validator=validator)
results = checkpoint.run()
assert results.success, f'数据质量检查失败: {results}'

模式契约测试

模式契约防止生产者的破坏性变更悄然破坏消费者。

import jsonschema

ORDER_SCHEMA = {
    'type': 'object',
    'required': ['order_id', 'customer_id', 'amount', 'status'],
    'properties': {
        'order_id':    {'type': 'string',  'minLength': 1},
        'customer_id': {'type': 'integer', 'minimum': 1},
        'amount':      {'type': 'number',  'exclusiveMinimum': 0},
        'status':      {'type': 'string',
                        'enum': ['pending','completed','cancelled','refunded']},
    },
    'additionalProperties': True  # 允许新增字段
}

def validate_order(order: dict) -> None:
    jsonschema.validate(instance=order, schema=ORDER_SCHEMA)

# 测试:有效订单通过
validate_order({'order_id':'O1','customer_id':1,'amount':99.0,'status':'pending'})

# 测试:缺失字段引发 ValidationError
try:
    validate_order({'order_id':'O2','customer_id':1,'status':'pending'})
    assert False, '应该引发异常'
except jsonschema.ValidationError:
    pass  # 预期行为

数据管道测试策略:质量检查、Great Expectations、契约测试与CDC测试 插图

测试 CDC 管道

def test_debezium_insert_event():
    event = {'op': 'c', 'before': None,
             'after': {'order_id': 'ORD-001', 'amount': 99.99, 'status': 'pending'},
             'ts_ms': 1716854400000}
    assert event['op'] == 'c'
    assert event['before'] is None
    assert event['after']['order_id'] is not None

def test_debezium_update_event():
    event = {'op': 'u',
             'before': {'order_id': 'ORD-001', 'status': 'pending'},
             'after':  {'order_id': 'ORD-001', 'status': 'completed'},
             'ts_ms': 1716854460000}
    assert event['before']['status'] != event['after']['status']
    assert event['after']['status'] == 'completed'

def test_cdc_processor_applies_upsert():
    processor = CDCProcessor(target_table='fct_orders')
    events = [
        {'op': 'c', 'after':  {'order_id': 'O1', 'amount': 100.0}},
        {'op': 'u', 'before': {'order_id': 'O1', 'amount': 100.0},
                    'after':  {'order_id': 'O1', 'amount': 120.0}},
        {'op': 'd', 'before': {'order_id': 'O2', 'amount': 50.0}, 'after': None},
    ]
    result = processor.apply_events(events)
    assert result['O1']['amount'] == 120.0
    assert 'O2' not in result  # 已删除

使用 Testcontainers 进行集成测试

import pytest
from testcontainers.postgres import PostgresContainer

@pytest.fixture(scope='module')
def postgres():
    with PostgresContainer('postgres:15') as pg:
        yield pg

def test_order_loader_integration(postgres):
    from mylib.loaders import OrderLoader
    loader = OrderLoader(dsn=postgres.get_connection_url())
    loader.create_schema()
    loader.load([{'order_id': 'T1', 'amount': 99.0, 'status': 'completed'}])
    rows = loader.query("SELECT * FROM orders WHERE order_id = 'T1'")
    assert len(rows) == 1
    assert rows[0]['amount'] == 99.0

生产环境数据质量监控

def check_daily_order_count(date: str, df) -> None:
    count = df.filter(f"order_date = '{date}'").count()
    baseline_avg, baseline_std = get_historical_stats('daily_order_count', lookback_days=30)
    z_score = abs(count - baseline_avg) / (baseline_std or 1)
    if z_score > 3:
        raise DataQualityError(
            f'订单数 {count} 偏离基线 {z_score:.1f} 个标准差')

# 空值率监控
for col in ['order_id', 'customer_id', 'amount']:
    null_pct = df.filter(f'{col} IS NULL').count() / df.count() * 100
    if null_pct > 0.1:
        alert(f'列 {col} 的空值率为 {null_pct:.2f}% — 阈值 0.1%')

结论

一个健壮的数据管道测试策略包括:转换逻辑的单元测试、使用本地 SparkSession 的 Spark DataFrame 测试、用于自动化数据剖析和漂移检测的 Great Expectations、用于强制执行生产者-消费者协议的 JSON Schema 契约、针对性的 CDC 事件测试,以及基于 Testcontainers 的集成测试。结合生产环境中的持续数据质量监控,这些层次能够在数据问题悄然破坏业务决策之前将其捕获。