
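Apache Flink Stream Processing: Event Time, Windows, State, and Exactly-Once Semantics
Apache Flink has become one of the most capable stateful stream processing frameworks. It offers true event-time semantics, expressive windowing, fault-tolerant state management, and end-to-end exactly-once guarantees when paired with suitable sources and sinks. This guide covers the core concepts and practical implementation patterns for production-grade Flink deployments.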
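The Flink Stream Processing Model
Flink treats every computation as a stream; batch processing is the special case of stream processing over a bounded dataset. The key APIs are:
- DataStream API: low-level stream transformations in Java, Scala, and Python
- Table API / Flink SQL: declarative relational operations over streams
- Stateful Functions: event-driven stateful microservices
A Flink application is expressed as a logical dataflow graph: source → transformation → sink.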

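Event Time vs. Processing Time vs. Ingestion Time
Choosing the right notion of time is foundational.
Processing time: the wall-clock time of the machine at the moment it processes the event. It is the simplest option with the lowest latency, but it is non-deterministic: replaying the same input produces different results.
Ingestion time: the time at which the event enters Flink. It is more stable than processing time, but still non-deterministic on replay.
Event time: the timestamp embedded in the event itself. It is deterministic and gives correct results for out-of-order data, but it requires watermarks to track progress and to decide when data counts as late.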
// Java DataStream API with event-time watermarks
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> events = env
    .fromSource(kafkaSource, WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
            .withTimestampAssigner((event, ts) -> event.getTimestamp())
            .withIdleness(Duration.ofMinutes(5)),
        "Kafka Events");
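Watermarks
A watermark with value t tells Flink that no further events with a timestamp at or before t are expected. Watermarks determine when windows fire and at what point a delayed record is treated as truly late.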
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
    private final long maxOutOfOrderness = 30_000L; // 30 seconds in ms
    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }
}
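To make the arithmetic of the generator above concrete, here is a minimal plain-Java sketch with no Flink dependency. The class name WatermarkSketch and its methods are hypothetical and exist only for illustration; the sketch assumes the same rule as above, watermark = highest timestamp seen - max out-of-orderness - 1.

```java
/** Plain-Java model of a bounded-out-of-orderness watermark (illustrative, not Flink API). */
final class WatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = Long.MIN_VALUE; // no event seen yet

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an event timestamp and returns the watermark that would be emitted next. */
    long observe(long eventTimestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMs);
        return currentWatermark();
    }

    /** Watermark = highest timestamp seen, minus the tolerated disorder, minus 1 ms. */
    long currentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing observed, so no progress to report
        }
        return currentMaxTimestamp - maxOutOfOrdernessMs - 1;
    }

    /** An event is behind the watermark if its timestamp is at or before it. */
    boolean isBehindWatermark(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }
}
```

With a 30-second bound, an event stamped 140,000 ms moves the watermark to 109,999 ms, so a subsequent event stamped 105,000 ms arrives behind the watermark, while an out-of-order event that is less than 30 seconds behind the maximum does not.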
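Watermark propagation: an operator with several inputs (for example, multiple parallel source subtasks) advances its event-time clock to the minimum watermark across those inputs. An idle source can therefore hold the watermark back indefinitely; withIdleness marks a source as idle after the specified duration so that it no longer stalls progress.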
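The minimum rule and the effect of idleness can be sketched in a few lines of plain Java. This is a simplified model, not Flink's internal implementation: the class and method names are hypothetical, and returning Long.MIN_VALUE when every input is idle is an assumption of the sketch that stands for "no progress".

```java
/** Illustrative model of combining per-input watermarks (not Flink API). */
final class WatermarkPropagationSketch {
    /**
     * Returns the combined watermark: the minimum over all inputs that are not idle.
     * If every input is idle, the sketch reports Long.MIN_VALUE (no progress).
     */
    static long combine(long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs are excluded from the minimum
            }
            anyActive = true;
            min = Math.min(min, inputWatermarks[i]);
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

With input watermarks 500, 120, and 480, the combined watermark is 120. Once the second input is marked idle, it rises to 480, which is the stall that withIdleness is meant to prevent.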
Window Operations
Windows split an unbounded stream into finite chunks for aggregation.
Tumbling windows: fixed size, non-overlapping:
events.keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new CountAggregate(), new WindowResultFunction());
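How a timestamp maps to its tumbling window can be shown with simple modular arithmetic. The sketch below is plain Java with hypothetical names and assumes windows aligned to the epoch (offset 0), covering the half-open interval [start, end).

```java
/** Illustrative tumbling-window assignment, assuming epoch-aligned windows (offset 0). */
final class TumblingWindowSketch {
    /** Start of the window that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result correct for negative timestamps as well
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Exclusive end of the window that contains the timestamp. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }
}
```

For a 5-minute window (300,000 ms), a timestamp of 754,321 ms falls into the window [600,000, 900,000), and every timestamp belongs to exactly one window.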
Sliding windows: fixed size, overlapping (here a 10-minute window that advances every 2 minutes):
events.keyBy(Event::getUserId)
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(2)))
    .sum("revenue");
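Because sliding windows overlap, each event belongs to several windows, which multiplies the per-event work. The plain-Java sketch below (hypothetical names, epoch-aligned windows with offset 0 assumed) enumerates the windows that contain a given timestamp.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sliding-window assignment, assuming epoch-aligned windows (offset 0). */
final class SlidingWindowSketch {
    /** Returns every window {start, end} with start <= timestamp < end, newest first. */
    static List<long[]> assign(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is at or before the timestamp
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }
}
```

With a 10-minute size and a 2-minute slide, every event lands in size / slide = 5 windows, so state and computation grow accordingly compared with a tumbling window of the same size.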
Session windows: gap-based and variable in size; well suited to user session analysis:
events.keyBy(Event::getUserId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .process(new SessionAnalyzer());
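The gap-based grouping can be illustrated without Flink. The sketch below uses hypothetical names and works on timestamps that are already sorted, which a real stream does not guarantee. It treats a pause strictly longer than the gap as the end of a session; how a pause of exactly the gap length is handled is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative session grouping over ascending timestamps (not Flink API). */
final class SessionWindowSketch {
    /**
     * Each result is {start, end}, where end = last event timestamp + gap,
     * mirroring a per-event window [ts, ts + gap) merged with its neighbours.
     */
    static List<long[]> sessions(long[] sortedTimestampsMs, long gapMs) {
        List<long[]> result = new ArrayList<>();
        long start = 0;
        long last = 0;
        boolean open = false;
        for (long ts : sortedTimestampsMs) {
            if (open && ts - last > gapMs) {
                // Pause longer than the gap: close the current session
                result.add(new long[] {start, last + gapMs});
                open = false;
            }
            if (!open) {
                start = ts;
                open = true;
            }
            last = ts;
        }
        if (open) {
            result.add(new long[] {start, last + gapMs});
        }
        return result;
    }
}
```

With a 30-minute gap, events at minutes 0, 10, 35, 90, and 100 form two sessions: one covering minutes 0 to 65 and one covering minutes 90 to 130.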
Late data handling with allowed lateness and a side output. Records that arrive within the allowed lateness update the window result again; records that arrive after it are routed to the side output instead of being dropped:
OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-data"){};
SingleOutputStreamOperator<AggResult> mainStream = events
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sideOutputLateData(lateOutputTag)
    .allowedLateness(Time.minutes(1))
    .aggregate(new CountAggregate());
DataStream<Event> lateStream = mainStream.getSideOutput(lateOutputTag);
// Route late data to a correction topic or dead-letter queue
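The decision about what happens to a record depends only on the window end, the allowed lateness, and the current watermark. The sketch below is a plain-Java model of that rule with hypothetical names; it assumes the window's last valid timestamp is end - 1 and that a window expires once the watermark reaches that timestamp plus the allowed lateness.

```java
/** Illustrative classification of a record relative to its window (not Flink API). */
final class LateDataSketch {
    enum Fate { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    static Fate classify(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        long maxTimestamp = windowEndMs - 1; // last timestamp that belongs to the window
        if (maxTimestamp + allowedLatenessMs <= watermarkMs) {
            return Fate.SIDE_OUTPUT;         // window state is gone; record goes to the side output
        }
        if (maxTimestamp <= watermarkMs) {
            return Fate.LATE_BUT_ACCEPTED;   // window already fired; it fires again with the update
        }
        return Fate.ON_TIME;                 // window has not fired yet
    }
}
```

For a window ending at 300,000 ms with one minute of allowed lateness, a record that arrives while the watermark is at 330,000 ms still updates the result, whereas one that arrives at a watermark of 359,999 ms or later goes to the side output.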
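KeyedProcessFunction and Timers
KeyedProcessFunction gives full control over per-event processing, keyed state, and timers. It is the most flexible DataStream API for custom event-driven logic that the built-in window operators cannot express.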
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
    // Set once a small transaction has been seen for the current key
    private ValueState<Boolean> flagState;
    // Timestamp of the pending timer that expires the flag
    private ValueState<Long> timerState;

    @Override
    public void open(Configuration params) {
        flagState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("flag", Types.BOOLEAN));
        timerState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timer", Types.LONG));
    }

    @Override
    public void processElement(Transaction tx, Context ctx, Collector<Alert> out) throws Exception {
        Boolean lastWasSmall = flagState.value();
        // A large transaction while the flag is set raises an alert
        if (lastWasSmall != null && lastWasSmall && tx.getAmount() > 500) {
            out.collect(new Alert(tx.getAccountId()));
            cleanUp(ctx);
        }
        if (tx.getAmount() < 1.00) {
            // Remove a timer left by an earlier small transaction so that it
            // cannot fire early and clear the flag set below
            Long previousTimer = timerState.value();
            if (previousTimer != null) {
                ctx.timerService().deleteProcessingTimeTimer(previousTimer);
            }
            flagState.update(true);
            long timer = ctx.timerService().currentProcessingTime() + 60_000L;
            ctx.timerService().registerProcessingTimeTimer(timer);
            timerState.update(timer);
        }
    }

    @Override
    public void onTimer(long ts, OnTimerContext ctx, Collector<Alert> out) throws Exception {
        // One minute passed without a large transaction: reset
        timerState.clear();
        flagState.clear();
    }

    private void cleanUp(Context ctx) throws Exception {
        Long timer = timerState.value();
        if (timer != null) ctx.timerService().deleteProcessingTimeTimer(timer);
        timerState.clear();
        flagState.clear();
    }
}

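State Management
Flink provides several state backends, including:
- HashMapStateBackend: keeps state as objects on the JVM heap; fast, but limited by heap size, so best suited to small state
- EmbeddedRocksDBStateBackend: disk-backed via RocksDB; supports TB-scale state and incremental checkpoints, at the cost of slower access (roughly 10x for point lookups) because every read and write is serialized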
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental checkpoints
env.getCheckpointConfig().setCheckpointStorage("s3://flink-checkpoints/my-job/");
State types (obtained inside a rich function, typically in open()):
// ValueState: a single value per key
ValueState<UserProfile> profileState = getRuntimeContext()
    .getState(new ValueStateDescriptor<>("profile", UserProfile.class));
// ListState: a list of elements per key; appends are cheap
ListState<Event> buffer = getRuntimeContext()
    .getListState(new ListStateDescriptor<>("buffer", Event.class));
// MapState: a key-value map per key; efficient for sparse lookups
MapState<String, Long> countMap = getRuntimeContext()
    .getMapState(new MapStateDescriptor<>("counts", String.class, Long.class));
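State TTL for automatic cleanup of expired state:
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    // Reset the TTL timer on creation and on every write, not on reads
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    // Never hand expired values to user code, even if not yet cleaned up
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    // Incremental cleanup applies to the heap backend only; with RocksDB,
    // use cleanupInRocksdbCompactFilter instead
    .cleanupIncrementally(1000, true)
    .build();
ValueStateDescriptor<UserSession> desc = new ValueStateDescriptor<>("session", UserSession.class);
desc.enableTimeToLive(ttlConfig);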
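The semantics selected above (timestamp refreshed on write only, expired values never returned) can be modelled in plain Java. This is a sketch with hypothetical names, not how Flink stores TTL state; time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative model of OnCreateAndWrite + NeverReturnExpired TTL semantics (not Flink API). */
final class TtlStateSketch<K, V> {
    private static final class Entry<T> {
        final T value;
        final long lastWriteMs;

        Entry(T value, long lastWriteMs) {
            this.value = value;
            this.lastWriteMs = lastWriteMs;
        }
    }

    private final long ttlMs;
    private final Map<K, Entry<V>> entries = new HashMap<>();

    TtlStateSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Creating or overwriting a value restarts its TTL. */
    void put(K key, V value, long nowMs) {
        entries.put(key, new Entry<>(value, nowMs));
    }

    /** Reads do not extend the TTL; expired values are removed and reported as absent. */
    V get(K key, long nowMs) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (entry.lastWriteMs + ttlMs <= nowMs) {
            entries.remove(key);
            return null;
        }
        return entry.value;
    }
}
```

A value written at time 0 with a TTL of 1,000 ms is still readable at 999 ms and gone at 1,000 ms, no matter how often it was read in between.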
Checkpointing and Fault Tolerance
Flink's fault tolerance is based on a variant of the Chandy-Lamport distributed snapshot algorithm. A checkpoint captures a consistent snapshot of all operator state together with the source positions; after a failure, Flink restores that state and replays source data from the saved offsets.
env.enableCheckpointing(60_000); // checkpoint every 60 seconds
CheckpointConfig cfg = env.getCheckpointConfig();
cfg.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
cfg.setMinPauseBetweenCheckpoints(30_000);
cfg.setCheckpointTimeout(120_000);
cfg.setMaxConcurrentCheckpoints(1);
cfg.setExternalizedCheckpointCleanup(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
cfg.enableUnalignedCheckpoints(); // reduces checkpoint time under backpressure
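Why restoring state and replaying from the saved offset yields the same result as a failure-free run can be shown with a toy single-operator model. The sketch below is plain Java with hypothetical names; it ignores barriers and distribution entirely and only demonstrates that state and offset must be snapshotted together.

```java
import java.util.List;

/** Toy model of checkpoint, crash, restore, and replay for a running sum (not Flink API). */
final class CheckpointReplaySketch {
    /** Operator state plus the source offset it corresponds to. */
    static final class Checkpoint {
        final int offset;
        final long sum;

        Checkpoint(int offset, long sum) {
            this.offset = offset;
            this.sum = sum;
        }
    }

    /**
     * Sums the source, checkpointing every checkpointEvery records and crashing
     * once just before processing failAtOffset (pass -1 for no failure).
     */
    static long run(List<Long> source, int checkpointEvery, int failAtOffset) {
        long sum = 0;
        int offset = 0;
        Checkpoint last = new Checkpoint(0, 0);
        boolean failed = false;
        while (offset < source.size()) {
            if (!failed && offset == failAtOffset) {
                // Simulated crash: discard in-flight progress, restore the last snapshot
                failed = true;
                sum = last.sum;
                offset = last.offset;
                continue;
            }
            sum += source.get(offset);
            offset++;
            if (offset % checkpointEvery == 0) {
                last = new Checkpoint(offset, sum);
            }
        }
        return sum;
    }
}
```

Summing the numbers 1 to 10 gives 55 with or without a crash at offset 7, because the records processed after the last checkpoint are replayed against the restored sum rather than counted twice.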
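End-to-End Exactly-Once with Kafka
For exactly-once semantics from a Kafka source to a Kafka sink, Flink uses a two-phase commit protocol built on Kafka transactions. Downstream consumers must read with isolation.level=read_committed, and the producer's transaction timeout must be longer than the checkpoint interval plus the expected recovery time.
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("input-events")
    .setGroupId("flink-consumer-group")
    // Start from committed group offsets; fall back to earliest if none exist
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("output-events")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    // EXACTLY_ONCE requires checkpointing to be enabled on the environment
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    // Must be unique per application writing to the same Kafka cluster
    .setTransactionalIdPrefix("flink-producer-")
    .build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.sinkTo(sink);
env.execute("Exactly-Once Pipeline");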
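The two phases of a transactional sink can be sketched as a small state machine. This is a simplified plain-Java model with hypothetical names, not the KafkaSink implementation: it assumes that a failure always happens before the pending checkpoint completes, so all uncommitted work is aborted and replayed.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a two-phase-commit sink (not Flink or Kafka API). */
final class TwoPhaseCommitSinkSketch {
    private final List<String> committed = new ArrayList<>(); // visible to read_committed consumers
    private List<String> openTxn = new ArrayList<>();         // records since the last checkpoint
    private List<String> preCommitted = null;                 // flushed, awaiting checkpoint completion

    void write(String record) {
        openTxn.add(record);
    }

    /** Phase 1: on checkpoint, pre-commit the open transaction and start a new one. */
    void snapshot() {
        preCommitted = openTxn;
        openTxn = new ArrayList<>();
    }

    /** Phase 2: once the checkpoint is confirmed, make the pre-committed records visible. */
    void notifyCheckpointComplete() {
        if (preCommitted != null) {
            committed.addAll(preCommitted);
            preCommitted = null;
        }
    }

    /** Failure before the checkpoint completed: abort everything that is not committed. */
    void abortOnFailure() {
        openTxn = new ArrayList<>();
        preCommitted = null;
    }

    List<String> committed() {
        return committed;
    }
}
```

If a record is written, the job fails, and the record is written again after replay, only the second attempt is ever committed, so consumers that read committed data see it exactly once.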
Streaming Analytics with Flink SQL
Flink SQL provides a high-level declarative interface that is well suited to standard aggregation patterns.
CREATE TABLE user_events (
    user_id BIGINT,
    event_type STRING,
    amount DOUBLE,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);
-- Tumbling window revenue aggregation
-- (legacy group-window syntax; recent releases recommend windowing table-valued functions)
SELECT
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
    user_id,
    COUNT(*) AS event_count,
    SUM(amount) AS total_amount
FROM user_events
GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE), user_id;
-- Temporal join: enrich events with versioned dimension data
-- (user_tiers must be a versioned table with a primary key and an event-time attribute)
SELECT e.user_id, e.amount, u.tier
FROM user_events AS e
LEFT JOIN user_tiers FOR SYSTEM_TIME AS OF e.event_time AS u
    ON e.user_id = u.user_id;
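The temporal join above looks up, for each event, the dimension row that was valid at the event's timestamp. That "as of" lookup can be sketched in plain Java with a sorted map per key. The names are hypothetical, and the sketch assumes each version is valid from its timestamp until the next version for the same user.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative "as of" lookup against a versioned table (not Flink API). */
final class TemporalJoinSketch {
    // user_id -> (valid-from timestamp -> tier), sorted by timestamp
    private final Map<Long, TreeMap<Long, String>> versionsByUser = new HashMap<>();

    void addVersion(long userId, long validFromMs, String tier) {
        versionsByUser.computeIfAbsent(userId, k -> new TreeMap<>()).put(validFromMs, tier);
    }

    /** Returns the tier valid at eventTimeMs, or null as a LEFT JOIN would for no match. */
    String tierAsOf(long userId, long eventTimeMs) {
        TreeMap<Long, String> versions = versionsByUser.get(userId);
        if (versions == null) {
            return null;
        }
        // Latest version whose valid-from time is at or before the event time
        Map.Entry<Long, String> entry = versions.floorEntry(eventTimeMs);
        return entry == null ? null : entry.getValue();
    }
}
```

An event at time 150 for a user who became "silver" at 100 and "gold" at 200 is enriched with "silver", regardless of when the event is actually processed, which is what keeps the join deterministic on replay.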

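Async I/O for External Lookups
Blocking, synchronous I/O for external enrichment is a common bottleneck. Async I/O issues many requests concurrently, which keeps pipeline throughput high while requests are in flight.
DataStream<EnrichedEvent> enriched = AsyncDataStream.unorderedWait(
    events,
    new AsyncDatabaseEnricher(),
    1000,                  // timeout per request
    TimeUnit.MILLISECONDS,
    100                    // max concurrent async requests
);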
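The benefit of issuing lookups concurrently instead of one at a time can be sketched with the standard library alone. The code below uses hypothetical names, with lookup standing in for a remote call; unlike unorderedWait, this sketch returns results in input order, which corresponds more closely to ordered emission.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative concurrent enrichment with CompletableFuture (not Flink API). */
final class AsyncLookupSketch {
    /** Hypothetical lookup standing in for a database or HTTP call. */
    static String lookup(int userId) {
        return "user-" + userId;
    }

    /** Starts all lookups on a bounded pool, then collects results in input order. */
    static List<String> enrichAll(List<Integer> userIds, int maxConcurrent) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int id : userIds) {
                // Submit without waiting, so up to maxConcurrent lookups overlap
                futures.add(CompletableFuture.supplyAsync(() -> lookup(id), pool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                results.add(future.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

The pool size plays the role of the capacity argument in the Flink call: it bounds the number of in-flight requests so the external system is not overwhelmed.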
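Handling Backpressure
Flink propagates backpressure upstream through credit-based flow control. Monitor the backpressure status in the Flink web UI: sustained HIGH backpressure on a task indicates a bottleneck further downstream, usually the first task that is busy rather than backpressured.
Common remedies:
- Increase operator parallelism for the slow stage
- Optimize the operator (reduce state I/O, batch external calls)
- Use async I/O for lookups against external systems
- Scale the sink target if the downstream system is saturated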
Savepoints vs. Checkpoints
- Checkpoint: automatic, managed by Flink; used for failure recovery
- Savepoint: manual, triggered by the user; used for planned upgrades, rescaling, and A/B testing
# Take a savepoint before upgrading job logic
flink savepoint <job-id> s3://flink-savepoints/before-upgrade/
# Restore and start updated job from savepoint
# (pass the savepoint path printed by the previous command)
flink run -s s3://flink-savepoints/before-upgrade/ my-updated-job.jar
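Conclusion
Apache Flink combines event-time semantics, expressive windowing, rich state management, and end-to-end exactly-once guarantees, which makes it a strong fit for mission-critical stream processing. Start with Flink SQL for standard aggregations, use the DataStream API for complex stateful logic, configure checkpointing for fault tolerance, handle late data with side outputs and allowedLateness, and monitor backpressure to sustain throughput. Together, these foundations make real-time pipelines both correct and operationally resilient.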