
dbt 数据转换指南:模型、测试、增量构建与文档
dbt(data build tool)通过将软件工程最佳实践——版本控制、测试、文档和模块化设计——引入 SQL 转换,彻底改变了分析工程。本指南涵盖项目组织、模型分层、测试策略、增量模型、宏以及面向现代云仓库的生产级 dbt 项目的自动化文档。
dbt 核心哲学
dbt 专注于 ELT 中的 T。它假设数据已加载到你的仓库(Snowflake、BigQuery、Redshift、DuckDB 等)中,并通过 SQL SELECT 语句处理转换。dbt 将这些语句编译为 DDL/DML 并在仓库中执行。
主要优势:
- 每个转换都是 Git 中版本化的 SQL 文件
- 内置测试框架可及早发现数据质量问题
- 自动生成数据血缘图和文档站点
- 增量处理可最大程度降低大型表的计算成本
- Jinja 模板支持 DRY、可重用的 SQL 模式

项目结构
一个组织良好的 dbt 项目遵循分层架构:
my_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│ ├── staging/ # 原始数据清洗,与源表 1:1
│ │ ├── _sources.yml
│ │ ├── _staging.yml
│ │ ├── stg_orders.sql
│ │ └── stg_customers.sql
│ ├── intermediate/ # 业务逻辑,多表连接
│ │ └── int_order_items_joined.sql
│ └── marts/ # 面向 BI 工具的最终分析模型
│ ├── core/
│ │ ├── dim_customers.sql
│ │ └── fct_orders.sql
│ └── finance/
│ └── fct_revenue.sql
├── tests/ # 自定义单次测试(SQL 断言)
├── macros/ # 可重用的 Jinja 宏
├── seeds/ # 静态 CSV 参考数据
└── snapshots/ # SCD Type 2 变更跟踪
暂存模型
暂存模型清洗并标准化原始源数据。它们将列重命名为一致的约定,转换类型,并应用简单过滤器。每个源表一个暂存模型是黄金法则——此处不包含业务逻辑。
-- models/staging/stg_orders.sql
with source as (
select * from {{ source('ecommerce', 'raw_orders') }}
),
renamed as (
select
id as order_id,
customer_id,
lower(status) as status,
amount as order_amount_usd,
cast(created_at as timestamp) as created_at,
cast(updated_at as timestamp) as updated_at
from source
where id is not null
)
select * from renamed
定义带有新鲜度检查的源:
# models/staging/_sources.yml
sources:
- name: ecommerce
database: raw
schema: public
freshness:
warn_after: {count: 24, period: hour}
error_after: {count: 48, period: hour}
tables:
- name: raw_orders
loaded_at_field: _loaded_at
- name: raw_customers
loaded_at_field: _loaded_at
中间模型
中间模型连接多个暂存模型并编码业务逻辑。它们通常不暴露给最终消费者,位于暂存层和集市层之间。
-- models/intermediate/int_order_items_joined.sql
with orders as (select * from {{ ref('stg_orders') }}),
order_items as (select * from {{ ref('stg_order_items') }}),
products as (select * from {{ ref('stg_products') }})
select
oi.order_item_id,
oi.order_id,
o.customer_id,
o.status as order_status,
oi.product_id,
p.product_name,
p.category,
oi.quantity,
oi.unit_price,
oi.quantity * oi.unit_price as line_total,
o.created_at as order_created_at
from order_items oi
left join orders o using (order_id)
left join products p using (product_id)

集市模型
集市模型是 BI 工具消费的最终分析表。在此层配置物化方式和仓库特定设置。
-- models/marts/core/fct_orders.sql
{{
config(
materialized='table',
cluster_by=['customer_id'],
partition_by={'field': 'order_date', 'data_type': 'date', 'granularity': 'day'}
)
}}
with orders as (select * from {{ ref('stg_orders') }}),
customers as (select * from {{ ref('dim_customers') }}),
items_agg as (
select
order_id,
count(*) as item_count,
sum(line_total) as calculated_total
from {{ ref('int_order_items_joined') }}
group by 1
)
select
o.order_id,
o.customer_id,
c.customer_name,
c.country,
o.status,
o.order_amount_usd,
ia.item_count,
ia.calculated_total,
date(o.created_at) as order_date,
o.created_at
from orders o
left join customers c using (customer_id)
left join items_agg ia using (order_id)
测试策略
dbt 提供两种测试类型:通用测试(在 YAML 中配置,可重用)和单次测试(自定义 SQL 断言,必须返回零行才能通过)。
通用测试在 schema YAML 中:
models:
- name: fct_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_id
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
- name: status
tests:
- accepted_values:
values: ['pending', 'completed', 'cancelled', 'refunded']
- name: order_amount_usd
tests:
- dbt_utils.accepted_range:
min_value: 0
max_value: 1000000
单次测试用于复杂业务规则:
-- tests/assert_no_future_orders.sql
-- 如果任何订单有未来的创建日期则失败
select order_id
from {{ ref('fct_orders') }}
where order_date > current_date
dbt-expectations 用于统计性数据质量:
- name: order_amount_usd
tests:
- dbt_expectations.expect_column_mean_to_be_between:
min_value: 50
max_value: 500
- dbt_expectations.expect_column_quantile_values_to_be_between:
quantile: 0.99
min_value: 0
max_value: 10000
增量模型
增量模型仅处理新增或变更的行,从而显著降低每日增长的大表的计算成本。
-- models/marts/core/fct_events.sql
{{
config(
materialized='incremental',
unique_key='event_id',
incremental_strategy='merge',
on_schema_change='sync_all_columns'
)
}}
select
event_id,
user_id,
event_type,
properties,
occurred_at
from {{ source('events', 'raw_events') }}
{% if is_incremental() %}
where occurred_at > (select max(occurred_at) from {{ this }})
{% endif %}
按仓库划分的增量策略:
append:仅插入新行(无去重)merge:通过unique_key进行 Upsert——Snowflake、BigQuery、Sparkinsert_overwrite:覆盖整个分区——在 BigQuery 和 Spark 上高效delete+insert:删除匹配行然后插入——在 Redshift 上有用

宏与可重用性
宏允许你使用 Jinja 模板编写 DRY SQL,封装常见模式。
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
-- macros/date_spine.sql
{% macro date_spine(start_date, end_date) %}
{{ dbt_utils.date_spine(
datepart="day",
start_date="cast('" ~ start_date ~ "' as date)",
end_date="cast('" ~ end_date ~ "' as date)"
) }}
{% endmacro %}
在模型中使用:
select
{{ dbt_utils.generate_surrogate_key(['order_id', 'product_id']) }} as order_item_sk,
{{ cents_to_dollars('amount_cents') }} as amount_usd
from {{ ref('stg_order_items') }}
自定义通用测试:
-- macros/test_is_positive.sql
{% test is_positive(model, column_name) %}
select {{ column_name }}
from {{ model }}
where {{ column_name }} is not null and {{ column_name }} <= 0
{% endtest %}
用于 SCD Type 2 的快照
快照跟踪维度数据随时间的变化,支持时间点查询。
-- snapshots/customers_snapshot.sql
{% snapshot customers_snapshot %}
{{
config(
target_schema='snapshots',
unique_key='customer_id',
strategy='timestamp',
updated_at='updated_at',
invalidate_hard_deletes=True
)
}}
select * from {{ source('ecommerce', 'raw_customers') }}
{% endsnapshot %}
运行 dbt snapshot 会添加 dbt_valid_from 和 dbt_valid_to 列,创建所有变更的完整历史。
文档自动化
dbt 根据 YAML 描述自动生成文档站点。
models:
- name: fct_orders
description: >
每行一个订单。订单分析的主要事实表。
每日从 raw_orders 源更新。
columns:
- name: order_id
description: 来自电商平台的订单唯一标识符。
- name: order_amount_usd
description: 订单总价值(美元),含税和运费。
dbt docs generate # 编译文档 + 血缘图
dbt docs serve --port 8080 # 启动交互式文档站点
生成的站点包括交互式 DAG 血缘图、列描述、测试覆盖状态和源新鲜度指示器。
用于更快管道的 Slim CI
在 CI 中仅运行修改的模型及其下游依赖:
# 获取生产清单进行比较
dbt run --select state:modified+ --defer --state ./prod-manifest/
dbt test --select state:modified+
这大大缩短了 CI 时间——仅重新执行更改的模型及其子模型。
结论
dbt 为数据转换带来了工程严谨性:组织在暂存、中间和集市层的模块化 SQL;全面的测试框架;降低计算成本的增量模型;用于可重用逻辑的 Jinja 宏;用于缓慢变化维度的快照;以及保持团队一致的自动生成文档。无论是在 Snowflake、BigQuery 还是 Redshift 上运行,dbt 都是现代、可维护的分析栈的基础。