
数据管道测试策略：质量检查、Great Expectations 与 CDC 测试
测试数据管道与测试应用程序代码有根本区别。数据可能在语法上正确、语义上却是错误的；数据分布会悄然偏移；模式变更可能在毫无预警的情况下破坏下游消费者。本指南介绍一套全面的策略：转换逻辑的单元测试、Spark DataFrame 测试、用于自动化数据剖析的 Great Expectations、模式契约测试、CDC 事件验证，以及基于 Testcontainers 的集成测试。
数据测试金字塔
- 单元测试：隔离测试单个转换函数
- 集成测试：针对真实的容器化数据库测试管道组件
- 数据质量测试：断言静态存储数据或流动中数据(data at rest / in motion)的实际属性
- 契约测试：验证数据生产者满足消费者的模式期望
- 端到端测试：在具有代表性的样本上运行完整管道

转换逻辑的单元测试
# transformations.py
import re
def normalize_phone(phone: 'str | None') -> 'str | None':
    """Normalize a North American phone number to ``+1XXXXXXXXXX`` form.

    Every non-digit character (spaces, parentheses, dashes, a leading ``+``)
    is ignored. A 10-digit number gets the ``+1`` prefix; an 11-digit number
    that already starts with ``1`` only gets ``+``. Anything else, including
    ``None`` and the empty string, yields ``None``.

    Unicode decimal digits (e.g. full-width ``５`` typed with a CJK IME) are
    accepted and converted to ASCII, so the result is always plain ASCII.
    """
    # `\d` matches any Unicode decimal digit, not just 0-9; int() maps each one
    # to its numeric value so non-ASCII digits never leak into the output.
    digits = ''.join(str(int(d)) for d in re.findall(r'\d', phone or ''))
    if len(digits) == 10:
        return f'+1{digits}'
    if len(digits) == 11 and digits[0] == '1':
        return f'+{digits}'
    return None
def calculate_ltv(orders: list) -> float:
    """Return the lifetime value (LTV): the total ``amount`` of completed orders.

    Orders whose ``status`` is anything other than ``'completed'`` (for
    example cancelled or refunded) contribute nothing. Every order needs a
    ``status`` key, and completed orders an ``amount`` key; otherwise
    ``KeyError`` is raised.

    Always returns a ``float`` (``0.0`` when there are no completed orders).
    The sum uses ``math.fsum``, so long order histories do not accumulate
    floating-point rounding error.
    """
    import math  # local import keeps this snippet self-contained

    return math.fsum(o['amount'] for o in orders if o['status'] == 'completed')
# test_transformations.py
import pytest
from transformations import normalize_phone, calculate_ltv
class TestNormalizePhone:
    """Unit tests for normalize_phone(): US-style numbers become +1XXXXXXXXXX."""

    def test_10_digit(self):
        assert normalize_phone('5551234567') == '+15551234567'

    def test_11_digit(self):
        assert normalize_phone('15551234567') == '+15551234567'

    def test_formatted(self):
        # Punctuation and spaces are stripped before the length check.
        assert normalize_phone('(555) 123-4567') == '+15551234567'

    def test_invalid_none(self):
        assert normalize_phone('555') is None

    def test_none_input(self):
        assert normalize_phone(None) is None

    def test_11_digit_without_leading_1(self):
        # Covers the false branch of the "11 digits starting with 1" rule.
        assert normalize_phone('25551234567') is None

    def test_empty_string(self):
        assert normalize_phone('') is None
class TestCalculateLTV:
    """Unit tests for calculate_ltv(): only completed orders count toward LTV."""

    def test_only_completed(self):
        orders = [{'status': 'completed', 'amount': 100.0},
                  {'status': 'cancelled', 'amount': 50.0}]
        assert calculate_ltv(orders) == 100.0

    def test_empty(self):
        assert calculate_ltv([]) == 0.0

    def test_sums_multiple_completed(self):
        # With a single completed order, "return the first amount" would also
        # pass; two completed orders prove the amounts are actually summed.
        orders = [{'status': 'completed', 'amount': 100.0},
                  {'status': 'completed', 'amount': 25.5},
                  {'status': 'refunded', 'amount': 10.0}]
        assert calculate_ltv(orders) == 125.5
Spark 转换的单元测试
使用 pytest 配合本地 SparkSession。
# conftest.py
import pytest
from pyspark.sql import SparkSession
@pytest.fixture(scope='session')
def spark():
    """Provide one local SparkSession shared by the whole test session.

    Uses a local master (``local[2]``) and only two shuffle partitions, which
    is plenty for the tiny DataFrames built in unit tests. The session is
    stopped at teardown so its resources are released instead of lingering
    until the interpreter exits.
    """
    session = (
        SparkSession.builder.master('local[2]')
        .config('spark.sql.shuffle.partitions', '2')
        .getOrCreate()
    )
    yield session
    session.stop()
# test_spark_transforms.py
from pyspark.sql import functions as F
from my_transforms import deduplicate_events, enrich_with_user_tier
def test_deduplicate_events(spark):
    """deduplicate_events() must collapse identical rows and keep distinct events."""
    columns = ['event_id', 'user_id', 'date']
    rows = [
        ('e1', 'u1', '2026-01-01'),
        ('e1', 'u1', '2026-01-01'),  # exact duplicate of the row above
        ('e2', 'u2', '2026-01-02'),
    ]
    deduped = deduplicate_events(spark.createDataFrame(rows, columns))

    assert deduped.count() == 2
    only_e1 = deduped.where(F.col('event_id') == 'e1')
    assert only_e1.count() == 1
def test_enrich_with_user_tier(spark):
    """enrich_with_user_tier() attaches the user's tier to each event exactly once."""
    events = spark.createDataFrame([('e1', 'u1', 100.0)], ['event_id', 'user_id', 'amount'])
    users = spark.createDataFrame([('u1', 'gold')], ['user_id', 'tier'])
    result = enrich_with_user_tier(events, users)

    rows = result.filter(F.col('user_id') == 'u1').collect()
    # Exactly one row: the join must neither drop the event (which would make
    # .first() return None and fail with a TypeError) nor fan it out.
    assert len(rows) == 1, f'expected 1 enriched row for u1, got {len(rows)}'
    assert rows[0]['tier'] == 'gold'

Great Expectations 用于数据剖析
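Great Expectations (GX) 是一个用于定义并自动执行数据质量检查的框架。以下示例基于 GX 0.x(0.18 及更早版本)的 Fluent API(如 `context.sources`、`add_or_update_checkpoint`);GX 1.x 对这些接口做了较大调整,升级时需相应修改。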
import great_expectations as gx
context = gx.get_context()

# Filesystem datasource over ./data/orders/; files named orders_YYYYMMDD.csv
# are matched, with the 8 digits captured as the batch key `date`.
# add_or_update_* (like the checkpoint below) keeps reruns from failing when
# the datasource/suite already exist in a file-backed context.
datasource = context.sources.add_or_update_pandas_filesystem(
    name='orders_datasource', base_directory='./data/orders/')
asset = datasource.add_csv_asset(
    name='orders', batching_regex=r'orders_(?P<date>\d{8})\.csv')

suite = context.add_or_update_expectation_suite(expectation_suite_name='orders_quality_suite')
validator = context.get_validator(
    batch_request=asset.build_batch_request(), expectation_suite=suite)

# Structural and row-level expectations.
validator.expect_column_to_exist('order_id')
validator.expect_column_values_to_not_be_null('order_id')
validator.expect_column_values_to_be_unique('order_id')
# A list rather than a set: string-set iteration order can differ between
# interpreter runs, which would reorder the saved suite on every save.
validator.expect_column_values_to_be_in_set(
    'status', ['pending', 'completed', 'cancelled', 'refunded'])
validator.expect_column_values_to_be_between('amount', min_value=0, max_value=100000)
# Aggregate expectation: catches distribution drift that row-level checks miss.
validator.expect_column_median_to_be_between('amount', min_value=20, max_value=500)

# By default save_expectation_suite() drops every expectation that failed on
# the sample batch; keep them so a rule today's data violates is not silently
# removed from the suite the checkpoint enforces.
validator.save_expectation_suite(discard_failed_expectations=False)

checkpoint = context.add_or_update_checkpoint(name='orders_cp', validator=validator)
results = checkpoint.run()
assert results.success, f'数据质量检查失败: {results}'
模式契约测试
模式契约防止生产者的破坏性变更悄然破坏消费者。
import jsonschema
# JSON Schema contract for a single order record, checked by validate_order().
# A producer change that drops or retypes one of these fields fails validation.
ORDER_SCHEMA = {
    'type': 'object',
    # Fields that must always be present.
    'required': ['order_id', 'customer_id', 'amount', 'status'],
    'properties': {
        'order_id': {'type': 'string', 'minLength': 1},      # non-empty string
        'customer_id': {'type': 'integer', 'minimum': 1},    # integer >= 1
        # Numeric exclusiveMinimum (draft 6+ semantics): amount must be > 0.
        # NOTE(review): the GX suite in this guide allows amount == 0
        # (min_value=0); confirm which lower bound is intended.
        'amount': {'type': 'number', 'exclusiveMinimum': 0},
        'status': {'type': 'string',
                   'enum': ['pending','completed','cancelled','refunded']},
    },
    'additionalProperties': True  # allow new fields so producers can add columns without breaking consumers
}
def validate_order(order: dict) -> None:
    """Check *order* against ORDER_SCHEMA.

    Returns None when the record conforms; otherwise raises
    ``jsonschema.ValidationError`` describing the violation.
    """
    jsonschema.validate(order, ORDER_SCHEMA)
# Test: a valid order passes validation (no exception is raised).
validate_order({'order_id': 'O1', 'customer_id': 1, 'amount': 99.0, 'status': 'pending'})

# Test: an order missing the required 'amount' field raises ValidationError.
# try/except/else instead of `assert False` inside the try: explicit raises
# still fire under `python -O`, which strips assert statements.
try:
    validate_order({'order_id': 'O2', 'customer_id': 1, 'status': 'pending'})
except jsonschema.ValidationError as exc:
    # Pin the reason: any other schema violation would also raise ValidationError.
    if exc.validator != 'required':
        raise AssertionError(f'校验失败原因不符合预期: {exc.message}') from exc
else:
    raise AssertionError('应该引发异常')

测试 CDC 管道
def test_debezium_insert_event():
    """Shape check for a Debezium-style create event: op 'c', no before image, populated after."""
    # NOTE(review): this asserts on a hand-built fixture; it documents the
    # expected event shape but does not exercise any pipeline code.
    after_image = {'order_id': 'ORD-001', 'amount': 99.99, 'status': 'pending'}
    event = {
        'op': 'c',
        'before': None,
        'after': after_image,
        'ts_ms': 1716854400000,
    }
    op, before, after = event['op'], event['before'], event['after']
    assert op == 'c'
    assert before is None
    assert after['order_id'] is not None
def test_debezium_update_event():
    """Shape check for an update event: before/after images differ in status."""
    before_image = {'order_id': 'ORD-001', 'status': 'pending'}
    after_image = {'order_id': 'ORD-001', 'status': 'completed'}
    event = {'op': 'u', 'before': before_image, 'after': after_image, 'ts_ms': 1716854460000}

    old_status = event['before']['status']
    new_status = event['after']['status']
    assert old_status != new_status
    assert new_status == 'completed'
def test_cdc_processor_applies_upsert():
    """CDCProcessor applies create/update/delete events and yields the final state keyed by order_id.

    O2 is created before it is deleted, so the final assertion proves the
    delete removes an existing row. Deleting a never-seen key would let
    ``'O2' not in result`` pass vacuously.
    """
    processor = CDCProcessor(target_table='fct_orders')
    events = [
        {'op': 'c', 'before': None, 'after': {'order_id': 'O1', 'amount': 100.0}},
        {'op': 'c', 'before': None, 'after': {'order_id': 'O2', 'amount': 50.0}},
        {'op': 'u', 'before': {'order_id': 'O1', 'amount': 100.0},
         'after': {'order_id': 'O1', 'amount': 120.0}},
        {'op': 'd', 'before': {'order_id': 'O2', 'amount': 50.0}, 'after': None},
    ]
    result = processor.apply_events(events)
    assert result['O1']['amount'] == 120.0  # the update overwrote the created row
    assert 'O2' not in result  # created, then deleted
使用 Testcontainers 进行集成测试
import pytest
from testcontainers.postgres import PostgresContainer
@pytest.fixture(scope='module')
def postgres():
    """Yield a Postgres 15 test container shared by all tests in this module.

    The ``with`` block scopes the container's lifetime to this fixture.
    """
    container = PostgresContainer('postgres:15')
    with container as running:
        yield running
def test_order_loader_integration(postgres):
    """Round-trip one order through OrderLoader against a real containerized Postgres."""
    from mylib.loaders import OrderLoader

    # NOTE(review): confirm OrderLoader accepts the URL format returned by
    # get_connection_url() (it may include a driver suffix).
    connection_url = postgres.get_connection_url()
    loader = OrderLoader(dsn=connection_url)
    loader.create_schema()

    order = {'order_id': 'T1', 'amount': 99.0, 'status': 'completed'}
    loader.load([order])

    fetched = loader.query("SELECT * FROM orders WHERE order_id = 'T1'")
    assert len(fetched) == 1
    first_row = fetched[0]
    assert first_row['amount'] == 99.0
生产环境数据质量监控
def check_daily_order_count(date: str, df) -> None:
    """Run production data-quality checks on an orders DataFrame.

    1. Volume: rows with ``order_date == date`` are counted and compared with
       a 30-day historical baseline; a deviation of more than 3 standard
       deviations raises ``DataQualityError``.
    2. Completeness: for each key column, ``alert()`` is called if more than
       0.1% of the rows in *df* are null. An empty *df* skips this check,
       since the null rate is undefined.

    Args:
        date: Day to check, compared against the ``order_date`` column.
        df: Spark DataFrame of orders (assumed from the Spark column API;
            confirm against callers).
    """
    from pyspark.sql import functions as F  # local import: this snippet has no import block

    # A column expression instead of an f-string SQL predicate: a quote or
    # other SQL syntax inside `date` cannot change the meaning of the filter.
    count = df.filter(F.col('order_date') == date).count()
    baseline_avg, baseline_std = get_historical_stats('daily_order_count', lookback_days=30)
    # `or 1` avoids dividing by zero when the historical std is 0 (or None).
    z_score = abs(count - baseline_avg) / (baseline_std or 1)
    if z_score > 3:
        raise DataQualityError(
            f'订单数 {count} 偏离基线 {z_score:.1f} 个标准差')

    # Null-rate monitoring: count the rows once rather than re-scanning df
    # for every column, and guard the empty frame against ZeroDivisionError.
    total = df.count()
    if total == 0:
        return
    for col in ['order_id', 'customer_id', 'amount']:
        null_pct = df.filter(F.col(col).isNull()).count() / total * 100
        if null_pct > 0.1:
            alert(f'列 {col} 的空值率为 {null_pct:.2f}% — 阈值 0.1%')
结论
一个健壮的数据管道测试策略包括：转换逻辑的单元测试、使用本地 SparkSession 的 Spark DataFrame 测试、用于自动化数据剖析和漂移检测的 Great Expectations、用于强制执行生产者-消费者协议的 JSON Schema 契约、针对性的 CDC 事件测试，以及基于 Testcontainers 的集成测试。结合生产环境中的持续数据质量监控，这些层次能够在数据问题悄然破坏业务决策之前将其捕获。