
Flink Study Notes: Reading and Writing HBase

1. If you are on a newer HBase shipped with CSA (Cloudera Streaming Analytics)#

You can follow Cloudera's official example and implement this by adding the flink-hbase dependency that Cloudera provides:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.11</artifactId>
    <version>1.9.0-csa1.0.0.0</version>
</dependency>

The connector requires at least Flink 1.9.0 and at least HBase 2.1.0-cdh6.3.0; you can then use HBaseSinkFunction to write to HBase. See:

https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-hbase-configuration.html

 

2. If you are on an older HBase#

Then you need to write to HBase with the Java API yourself. For example, I use HBase 1.2.0-cdh5.1.6.2; for reference see HBase读写的几种方式(三)flink篇 and Flink 消费kafka数据写入hbase.

When writing to HBase, make sure the firewall allows the HBase RegionServer RPC port (hbase.regionserver.port), which defaults to 60020.
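Before deploying, it can help to verify from a Flink worker that the port is actually reachable. A minimal sketch, assuming a placeholder host name regionserver-host:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class RegionServerPortCheck {
    public static void main(String[] args) {
        // "regionserver-host" is a placeholder; use one of your own RegionServer hosts.
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress("regionserver-host", 60020), 3000);
            System.out.println("RegionServer port 60020 is reachable");
        } catch (IOException e) {
            System.out.println("RegionServer port 60020 is not reachable: " + e.getMessage());
        }
    }
}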

There are two main ways to write to HBase with the Java API:

1. Using the Table put API#

Reference:

https://github.com/phillip2019/flink-parent/blob/master/flink-connectors/flink-connector-hbase/src/main/java/com/aikosolar/bigdata/flink/connectors/hbase/SimpleHBaseTableSink.java

To write to HBase from Flink, extend RichSinkFunction, override the invoke method, and call HBase's put API inside invoke.

Note that the Table put API is relatively inefficient; even when Puts are batched in a List it still consumes a fair amount of resources, so the second approach (BufferedMutator) is recommended.

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleHBaseTableSink extends RichSinkFunction<ConsumerRecord<String, String>> {

    private final String tableName;
    private final HBaseWriterConfig writerConfig;

    private transient TableName tName;
    private transient Connection connection;

    public SimpleHBaseTableSink(HBaseWriterConfig writerConfig, String tableName) throws Exception {
        Preconditions.checkNotNull(writerConfig);
        Preconditions.checkNotNull(tableName);

        this.writerConfig = writerConfig;
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        this.tName = TableName.valueOf(tableName);

        // Build the HBase configuration from the user-supplied key/value pairs and open one connection per subtask.
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        this.writerConfig.getHbaseConfig().forEach(conf::set);
        this.connection = ConnectionFactory.createConnection(conf);
    }

    @Override
    public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
        Table table = null;
        try {
            table = connection.getTable(this.tName);
            // Use the Kafka offset as the row key and write the record value into a fixed column.
            String rowKey = String.valueOf(record.offset());
            String value = record.value();
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes("test_family"), Bytes.toBytes("test_col"), Bytes.toBytes(value));

            if (StringUtils.isNotBlank(writerConfig.getDurability())) {
                try {
                    Durability d = Durability.valueOf(writerConfig.getDurability());
                    put.setDurability(d);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            table.put(put);
        } finally {
            IOUtils.closeQuietly(table);
        }
    }

    @Override
    public void close() throws Exception {
        if (this.connection != null) {
            this.connection.close();
            this.connection = null;
        }
    }
}

HBaseWriterConfig

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.MapUtils;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class HBaseWriterConfig implements Serializable {
    private int writeBufferSize = -1;
    private int asyncFlushSize = -1;
    private long asyncFlushInterval = -1;
    private boolean async;
    private String durability;
    private Map<String, String> hbaseConfig = new HashMap<>();

    public static class Builder {
        private int writeBufferSize = 5 * 1024 * 1024;
        private int asyncFlushSize = 5000;
        // unit: seconds
        private long asyncFlushInterval = 60;
        private boolean async;
        private String durability;
        private Map<String, String> config = new HashMap<>();

        public static synchronized Builder me() {
            return new Builder();
        }

        public Builder setDurability(String durability) {
            this.durability = durability;
            return this;
        }

        public Builder writeBufferSize(int writeBufferSize) {
            this.writeBufferSize = writeBufferSize;
            return this;
        }

        public Builder conf(String key, String value) {
            this.config.put(key, value);
            return this;
        }

        public Builder conf(Map<String, String> config) {
            if (MapUtils.isNotEmpty(config)) {
                for (Map.Entry<String, String> e : config.entrySet()) {
                    this.config.put(e.getKey(), e.getValue());
                }
            }
            return this;
        }

        public Builder aync() {
            this.async = true;
            return this;
        }

        public Builder flushInterval(long interval) {
            this.asyncFlushInterval = interval;
            return this;
        }

        public Builder flushSize(int size) {
            this.asyncFlushSize = size;
            return this;
        }

        public Builder async(boolean enable) {
            this.async = enable;
            return this;
        }

        public HBaseWriterConfig build() {
            return new HBaseWriterConfig(writeBufferSize, asyncFlushSize, asyncFlushInterval, async, durability, config);
        }

    }
}

Then add the sink to a DataStream with addSink:

dataStream.addSink(new SimpleHBaseTableSink(
        HBaseWriterConfig.Builder.me()
                .async(true)
                .build(),
        "test_table"));
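In practice the builder is also where the HBase connection settings are passed, since open() copies every entry of hbaseConfig into the HBase configuration. A slightly fuller sketch, where the ZooKeeper hosts and the SYNC_WAL durability are placeholder choices applied through the conf() and setDurability() methods shown above:

// The ZooKeeper quorum and client port are placeholders; adjust them to your cluster.
HBaseWriterConfig config = HBaseWriterConfig.Builder.me()
        .conf("hbase.zookeeper.quorum", "zk1,zk2,zk3")
        .conf("hbase.zookeeper.property.clientPort", "2181")
        .setDurability("SYNC_WAL")   // any org.apache.hadoop.hbase.client.Durability constant name
        .async(true)
        .build();

dataStream.addSink(new SimpleHBaseTableSink(config, "test_table"));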

 

2. Using BufferedMutator#

Reference:

https://github.com/phillip2019/flink-parent/blob/master/flink-connectors/flink-connector-hbase/src/main/java/com/aikosolar/bigdata/flink/connectors/hbase/writter/HBaseWriterWithBuffer.java

Asynchronous, batched inserts through BufferedMutator are more efficient:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleHBaseTableSink extends RichSinkFunction<ConsumerRecord<String, String>> {

    private final String tableName;
    private final HBaseWriterConfig writerConfig;

    private transient TableName tName;
    private transient Connection connection;
    private transient BufferedMutator mutator;

    public SimpleHBaseTableSink(HBaseWriterConfig writerConfig, String tableName) throws Exception {
        Preconditions.checkNotNull(writerConfig);
        Preconditions.checkNotNull(tableName);

        this.writerConfig = writerConfig;
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        this.tName = TableName.valueOf(tableName);

        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        this.writerConfig.getHbaseConfig().forEach(conf::set);
        this.connection = ConnectionFactory.createConnection(conf);

        // A 1 MB write buffer: mutations are flushed to HBase automatically once the buffer fills up.
        // This replaces the old hTable.setWriteBufferSize(...) API.
        BufferedMutatorParams params = new BufferedMutatorParams(this.tName);
        params.writeBufferSize(1024 * 1024);
        // The BufferedMutator batches mutations and sends them to HBase asynchronously.
        this.mutator = connection.getBufferedMutator(params);
    }

    @Override
    public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
        String rowKey = String.valueOf(record.offset());
        String value = record.value();

        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("test_family"), Bytes.toBytes("test_col"), Bytes.toBytes(value));
        // Buffered locally and committed automatically when the buffer is full; a List<Put> can also
        // be passed to mutate() for explicit batching. There is no need to flush after every put
        // (this replaces the old hTable.setAutoFlush(false, false) API); flushing once in close() is enough.
        mutator.mutate(put);
    }

    @Override
    public void close() throws Exception {
        if (this.mutator != null) {
            this.mutator.flush();
            this.mutator.close();
            this.mutator = null;
        }
        if (this.connection != null) {
            this.connection.close();
            this.connection = null;
        }
    }
}

 

Some constants that an HBase sink needs are defined in HConstants:

https://github.com/apache/hbase/blob/master/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java

which requires the following dependency:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.2.0-cdh5.15.1</version>
</dependency>
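For instance, the ZooKeeper settings used earlier can be referenced through HConstants instead of raw strings. A minimal sketch, with placeholder ZooKeeper hosts:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class HBaseConfExample {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // HConstants.ZOOKEEPER_QUORUM is "hbase.zookeeper.quorum"; the hosts below are placeholders.
        conf.set(HConstants.ZOOKEEPER_QUORUM, "zk1,zk2,zk3");
        // HConstants.ZOOKEEPER_CLIENT_PORT is "hbase.zookeeper.property.clientPort".
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
        System.out.println(conf.get(HConstants.ZOOKEEPER_QUORUM));
    }
}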

You can also refer to Flume's HBase sink implementation.

Flume 1.8.0 supports HBase 1.x, using:

https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java

Flume 1.9.0 adds support for HBase 2.x, using:

https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2SinkConfigurationConstants.java