
ClickHouse Analytics in Practice: Fast Aggregation over Billions of Rows

Real-time analytics with ClickHouse: table engines, the MergeTree family, materialized views, window functions, Kafka integration, and querying from Node.js.


Why ClickHouse?

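ClickHouse is a column-oriented OLAP database built for analytical workloads rather than transactions. On suitable hardware, a query can scan on the order of billions of rows per second.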


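Core architecture

ClickHouse stores data by column rather than by row:

  • Only the columns a query needs are read
  • Very high compression ratios
  • SIMD vectorized processing
  • A good fit for queries that SELECT many rows but few columns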
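To make the column-versus-row point concrete, here is a minimal sketch in plain JavaScript. It is not how ClickHouse is implemented; the sample rows and the helper names `toColumns` and `sumColumn` are made up for illustration. It only shows that, once data is laid out per column, an aggregate over one column never touches the others.

```javascript
// Row-oriented layout: one object per row, so a scan touches every field.
const rows = [
  { user_id: 1, event_type: 'page_view', revenue: 0 },
  { user_id: 2, event_type: 'purchase', revenue: 25 },
  { user_id: 1, event_type: 'purchase', revenue: 40 },
];

// Column-oriented layout: one array per column (illustrative helper).
function toColumns(rowList) {
  const columns = {};
  for (const row of rowList) {
    for (const [name, value] of Object.entries(row)) {
      if (!columns[name]) columns[name] = [];
      columns[name].push(value);
    }
  }
  return columns;
}

// Summing revenue reads only the `revenue` array; other columns stay untouched.
function sumColumn(columns, name) {
  let total = 0;
  for (const value of columns[name]) total += value;
  return total;
}

const columns = toColumns(rows);
const totalRevenue = sumColumn(columns, 'revenue');
```

Values of the same type stored next to each other are also what makes compression and vectorized loops effective.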

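The MergeTree table engine

-- The main table engine for analytics
CREATE TABLE events (
  event_date Date,
  event_time DateTime,
  user_id UInt64,
  session_id String,
  event_type LowCardinality(String),
  properties JSON,                    -- may need to be enabled explicitly on older ClickHouse versions
  revenue Decimal(10, 2)
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date)   -- one partition per month
ORDER BY (event_type, user_id, event_time)  -- sorting key (also the primary key by default)
TTL event_date + INTERVAL 1 YEAR DELETE   -- automatically delete rows older than one year
SETTINGS index_granularity = 8192;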
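The `ORDER BY` key and `index_granularity` work together: rows are stored sorted by the key, and the primary index keeps only one entry per block of rows (a granule). The sketch below illustrates that idea with a single numeric key and a tiny granularity of 4. It is a simplification under stated assumptions, not ClickHouse's implementation: the function names are hypothetical, and a real engine would binary-search the marks instead of scanning them linearly.

```javascript
// Keep the key of the first row of every granule (the "marks").
function buildSparseIndex(sortedKeys, granularity) {
  const marks = [];
  for (let i = 0; i < sortedKeys.length; i += granularity) marks.push(sortedKeys[i]);
  return marks;
}

// A granule may contain `target` if its first key is <= target and the
// first key of the next granule is >= target.
function candidateGranules(marks, target) {
  const result = [];
  for (let g = 0; g < marks.length; g++) {
    const first = marks[g];
    const nextFirst = g + 1 < marks.length ? marks[g + 1] : Infinity;
    if (first <= target && target <= nextFirst) result.push(g);
  }
  return result;
}

// Scan only the candidate granules and report how many rows were read.
function countMatches(sortedKeys, marks, granularity, target) {
  let matches = 0;
  let rowsScanned = 0;
  for (const g of candidateGranules(marks, target)) {
    const start = g * granularity;
    const end = Math.min(start + granularity, sortedKeys.length);
    for (let i = start; i < end; i++) {
      rowsScanned++;
      if (sortedKeys[i] === target) matches++;
    }
  }
  return { matches, rowsScanned };
}

const sortedKeys = [1, 1, 2, 3, 3, 3, 4, 5, 6, 7, 8, 9];
const marks = buildSparseIndex(sortedKeys, 4);
const lookup = countMatches(sortedKeys, marks, 4, 7);
```

With a granularity of 8192 the index stays small enough to hold in memory even for very large tables, at the cost of reading at least one whole granule per lookup.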


ReplicatedMergeTree (high availability)

CREATE TABLE events ON CLUSTER '{cluster}'
(...)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events', '{replica}')
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_type, user_id, event_time);
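The `{cluster}`, `{shard}` and `{replica}` placeholders are macros that each ClickHouse server fills in from its own configuration, so the same statement produces a distinct replica path on every node. The following sketch mimics that substitution in JavaScript to show what the expanded values look like. The `expandMacros` helper and the macro values are hypothetical and only stand in for what the server does.

```javascript
// Replace every {name} placeholder that has a value in `macros`;
// unknown placeholders are left as they are.
function expandMacros(template, macros) {
  return template.replace(/\{(\w+)\}/g, (match, name) =>
    Object.prototype.hasOwnProperty.call(macros, name) ? String(macros[name]) : match
  );
}

// Hypothetical values for one node of a two-replica shard.
const nodeMacros = { cluster: 'analytics', shard: '01', replica: 'replica-a' };

const keeperPath = expandMacros('/clickhouse/tables/{shard}/events', nodeMacros);
const replicaName = expandMacros('{replica}', nodeMacros);
```

Replicas of the same shard share the expanded path and differ only in the replica name, which is how they find each other.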

Materialized views for pre-aggregation

-- Create an aggregating view (updated on every insert into events)
CREATE MATERIALIZED VIEW events_daily_mv
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_type)
AS
SELECT
  event_date,
  event_type,
  uniqState(user_id) AS unique_users,
  countState() AS event_count,
  sumState(revenue) AS total_revenue
FROM events
GROUP BY event_date, event_type;

-- Query the view (up to roughly 10,000x faster than scanning the raw events, depending on data volume)
SELECT
  event_date,
  event_type,
  uniqMerge(unique_users) AS unique_users,
  countMerge(event_count) AS events,
  sumMerge(total_revenue) AS revenue
FROM events_daily_mv
WHERE event_date >= today() - 30
GROUP BY event_date, event_type
ORDER BY event_date, revenue DESC;
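The `-State` and `-Merge` function pairs above rely on partial aggregation: each insert produces an intermediate state, and the query combines states instead of re-reading rows. The sketch below models that idea in JavaScript. It is a simplification: the helper names are hypothetical, and an exact `Set` stands in for the approximate structure that `uniq` actually uses.

```javascript
// An intermediate state for (unique users, event count, revenue sum).
function emptyState() {
  return { users: new Set(), count: 0, revenue: 0 };
}

// Roughly what uniqState / countState / sumState do for one inserted row.
function addRow(state, row) {
  state.users.add(row.user_id);
  state.count += 1;
  state.revenue += row.revenue;
  return state;
}

// Roughly what the -Merge functions do: combine two partial states.
function mergeStates(a, b) {
  return {
    users: new Set([...a.users, ...b.users]),
    count: a.count + b.count,
    revenue: a.revenue + b.revenue,
  };
}

// Turn a state into final values.
function finalize(state) {
  return { unique_users: state.users.size, events: state.count, revenue: state.revenue };
}

// Two separate inserts produce two partial states for the same day and event type.
const firstInsert = [{ user_id: 1, revenue: 10 }, { user_id: 2, revenue: 0 }];
const secondInsert = [{ user_id: 2, revenue: 5 }, { user_id: 3, revenue: 20 }];
const stateA = firstInsert.reduce(addRow, emptyState());
const stateB = secondInsert.reduce(addRow, emptyState());
const daily = finalize(mergeStates(stateA, stateB));
```

User 2 appears in both inserts but is counted once after the merge, which is why the view stores states rather than final numbers.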


Fast analytical queries

-- Count unique users over the last 7 days
SELECT uniq(user_id) as unique_users
FROM events
WHERE event_date >= today() - 7;

-- Funnel analysis
SELECT
  countIf(step >= 1) AS step1,
  countIf(step >= 2) AS step2,
  countIf(step >= 3) AS step3
FROM (
  SELECT
    user_id,
    windowFunnel(86400)(  -- 24-hour window, in seconds
      event_time,
      event_type = 'page_view',
      event_type = 'add_to_cart',
      event_type = 'purchase'
    ) AS step
  FROM events
  WHERE event_date >= today() - 30
  GROUP BY user_id
);
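For readers unfamiliar with `windowFunnel`, the sketch below approximates its default behaviour for one user in JavaScript: it returns how many consecutive funnel steps were reached, with every step required to fall within the window measured from the chain's first step. This is an illustrative reimplementation under simplifying assumptions (each event matches at most one step, no strict modes), not ClickHouse's code, and the function name only mirrors the SQL function.

```javascript
// events: [{ time: <unix seconds>, type: <string> }], steps: ordered event types.
function windowFunnel(windowSeconds, events, steps) {
  const sorted = [...events].sort((a, b) => a.time - b.time);
  // chainStart[i] = start time of the latest chain that has reached step i.
  const chainStart = new Array(steps.length).fill(null);
  for (const event of sorted) {
    const idx = steps.indexOf(event.type);
    if (idx === -1) continue; // not part of the funnel
    if (idx === 0) {
      chainStart[0] = event.time; // a new chain can start here
    } else if (chainStart[idx - 1] !== null &&
               event.time <= chainStart[idx - 1] + windowSeconds) {
      chainStart[idx] = chainStart[idx - 1]; // carry the chain's start forward
      if (idx === steps.length - 1) return steps.length; // full funnel reached
    }
  }
  for (let level = steps.length; level > 0; level--) {
    if (chainStart[level - 1] !== null) return level;
  }
  return 0;
}

const funnelSteps = ['page_view', 'add_to_cart', 'purchase'];
const converted = windowFunnel(86400, [
  { time: 0, type: 'page_view' },
  { time: 100, type: 'add_to_cart' },
  { time: 200, type: 'purchase' },
], funnelSteps);
const tooLate = windowFunnel(86400, [
  { time: 0, type: 'page_view' },
  { time: 100, type: 'add_to_cart' },
  { time: 90000, type: 'purchase' }, // more than 24 hours after the page view
], funnelSteps);
```

The outer query then counts users per level with `countIf(step >= n)`, so a user who reached step 3 is also counted in steps 1 and 2.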

-- Retention cohorts
-- event_date from the joined events table is the activity date
SELECT
  toStartOfWeek(first_seen) AS cohort,
  dateDiff('week', first_seen, event_date) AS week_number,
  count(DISTINCT user_id) AS users
FROM (
  SELECT user_id, min(event_date) AS first_seen FROM events GROUP BY user_id
) AS first_events
JOIN events USING (user_id)
GROUP BY cohort, week_number
ORDER BY cohort, week_number;

Kafka integration

-- Read directly from Kafka
CREATE TABLE kafka_events (
  event_type String,
  user_id UInt64,
  timestamp Int64   -- milliseconds since the Unix epoch; fromUnixTimestamp64Milli expects Int64
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'events',
         kafka_group_name = 'clickhouse-consumer',
         kafka_format = 'JSONEachRow';

-- Materialize the Kafka stream into the ClickHouse events table
CREATE MATERIALIZED VIEW kafka_to_events TO events AS
SELECT
  toDate(fromUnixTimestamp64Milli(timestamp)) AS event_date,
  fromUnixTimestamp64Milli(timestamp) AS event_time,
  user_id,
  event_type
FROM kafka_events;
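The materialized view is effectively a per-message transformation from the Kafka payload to a row of `events`. The sketch below shows the same mapping in JavaScript, assuming a JSON message shaped like the `kafka_events` columns with `timestamp` in milliseconds. The `toEventRow` name is hypothetical, and the sketch formats times in UTC, whereas ClickHouse applies the server or column time zone.

```javascript
// Map one decoded Kafka message to the columns the view selects.
function toEventRow(message) {
  const iso = new Date(message.timestamp).toISOString(); // "YYYY-MM-DDTHH:mm:ss.sssZ"
  return {
    event_date: iso.slice(0, 10),                   // like toDate(...)
    event_time: iso.slice(0, 19).replace('T', ' '), // seconds precision, like DateTime
    user_id: message.user_id,
    event_type: message.event_type,
  };
}

const sampleRow = toEventRow({
  event_type: 'purchase',
  user_id: 42,
  timestamp: 86400000 + 3661000, // one day, one hour, one minute and one second after the epoch
});
```

Columns of `events` that the view does not select, such as `session_id` and `revenue`, receive their default values on insert.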

Node.js client

import { createClient } from '@clickhouse/client'

const client = createClient({
  url: 'https://clickhouse.example.com:8443',  // older client versions name this option `host`
  username: 'default',
  password: process.env.CLICKHOUSE_PASSWORD,
  database: 'analytics',
})

// Stream a large result set
const resultSet = await client.query({
  query: 'SELECT event_date, count() FROM events GROUP BY event_date',
  format: 'JSONEachRow',
})

// stream() yields batches of rows; decode each row with json()
for await (const rows of resultSet.stream()) {
  for (const row of rows) {
    console.log(row.json())
  }
}

// Batch insert (`events` is an array of plain objects keyed by column name)
await client.insert({
  table: 'events',
  values: events,
  format: 'JSONEachRow',
})
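ClickHouse generally handles a few large inserts better than many small ones, so an application that accumulates events usually sends them in sizeable batches. The sketch below shows one way to split an array into batches and insert them sequentially. Both helper names are hypothetical, and `insertFn` is whatever function performs a single insert, so the sketch does not depend on the client library.

```javascript
// Split an array into consecutive batches of at most `size` items.
function chunk(items, size) {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Insert batch by batch; returns the number of rows handed to insertFn.
async function insertInBatches(insertFn, rows, batchSize) {
  let inserted = 0;
  for (const batch of chunk(rows, batchSize)) {
    await insertFn(batch); // one insert per batch, in order
    inserted += batch.length;
  }
  return inserted;
}
```

With the client from this section, a call could look like `insertInBatches((batch) => client.insert({ table: 'events', values: batch, format: 'JSONEachRow' }), events, 10000)`, where the batch size of 10000 is only an example value.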