1. If you are on a newer HBase that ships with CSA (Cloudera Streaming Analytics)
You can follow Cloudera's official example and use the flink-hbase artifact they provide:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.11</artifactId>
    <version>1.9.0-csa1.0.0.0</version>
</dependency>
```
This requires at least Flink 1.9.0 and at least HBase 2.1.0-cdh6.3.0; with those in place you can write to HBase through HBaseSinkFunction (a sketch follows the documentation link below):
https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-hbase-configuration.html
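For orientation, here is a minimal sketch of what an HBaseSinkFunction-based sink might look like, modelled on the example in the Cloudera page linked above. The package name, the single-argument constructor and the executeMutations hook are assumptions taken from that documentation and may differ between CSA releases; the table, family and column names are placeholders.

```java
import org.apache.flink.addons.hbase.HBaseSinkFunction; // package assumed from the CSA flink-hbase artifact
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical sink: each String element becomes one row in the placeholder table "test_table".
HBaseSinkFunction<String> hbaseSink = new HBaseSinkFunction<String>("test_table") {
    @Override
    public void executeMutations(String value, Context context, BufferedMutator mutator) throws Exception {
        Put put = new Put(Bytes.toBytes(value));
        put.addColumn(Bytes.toBytes("test_family"), Bytes.toBytes("test_col"), Bytes.toBytes(value));
        mutator.mutate(put);
    }
};
stringStream.addSink(hbaseSink); // stringStream: a DataStream<String>, name is a placeholder
```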
2. If you are on an older HBase version
You have to write to HBase with the Java API yourself. For example, I use HBase 1.2.0-cdh5.16.2; see HBase读写的几种方式(三)flink篇 and Flink 消费kafka数据写入hbase for reference.
When writing to HBase, make sure the firewall allows access to the HBase RegionServer port (hbase.regionserver.port, 60020 by default).
There are two main ways to write to HBase with the Java API:
1. One is using the Table put API
See:
https://github.com/phillip2019/flink-parent/blob/master/flink-connectors/flink-connector-hbase/src/main/java/com/aikosolar/bigdata/flink/connectors/hbase/SimpleHBaseTableSink.java
To write to HBase from Flink this way, extend RichSinkFunction and override invoke(), calling the HBase put API inside invoke().
Note that calling Table.put() is relatively inefficient; even with a List<Put> (a batched variant is sketched right after the sink class below) it still consumes quite a few resources, so the second approach based on BufferedMutator is recommended.
```java
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleHBaseTableSink extends RichSinkFunction<ConsumerRecord<String, String>> {

    private final String tableName;
    private final HBaseWriterConfig writerConfig;

    private transient TableName tName;
    private transient Connection connection;

    public SimpleHBaseTableSink(HBaseWriterConfig writerConfig, String tableName) throws Exception {
        Preconditions.checkNotNull(writerConfig);
        Preconditions.checkNotNull(tableName);

        this.writerConfig = writerConfig;
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        this.tName = TableName.valueOf(tableName);

        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        this.writerConfig.getHbaseConfig().forEach(conf::set);
        this.connection = ConnectionFactory.createConnection(conf);
    }

    @Override
    public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
        Table table = null;
        try {
            table = connection.getTable(this.tName);
            String rowKey = String.valueOf(record.offset());
            String value = record.value();
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes("test_family"), Bytes.toBytes("test_col"), Bytes.toBytes(value));

            if (StringUtils.isNotBlank(writerConfig.getDurability())) {
                try {
                    Durability d = Durability.valueOf(writerConfig.getDurability());
                    put.setDurability(d);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            table.put(put);
        } finally {
            IOUtils.closeQuietly(table);
        }
    }

    @Override
    public void close() throws Exception {
        if (this.connection != null) {
            this.connection.close();
            this.connection = null;
        }
    }
}
```
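As mentioned above, Table.put also accepts a list of puts, which saves some per-call overhead compared with writing rows one at a time. Below is a minimal sketch of that batched variant, reusing the connection, table and column names from the class above; "records" is a hypothetical in-memory batch, and the BufferedMutator approach described later is still the better option.

```java
// Collect several records into one Table.put(List<Put>) call instead of one put() per record.
List<Put> batch = new ArrayList<>();
for (ConsumerRecord<String, String> r : records) {
    Put put = new Put(Bytes.toBytes(String.valueOf(r.offset())));
    put.addColumn(Bytes.toBytes("test_family"), Bytes.toBytes("test_col"), Bytes.toBytes(r.value()));
    batch.add(put);
}
try (Table table = connection.getTable(TableName.valueOf("test_table"))) {
    table.put(batch); // fewer round trips than calling put() once per record
}
```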
HBaseWriterConfig
```java
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.MapUtils;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class HBaseWriterConfig implements Serializable {
    private int writeBufferSize = -1;
    private int asyncFlushSize = -1;
    private long asyncFlushInterval = -1;
    private boolean async;
    private String durability;
    private Map<String, String> hbaseConfig = new HashMap<>();

    public static class Builder {
        private int writeBufferSize = 5 * 1024 * 1024;
        private int asyncFlushSize = 5000;
        // unit: seconds
        private long asyncFlushInterval = 60;
        private boolean async;
        private String durability;
        private Map<String, String> config = new HashMap<>();

        public static synchronized Builder me() {
            return new Builder();
        }

        public Builder setDurability(String durability) {
            this.durability = durability;
            return this;
        }

        public Builder writeBufferSize(int writeBufferSize) {
            this.writeBufferSize = writeBufferSize;
            return this;
        }

        public Builder conf(String key, String value) {
            this.config.put(key, value);
            return this;
        }

        public Builder conf(Map<String, String> config) {
            if (MapUtils.isNotEmpty(config)) {
                for (Map.Entry<String, String> e : config.entrySet()) {
                    this.config.put(e.getKey(), e.getValue());
                }
            }
            return this;
        }

        public Builder aync() {
            this.async = true;
            return this;
        }

        public Builder flushInterval(long interval) {
            this.asyncFlushInterval = interval;
            return this;
        }

        public Builder flushSize(int size) {
            this.asyncFlushSize = size;
            return this;
        }

        public Builder async(boolean enable) {
            this.async = enable;
            return this;
        }

        public HBaseWriterConfig build() {
            return new HBaseWriterConfig(writeBufferSize, asyncFlushSize, asyncFlushInterval, async, durability, config);
        }
    }
}
```
addSink
```java
dataStream.addSink(new SimpleHBaseTableSink(
        HBaseWriterConfig.Builder.me()
                .async(true)
                .build(),
        "test_table"));
```
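Note that dataStream here has to be a DataStream<ConsumerRecord<String, String>>. One way to obtain such a stream from Kafka, sketched as an assumption (newer flink-connector-kafka versions expose KafkaDeserializationSchema; older ones offer the equivalent KeyedDeserializationSchema), is a pass-through deserialization schema like the hypothetical one below:

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical schema that re-wraps each raw Kafka record with String key/value,
// so the stream's element type matches what SimpleHBaseTableSink expects.
public class PassThroughSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false; // consume indefinitely
    }

    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) {
        String key = record.key() == null ? null : new String(record.key());
        String value = record.value() == null ? null : new String(record.value());
        return new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), key, value);
    }

    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {});
    }
}
```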
2. The other is using BufferedMutator
See:
https://github.com/phillip2019/flink-parent/blob/master/flink-connectors/flink-connector-hbase/src/main/java/com/aikosolar/bigdata/flink/connectors/hbase/writter/HBaseWriterWithBuffer.java
Asynchronous, batched inserts via BufferedMutator are more efficient:
```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleHBaseTableSink extends RichSinkFunction<ConsumerRecord<String, String>> {

    private final String tableName;
    private final HBaseWriterConfig writerConfig;

    private transient TableName tName;
    private transient Connection connection;

    public SimpleHBaseTableSink(HBaseWriterConfig writerConfig, String tableName) throws Exception {
        Preconditions.checkNotNull(writerConfig);
        Preconditions.checkNotNull(tableName);

        this.writerConfig = writerConfig;
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        this.tName = TableName.valueOf(tableName);

        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        this.writerConfig.getHbaseConfig().forEach(conf::set);
        this.connection = ConnectionFactory.createConnection(conf);
    }

    @Override
    public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
        String rowKey = String.valueOf(record.offset());
        String value = record.value();

        // Set a 1 MB write buffer; once it fills up, data is flushed to HBase automatically.
        // This replaces the old hTable.setWriteBufferSize(30 * 1024 * 1024).
        BufferedMutatorParams params = new BufferedMutatorParams(this.tName);
        params.writeBufferSize(1024 * 1024);
        // Create an object that communicates with HBase asynchronously in batches
        BufferedMutator mutator = connection.getBufferedMutator(params);
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("test_family"), Bytes.toBytes("test_col"), Bytes.toBytes(value));
        // Insert into HBase; the buffer commits automatically when full.
        // A List<Put> can also be passed here for batch inserts.
        mutator.mutate(put);
        // There is no need to flush after every put, calling it once at the end is enough.
        // This replaces the old API's hTable.setAutoFlush(false, false).
        mutator.flush();
        mutator.close();
    }

    @Override
    public void close() throws Exception {
        if (this.connection != null) {
            this.connection.close();
            this.connection = null;
        }
    }
}
```
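Note that the block above creates, flushes and closes a new BufferedMutator inside every invoke() call, so the write buffer never actually accumulates more than one record. A more typical arrangement, sketched below under the assumption that flushing pending puts in close() is acceptable for your job (only the changed methods are shown; field names are placeholders), creates the mutator once in open() and reuses it:

```java
// Sketch: one BufferedMutator for the lifetime of the sink, so buffering really batches writes.
private transient BufferedMutator mutator;

@Override
public void open(Configuration parameters) throws Exception {
    org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
    this.connection = ConnectionFactory.createConnection(conf);
    BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName))
            .writeBufferSize(1024 * 1024); // auto-flush to HBase once ~1 MB of puts has accumulated
    this.mutator = connection.getBufferedMutator(params);
}

@Override
public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
    Put put = new Put(Bytes.toBytes(String.valueOf(record.offset())));
    put.addColumn(Bytes.toBytes("test_family"), Bytes.toBytes("test_col"), Bytes.toBytes(record.value()));
    mutator.mutate(put); // buffered; sent to HBase when the buffer fills
}

@Override
public void close() throws Exception {
    if (mutator != null) {
        mutator.flush(); // push out whatever is still sitting in the buffer
        mutator.close();
    }
    if (connection != null) {
        connection.close();
    }
}
```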
Some constants that an HBase sink needs are defined in HConstants:
https://github.com/apache/hbase/blob/master/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
which requires adding the following dependency:
```xml
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.2.0-cdh5.15.1</version>
</dependency>
```
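For example, the ZooKeeper-related keys from HConstants can replace hard-coded configuration strings when filling in the HBaseWriterConfig shown earlier; the quorum hosts below are placeholders:

```java
import org.apache.hadoop.hbase.HConstants;

// HConstants supplies the canonical configuration key names; the ZooKeeper addresses are placeholders.
HBaseWriterConfig writerConfig = HBaseWriterConfig.Builder.me()
        .conf(HConstants.ZOOKEEPER_QUORUM, "zk-host1,zk-host2,zk-host3")
        .conf(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
        .build();
```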
You can also reference Flume's sink implementations.
Flume 1.8.0 supports HBase 1.x via
https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
and Flume 1.9.0 added support for HBase 2.x via
https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2SinkConfigurationConstants.java