正在加载,请稍候…

dbt 数据转换指南:模型、测试、增量构建与文档

面向分析工程的 dbt 全面指南:项目结构、模型分层、测试策略、增量模型、宏以及现代数据仓库的自动化文档。

dbt Data Transformation Guide: Models, Testing, Incremental Builds, and Document

dbt 数据转换指南:模型、测试、增量构建与文档

dbt(data build tool)通过将软件工程最佳实践——版本控制、测试、文档和模块化设计——引入 SQL 转换,彻底改变了分析工程。本指南涵盖项目组织、模型分层、测试策略、增量模型、宏以及面向现代云仓库的生产级 dbt 项目的自动化文档。

dbt 核心哲学

dbt 专注于 ELT 中的 T。它假设数据已加载到你的仓库(Snowflake、BigQuery、Redshift、DuckDB 等)中,并通过 SQL SELECT 语句处理转换。dbt 将这些语句编译为 DDL/DML 并在仓库中执行。

主要优势:

  • 每个转换都是 Git 中版本化的 SQL 文件
  • 内置测试框架可及早发现数据质量问题
  • 自动生成数据血缘图和文档站点
  • 增量处理可最大程度降低大型表的计算成本
  • Jinja 模板支持 DRY、可重用的 SQL 模式

dbt Data Transformation Guide: Models, Testing, Incremental Builds, and Document illustration

项目结构

一个组织良好的 dbt 项目遵循分层架构:

my_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│   ├── staging/           # 原始数据清洗,与源表 1:1
│   │   ├── _sources.yml
│   │   ├── _staging.yml
│   │   ├── stg_orders.sql
│   │   └── stg_customers.sql
│   ├── intermediate/      # 业务逻辑,多表连接
│   │   └── int_order_items_joined.sql
│   └── marts/             # 面向 BI 工具的最终分析模型
│       ├── core/
│       │   ├── dim_customers.sql
│       │   └── fct_orders.sql
│       └── finance/
│           └── fct_revenue.sql
├── tests/                 # 自定义单次测试(SQL 断言)
├── macros/                # 可重用的 Jinja 宏
├── seeds/                 # 静态 CSV 参考数据
└── snapshots/             # SCD Type 2 变更跟踪

暂存模型

暂存模型清洗并标准化原始源数据。它们将列重命名为一致的约定,转换类型,并应用简单过滤器。每个源表一个暂存模型是黄金法则——此处不包含业务逻辑。

-- models/staging/stg_orders.sql
with source as (
    select * from {{ source('ecommerce', 'raw_orders') }}
),

renamed as (
    select
        id                               as order_id,
        customer_id,
        lower(status)                    as status,
        amount                           as order_amount_usd,
        cast(created_at as timestamp)    as created_at,
        cast(updated_at as timestamp)    as updated_at
    from source
    where id is not null
)

select * from renamed

定义带有新鲜度检查的源:

# models/staging/_sources.yml
sources:
  - name: ecommerce
    database: raw
    schema: public
    freshness:
      warn_after:  {count: 24, period: hour}
      error_after: {count: 48, period: hour}
    tables:
      - name: raw_orders
        loaded_at_field: _loaded_at
      - name: raw_customers
        loaded_at_field: _loaded_at

中间模型

中间模型连接多个暂存模型并编码业务逻辑。它们通常不暴露给最终消费者,位于暂存层和集市层之间。

-- models/intermediate/int_order_items_joined.sql
with orders as (select * from {{ ref('stg_orders') }}),
order_items  as (select * from {{ ref('stg_order_items') }}),
products     as (select * from {{ ref('stg_products') }})

select
    oi.order_item_id,
    oi.order_id,
    o.customer_id,
    o.status            as order_status,
    oi.product_id,
    p.product_name,
    p.category,
    oi.quantity,
    oi.unit_price,
    oi.quantity * oi.unit_price  as line_total,
    o.created_at        as order_created_at
from order_items oi
left join orders   o  using (order_id)
left join products p  using (product_id)

dbt Data Transformation Guide: Models, Testing, Incremental Builds, and Document illustration

集市模型

集市模型是 BI 工具消费的最终分析表。在此层配置物化方式和仓库特定设置。

-- models/marts/core/fct_orders.sql
{{
    config(
        materialized='table',
        cluster_by=['customer_id'],
        partition_by={'field': 'order_date', 'data_type': 'date', 'granularity': 'day'}
    )
}}

with orders as (select * from {{ ref('stg_orders') }}),
customers   as (select * from {{ ref('dim_customers') }}),
items_agg   as (
    select
        order_id,
        count(*)        as item_count,
        sum(line_total) as calculated_total
    from {{ ref('int_order_items_joined') }}
    group by 1
)

select
    o.order_id,
    o.customer_id,
    c.customer_name,
    c.country,
    o.status,
    o.order_amount_usd,
    ia.item_count,
    ia.calculated_total,
    date(o.created_at)  as order_date,
    o.created_at
from orders o
left join customers c  using (customer_id)
left join items_agg ia using (order_id)

测试策略

dbt 提供两种测试类型:通用测试(在 YAML 中配置,可重用)和单次测试(自定义 SQL 断言,必须返回零行才能通过)。

通用测试在 schema YAML 中:

models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'completed', 'cancelled', 'refunded']
      - name: order_amount_usd
        tests:
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000

单次测试用于复杂业务规则:

-- tests/assert_no_future_orders.sql
-- 如果任何订单有未来的创建日期则失败
select order_id
from {{ ref('fct_orders') }}
where order_date > current_date

dbt-expectations 用于统计性数据质量:

- name: order_amount_usd
  tests:
    - dbt_expectations.expect_column_mean_to_be_between:
        min_value: 50
        max_value: 500
    - dbt_expectations.expect_column_quantile_values_to_be_between:
        quantile: 0.99
        min_value: 0
        max_value: 10000

增量模型

增量模型仅处理新增或变更的行,从而显著降低每日增长的大表的计算成本。

-- models/marts/core/fct_events.sql
{{
    config(
        materialized='incremental',
        unique_key='event_id',
        incremental_strategy='merge',
        on_schema_change='sync_all_columns'
    )
}}

select
    event_id,
    user_id,
    event_type,
    properties,
    occurred_at
from {{ source('events', 'raw_events') }}

{% if is_incremental() %}
    where occurred_at > (select max(occurred_at) from {{ this }})
{% endif %}

按仓库划分的增量策略:

  • append:仅插入新行(无去重)
  • merge:通过 unique_key 进行 Upsert——Snowflake、BigQuery、Spark
  • insert_overwrite:覆盖整个分区——在 BigQuery 和 Spark 上高效
  • delete+insert:删除匹配行然后插入——在 Redshift 上有用

dbt Data Transformation Guide: Models, Testing, Incremental Builds, and Document illustration

宏与可重用性

宏允许你使用 Jinja 模板编写 DRY SQL,封装常见模式。

-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
    round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}

-- macros/date_spine.sql
{% macro date_spine(start_date, end_date) %}
    {{ dbt_utils.date_spine(
        datepart="day",
        start_date="cast('" ~ start_date ~ "' as date)",
        end_date="cast('" ~ end_date ~ "' as date)"
    ) }}
{% endmacro %}

在模型中使用:

select
    {{ dbt_utils.generate_surrogate_key(['order_id', 'product_id']) }} as order_item_sk,
    {{ cents_to_dollars('amount_cents') }} as amount_usd
from {{ ref('stg_order_items') }}

自定义通用测试:

-- macros/test_is_positive.sql
{% test is_positive(model, column_name) %}
select {{ column_name }}
from {{ model }}
where {{ column_name }} is not null and {{ column_name }} <= 0
{% endtest %}

用于 SCD Type 2 的快照

快照跟踪维度数据随时间的变化,支持时间点查询。

-- snapshots/customers_snapshot.sql
{% snapshot customers_snapshot %}
{{
    config(
        target_schema='snapshots',
        unique_key='customer_id',
        strategy='timestamp',
        updated_at='updated_at',
        invalidate_hard_deletes=True
    )
}}
select * from {{ source('ecommerce', 'raw_customers') }}
{% endsnapshot %}

运行 dbt snapshot 会添加 dbt_valid_fromdbt_valid_to 列,创建所有变更的完整历史。

文档自动化

dbt 根据 YAML 描述自动生成文档站点。

models:
  - name: fct_orders
    description: >
      每行一个订单。订单分析的主要事实表。
      每日从 raw_orders 源更新。
    columns:
      - name: order_id
        description: 来自电商平台的订单唯一标识符。
      - name: order_amount_usd
        description: 订单总价值(美元),含税和运费。
dbt docs generate   # 编译文档 + 血缘图
dbt docs serve --port 8080  # 启动交互式文档站点

生成的站点包括交互式 DAG 血缘图、列描述、测试覆盖状态和源新鲜度指示器。

用于更快管道的 Slim CI

在 CI 中仅运行修改的模型及其下游依赖:

# 获取生产清单进行比较
dbt run  --select state:modified+ --defer --state ./prod-manifest/
dbt test --select state:modified+

这大大缩短了 CI 时间——仅重新执行更改的模型及其子模型。

结论

dbt 为数据转换带来了工程严谨性:组织在暂存、中间和集市层的模块化 SQL;全面的测试框架;降低计算成本的增量模型;用于可重用逻辑的 Jinja 宏;用于缓慢变化维度的快照;以及保持团队一致的自动生成文档。无论是在 Snowflake、BigQuery 还是 Redshift 上运行,dbt 都是现代、可维护的分析栈的基础。