tonglin0325的个人主页

Flink学习笔记——读写hudi

使用flink来读写hudi有2种API,一个是Flink SQL API,另一个是DataStream API,参考

1
https://hudi.apache.org/cn/docs/flink-quick-start-guide

1.Flink SQL API#

首先启动yarn session

1
2
/usr/lib/flink/bin/yarn-session.sh -n 3 -s 5 -jm 1024 -tm 4096 -d

使用SQL API提交任务到YARN上的方式有以下几种:

1.使用交互式的sql client#

不过由于sql client目前处于beta版本,所以建议用于原型验证,不建议在生产环境中使用,参考:Apache Flink 零基础入门(四):客户端操作的 5 种模式

1
2
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/

启动

1
2
/usr/lib/flink/bin/sql-client.sh

首先使用Flink SQL创建hudi表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Flink SQL> CREATE TABLE hudi_test_table(
_id STRING,
xxx STRING,
primary key(_id) not enforced
) WITH (
'connector' = 'hudi',
'path' = 's3a://xxxx/hudi_test_table',
'table.type' = 'MERGE_ON_READ',
'changelog.enabled'= 'true',
'compaction.async.enabled'='true',
'compaction.tasks'= '4',
'compaction.trigger.strategy'= 'time_elapsed',
'compaction.delta_seconds'= '600',
'compaction.max_memory'= '1024',
'write.option' = 'upsert',
'read.streaming.check-interval'= '3',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://xxx:9083',
'hive_sync.table'='hudi_test_table',
'hive_sync.db'='default',
'write.tasks'='4'
);

查询该hudi表

1
2
Flink SQL> select * from hudi_test_table limit 10;

查询结果

2.编写flink job,使用StreamExecutionEnvironment#

在生产环境中如果使用FlinkSQL,建议使用
 StreamExecutionEnvironment,参考

1
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/overview/

 

2.Flink DataStream API#

首先同样是启动yarn session