正在加载,请稍候…

TimescaleDB:PostgreSQL 中的时序数据

利用 TimescaleDB 处理 IoT 和指标数据——超表、连续聚合、压缩策略、数据保留,以及与 InfluxDB 的对比。

TimescaleDB:PostgreSQL 中的时序数据

为什么选择 TimescaleDB?

TimescaleDB 是具备时序数据超能力的 PostgreSQL。您可以使用熟悉的 SQL、现有工具,并在时序数据上获得 100 倍的查询性能。

TimescaleDB:PostgreSQL 中的时序数据 插图

设置

-- 启用扩展
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- 创建普通表
CREATE TABLE metrics (
  time        TIMESTAMPTZ NOT NULL,
  device_id   TEXT NOT NULL,
  metric_name TEXT NOT NULL,
  value       DOUBLE PRECISION
);

-- 转换为超表(按时间自动分区)
SELECT create_hypertable('metrics', 'time',
  chunk_time_interval => INTERVAL '1 day',
  if_not_exists => TRUE
);

-- 为常见查询添加复合索引
CREATE INDEX idx_metrics_device_time ON metrics (device_id, time DESC);

插入与查询

-- 插入(与普通 PostgreSQL 相同)
INSERT INTO metrics (time, device_id, metric_name, value)
VALUES
  (NOW(), 'sensor-001', 'temperature', 23.5),
  (NOW(), 'sensor-001', 'humidity', 65.2),
  (NOW(), 'sensor-002', 'temperature', 21.1);

-- 时间桶聚合
SELECT
  time_bucket('1 hour', time) AS hour,
  device_id,
  AVG(value) AS avg_temp,
  MAX(value) AS max_temp,
  MIN(value) AS min_temp
FROM metrics
WHERE metric_name = 'temperature'
  AND time > NOW() - INTERVAL '24 hours'
GROUP BY hour, device_id
ORDER BY hour DESC, device_id;

-- 间隙填充(为缺失的时间间隔插入 null)
SELECT
  time_bucket_gapfill('1 hour', time) AS hour,
  device_id,
  LOCF(AVG(value)) AS temperature  -- 向前填充最后观测值
FROM metrics
WHERE metric_name = 'temperature'
  AND time > NOW() - INTERVAL '7 days'
  AND device_id = 'sensor-001'
GROUP BY hour, device_id
ORDER BY hour;

TimescaleDB:PostgreSQL 中的时序数据 插图

连续聚合

-- 预计算每小时平均值(自动刷新)
CREATE MATERIALIZED VIEW metrics_hourly
WITH (timescaledb.continuous) AS
SELECT
  time_bucket('1 hour', time) AS hour,
  device_id,
  metric_name,
  AVG(value) AS avg_value,
  MAX(value) AS max_value,
  MIN(value) AS min_value,
  COUNT(*) AS sample_count
FROM metrics
GROUP BY hour, device_id, metric_name
WITH NO DATA;

-- 设置刷新策略
SELECT add_continuous_aggregate_policy('metrics_hourly',
  start_offset => INTERVAL '3 hours',
  end_offset   => INTERVAL '1 hour',
  schedule_interval => INTERVAL '30 minutes'
);

-- 查询视图(超快!)
SELECT hour, avg_value, max_value
FROM metrics_hourly
WHERE device_id = 'sensor-001'
  AND metric_name = 'temperature'
  AND hour > NOW() - INTERVAL '7 days'
ORDER BY hour DESC;

压缩策略

-- 压缩超过 7 天的数据块(通常压缩率 90% 以上)
SELECT add_compression_policy('metrics',
  compress_after => INTERVAL '7 days'
);

-- 监控压缩
SELECT
  chunk_name,
  before_compression_total_bytes,
  after_compression_total_bytes,
  round(after_compression_total_bytes::numeric / before_compression_total_bytes * 100, 1) AS pct
FROM chunk_compression_stats('metrics')
ORDER BY chunk_name;

TimescaleDB:PostgreSQL 中的时序数据 插图

数据保留

-- 自动删除超过 90 天的数据
SELECT add_retention_policy('metrics', drop_after => INTERVAL '90 days');

Node.js 与 TimescaleDB

import { Pool } from 'pg'

const pool = new Pool({ connectionString: process.env.DATABASE_URL })

async function insertMetrics(readings: MetricReading[]) {
  const values = readings.map(r => `('${r.time}', '${r.deviceId}', '${r.metric}', ${r.value})`).join(',')
  await pool.query(`INSERT INTO metrics (time, device_id, metric_name, value) VALUES ${values}`)
}

async function getDeviceMetrics(deviceId: string, hours: number) {
  const result = await pool.query(`
    SELECT
      time_bucket('5 minutes', time) AS bucket,
      AVG(value) AS avg_value,
      MAX(value) AS max_value
    FROM metrics
    WHERE device_id = $1
      AND time > NOW() - $2 * INTERVAL '1 hour'
      AND metric_name = 'temperature'
    GROUP BY bucket
    ORDER BY bucket DESC
  `, [deviceId, hours])
  return result.rows
}

TimescaleDB vs InfluxDB

特性 TimescaleDB InfluxDB
查询语言 SQL InfluxQL/Flux
现有 PG 工具
连接操作 完整 SQL 连接 有限
压缩率 ~90% ~95%
写入速度 ~100 万/秒 ~150 万/秒
生态系统 PostgreSQL 专用构建