tonglin0325的个人主页

Flink学习笔记——checkpoint

1.开启checkpoint#

默认情况下checkpoint是禁用的,需要手动进行开启,如下

1
2
3
4
5
6
7
8
9
10
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.getCheckpointConfig().setCheckpointTimeout(3600000); // checkpoint1小时超时
env.enableCheckpointing(180000, CheckpointingMode.EXACTLY_ONCE); // 3分钟做一次checkpoint
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); // 允许两个连续的 checkpoint 错误
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // 使用rocksdb state backend
env.getCheckpointConfig().setCheckpointStorage("s3://xxxx/xxxx/test_checkpoint"); // 指定gcheckpoint路径
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留

其他配置参考官方文档

1
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/datastream/fault-tolerance/checkpointing/

2.从保留的 checkpoint 中恢复状态
#

1
2
bin/flink run -s checkpoint_path

参考:https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/state/checkpoints/

3.checkpoint相关问题#

做checkpoint很慢问题排查,参考:2022年最新版 | Flink经典线上问题小盘点

增量checkpoint,参考:Flink 管理大型状态之增量 Checkpoint

大状态与 Checkpoint 调优,参考:大状态与 Checkpoint 调优

 

4.checkpoint原理#

Checkpoint 由 CheckpointCoordinator 发起、确认,通过Rpc 通知 Taskmanager 的具体算子完成 Checkpoint 操作,参考:Flink Checkpoint 流程,具体步骤如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1、CheckpointCoordicator tirgger checkpoint 到 source
2、Source
  1、生成并广播 CheckpointBarrier
  2、Snapshot state(完成后 ack Checkpoint 到 CheckpointCoordicator)
3、Map
  1、接收到 CheckpointBarrier
  2、广播 CheckpointBarrier
  3、Snapshot state(完成后 ack Checkpoint 到 CheckpointCoordicator)
4、Sink
  1、接收到 CheckpointBarrier
  2、Snapshot state(完成后 ack Checkpoint 到 CheckpointCoordicator)
5、CheckpointCoordicator 接收到 所有 ack
  1、给所有算子发 notifyCheckpointComplete
6、Source、Map、Sink 收到 notifyCheckpointComplete

详细步骤参考:Checkpoint 原理剖析与应用实践

5.checkpoint和savepoint区别#

|Savepoint|Checkpoint
|Savepoint是由命令触发, 由用户创建和删除|Checkpoint被保存在用户指定的外部路径中, flink自动触发

Checkpoint被保存在用户指定的外部路径中, flink自动触发
|保存点存储在标准格式存储中,并且可以升级作业版本并可以更改其配置。|当作业失败或被取消时,将保留外部存储的检查点。

当作业失败或被取消时,将保留外部存储的检查点。
|用户必须提供用于还原作业状态的保存点的路径。|用户必须提供用于还原作业状态的检查点的路径。

用户必须提供用于还原作业状态的检查点的路径。