1.开启checkpoint
默认情况下checkpoint是禁用的,需要手动进行开启,如下
1 2 3 4 5 6 7 8 9 10
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); env.getCheckpointConfig().setCheckpointTimeout(3600000); // checkpoint1小时超时 env.enableCheckpointing(180000, CheckpointingMode.EXACTLY_ONCE); // 3分钟做一次checkpoint env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 同一时间只允许一个 checkpoint 进行 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); // 允许两个连续的 checkpoint 错误 env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // 使用rocksdb state backend env.getCheckpointConfig().setCheckpointStorage("s3://xxxx/xxxx/test_checkpoint"); // 指定gcheckpoint路径 env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
|
其他配置参考官方文档
1
| https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/datastream/fault-tolerance/checkpointing/
|
2.从保留的 checkpoint 中恢复状态
1 2
| bin/flink run -s checkpoint_path
|
参考:https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/state/checkpoints/
3.checkpoint相关问题
做checkpoint很慢问题排查,参考:2022年最新版 | Flink经典线上问题小盘点
增量checkpoint,参考:Flink 管理大型状态之增量 Checkpoint
大状态与 Checkpoint 调优,参考:大状态与 Checkpoint 调优
4.checkpoint原理
Checkpoint 由 CheckpointCoordinator 发起、确认,通过Rpc 通知 Taskmanager 的具体算子完成 Checkpoint 操作,参考:Flink Checkpoint 流程,具体步骤如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| 1、CheckpointCoordicator tirgger checkpoint 到 source 2、Source 1、生成并广播 CheckpointBarrier 2、Snapshot state(完成后 ack Checkpoint 到 CheckpointCoordicator) 3、Map 1、接收到 CheckpointBarrier 2、广播 CheckpointBarrier 3、Snapshot state(完成后 ack Checkpoint 到 CheckpointCoordicator) 4、Sink 1、接收到 CheckpointBarrier 2、Snapshot state(完成后 ack Checkpoint 到 CheckpointCoordicator) 5、CheckpointCoordicator 接收到 所有 ack 1、给所有算子发 notifyCheckpointComplete 6、Source、Map、Sink 收到 notifyCheckpointComplete
|
详细步骤参考:Checkpoint 原理剖析与应用实践
5.checkpoint和savepoint区别
|Savepoint|Checkpoint
|Savepoint是由命令触发, 由用户创建和删除|Checkpoint被保存在用户指定的外部路径中, flink自动触发
Checkpoint被保存在用户指定的外部路径中, flink自动触发
|保存点存储在标准格式存储中,并且可以升级作业版本并可以更改其配置。|当作业失败或被取消时,将保留外部存储的检查点。
当作业失败或被取消时,将保留外部存储的检查点。
|用户必须提供用于还原作业状态的保存点的路径。|用户必须提供用于还原作业状态的检查点的路径。
用户必须提供用于还原作业状态的检查点的路径。