
Flink Study Notes: Flink MySQL CDC

Flink CDC provides a set of connectors for capturing change data (change data capture) from external data sources. Its Flink MySQL CDC connector is built on top of Debezium.

Official documentation:

https://ververica.github.io/flink-cdc-connectors/release-2.3/content/about.html

Official GitHub repository:

https://github.com/ververica/flink-cdc-connectors

For the version compatibility between Flink and Flink CDC, see:

https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/overview/

For usage examples covering various data sources, see:

基于 AWS S3、EMR Flink、Presto 和 Hudi 的实时数据湖仓 – 使用 EMR 迁移 CDH

Flink CDC关于source和sink全调研及实践

How it works#

Official Flink MySQL CDC documentation: https://github.com/apache/flink-cdc/blob/master/docs/content/docs/connectors/flink-sources/mysql-cdc.md

For background on the MySQL binlog, see:
MySQL学习笔记——binlog

Flink MySQL CDC has had significant performance improvements since version 2.0:

1. Lock-free: the snapshot phase no longer requires a global read lock on MySQL

2. Parallel reading, which speeds up synchronization of large tables

3. Checkpointing during the snapshot phase, so that a job failing mid-snapshot can resume instead of starting over (a configuration sketch illustrating these features follows below)

Reference: Flink CDC 2.0 实现原理剖析
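Below is a minimal sketch of how these improvements are typically used, assuming a MySQL instance at localhost:3306 with a large table test.user (the class name and connection settings are illustrative, not taken from the official docs): the source is given a server-id range at least as large as the source parallelism so that several readers can snapshot the table concurrently, and checkpointing is enabled so that the snapshot phase can resume from the last checkpoint after a failure.

package com.bigdata.flink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MysqlCdcParallelSnapshotExample {

    public static void main(String[] args) throws Exception {

        // Hypothetical connection settings; adjust host, port, credentials and tables to your setup.
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test")
                .tableList("test.user")
                .username("root")
                .password("123456")
                .serverTimeZone("UTC")
                // a server-id range with at least as many ids as the source parallelism,
                // so each parallel reader can register as a separate replication client
                .serverId("5400-5403")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 3s so snapshot progress survives failures (improvement 3)
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // 4 parallel readers split the snapshot of the large table (improvement 2)
                .setParallelism(4)
                .print();

        env.execute("Parallel MySQL Snapshot + Binlog");
    }
}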

Using the DataStream API#

See: https://github.com/apache/flink-cdc/blob/master/docs/content/docs/connectors/flink-sources/datastream-api-package-guidance.md

package com.bigdata.flink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MysqlCdcExample {

    public static void main(String[] args) throws Exception {

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test") // set the captured database; to synchronize a whole database, set tableList to ".*"
                .tableList("test.user") // set the captured table
                .username("root")
                .password("123456")
                .serverTimeZone("America/Danmarkshavn")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to a JSON String
                .build();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing
        env.enableCheckpointing(3000);

        env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // use a single parallel source task
                .setParallelism(1)
                .print().setParallelism(1); // use parallelism 1 for the print sink

        env.execute("Print MySQL Snapshot + Binlog");
    }

}

Insert a row into MySQL; the output is as follows:

{
"before":null,
"after":{
"id":"AQ==",
"username":"test",
"email":"test@test.com"
},
"source":{
"version":"1.9.8.Final",
"connector":"mysql",
"name":"mysql_binlog_source",
"ts_ms":0,
"snapshot":"false",
"db":"test",
"sequence":null,
"table":"user",
"server_id":0,
"gtid":null,
"file":"",
"pos":0,
"row":0,
"thread":null,
"query":null
},
"op":"r",
"ts_ms":1723951109745,
"transaction":null
}

Update the row; the output is as follows:

{
"before":{
"id":"AQ==",
"username":"test",
"email":"test@test.com"
},
"after":{
"id":"AQ==",
"username":"test123",
"email":"test@test.com"
},
"source":{
"version":"1.9.8.Final",
"connector":"mysql",
"name":"mysql_binlog_source",
"ts_ms":1723951151000,
"snapshot":"false",
"db":"test",
"sequence":null,
"table":"user",
"server_id":1,
"gtid":null,
"file":"mysql-bin.000005",
"pos":1307,
"row":0,
"thread":75,
"query":null
},
"op":"u",
"ts_ms":1723951151768,
"transaction":null
}

Delete the row; the output is as follows:

{
"before":{
"id":"AQ==",
"username":"test123",
"email":"test@test.com"
},
"after":null,
"source":{
"version":"1.9.8.Final",
"connector":"mysql",
"name":"mysql_binlog_source",
"ts_ms":1723953255000,
"snapshot":"false",
"db":"test",
"sequence":null,
"table":"user",
"server_id":1,
"gtid":null,
"file":"mysql-bin.000005",
"pos":1627,
"row":0,
"thread":99,
"query":null
},
"op":"d",
"ts_ms":1723953255153,
"transaction":null
}
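In the Debezium envelope shown above, the op field identifies the change type: "r" is a snapshot read, "c" an insert captured from the binlog, "u" an update, and "d" a delete. The following is a minimal, illustrative sketch (not part of the original job), assuming jackson-databind is on the classpath, of how a downstream step could route records on these fields:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ChangeEventParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Routes a single JSON change event by its Debezium "op" field.
    public static void handle(String json) throws Exception {
        JsonNode event = MAPPER.readTree(json);
        String op = event.get("op").asText();
        JsonNode before = event.get("before");
        JsonNode after = event.get("after");

        switch (op) {
            case "r": // snapshot read
            case "c": // insert captured from the binlog
                System.out.println("upsert: " + after);
                break;
            case "u": // update
                System.out.println("update: " + before + " -> " + after);
                break;
            case "d": // delete
                System.out.println("delete: " + before);
                break;
            default:
                System.out.println("unknown op " + op + ": " + event);
        }
    }

    public static void main(String[] args) throws Exception {
        // a trimmed-down event in the same shape as the outputs above
        handle("{\"before\":null,\"after\":{\"id\":\"AQ==\",\"username\":\"test\",\"email\":\"test@test.com\"},\"op\":\"r\"}");
    }
}

In a real job this logic would typically live in a MapFunction or ProcessFunction applied to the stream returned by fromSource.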

 

Errors#

1. Caused by: org.apache.flink.table.api.ValidationException: The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.

This happens because the MySQL CDC job was not configured with the correct server time zone. You can check the MySQL server's time zone with the following commands:

mysql> show variables like '%time_zone%';
+------------------+--------+
| Variable_name    | Value  |
+------------------+--------+
| system_time_zone | UTC    |
| time_zone        | SYSTEM |
+------------------+--------+
2 rows in set (0.02 sec)

mysql> select now();
+---------------------+
| now()               |
+---------------------+
| 2024-08-17 15:29:48 |
+---------------------+
1 row in set (0.00 sec)

If the MySQL time zone is UTC, configure the connector with a matching time zone, for example (America/Danmarkshavn is effectively UTC year-round):

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("localhost")
        .port(55000)
        .databaseList("default") // set the captured database; to synchronize a whole database, set tableList to ".*"
        .tableList("default.test") // set the captured table
        .username("root")
        .password("123456")
        .serverTimeZone("America/Danmarkshavn") // must match the MySQL server time zone (UTC here)
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to a JSON String
        .build();

Configuration reference: https://github.com/apache/flink-cdc/blob/master/docs/content/docs/connectors/flink-sources/mysql-cdc.md

2. Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured

Check whether the MySQL binlog is enabled (log_bin is ON); if it is OFF, this error is thrown. You can verify it with:

mysql>  show variables like 'log_bin';

 

Industry use cases of MySQL CDC#

1.基于 Flink CDC + Hudi 湖仓一体方案实践 (37互娱)

2.Flink CDC + Hudi 海量数据入湖在顺丰的实践 (顺丰)