tonglin0325的个人主页

go学习笔记——gin框架

gin是一款轻量级的go web开发框架,官方文档

1
2
https://gin-gonic.com/docs/examples/

1.gin web项目结构

参考

1
2
https://github.com/voyagegroup/gin-boilerplate

gin+protobuf wire参考

1
2
https://github.com/mohuishou/blog-code/tree/main/01-go-training/04-project/10-layout

2.gin web quick start

1
2
https://gin-gonic.com/docs/quickstart/

在官方文档中提供了2个quick start的demo,一个稍微复杂,一个比较简单

简单的例子如下,创建一个/ping接口,返回pong

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main

import "github.com/gin-gonic/gin"

func main() {
r := gin.Default()
r.GET("/ping", func(c *gin.Context) {
c.JSON(200, gin.H{
"message": "pong",
})
})
r.Run() // listen and serve on 0.0.0.0:8080
}

全文 >>

go学习笔记——Kratos框架

官方文档

1
2
https://go-kratos.dev/en/docs/getting-started/start/

1.安装Go

参考:mac安装go1.20

2.安装Kratos框架

kratos依赖protobuf grpc等框架,需要先进行安装

1
2
3
4
5
brew install grpc
brew install protobuf
brew install protoc-gen-go
brew install protoc-gen-go-grpc

验证

1
2
3
4
5
6
7
8
9
protoc --version
libprotoc 3.21.9

protoc-gen-go --version
protoc-gen-go v1.28.1

protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.2.0

安装kratos框架

1
2
go install github.com/go-kratos/kratos/cmd/kratos/v2@latest

验证

全文 >>

特征预处理——编码

对于标称型数据,在特征处理的时候,需要对其进行编码

在编码之前,如果训练集和测试集是分开的,则需要对其进行合并,避免标称数据丢失

1
2
3
4
5
# 合并
df = train.append(test).reset_index()
# 列名
original_columns = list(df.columns)

常用的编码方式如下

1.Label编码

对于一个有m个category的特征,经过label encoding以后,每个category会映射到0到m-1之间的一个数。label encoding适用于ordinal feature (特征存在内在顺序)。

1
2
3
4
5
6
7
8
from sklearn import preprocessing

for col in original_columns:
enc = preprocessing.LabelEncoder()
enc.fit(np.concatenate([train[col], test[col]]))
train[col] = enc.transform(train[col])
test[col] = enc.transform(test[col])

或者

1
2
train_data = train_data.replace({'BsmtQual': {'Ex': 5, 'Gd': 4, 'TA': 3, 'Fa': 2, 'Po': 1, np.NaN: 0}})

2.顺序编码

类似于Label编码

参考:数据转化

全文 >>

kafka学习笔记——topic配置

在创建kafka topic的时候可以添加很多配置,如下表格

参考:Kafka Topic配置

 

参数名 含义
cleanup.policy 日志清除的策略,默认为 delete。如果要使用日志压缩,就需要让策略包含 compact。需要注意的是,如果开启了 compact 策略,则客户端提交的消息的 key 不允许为 null,否则提交报错 compact
compression.type 指定给定主题的最终压缩类型。此配置接受标准压缩编解码器(“gzip”、“snappy”、“lz4”、“zstd”)。设置为’producer’这意味着保留生产者设置的原始压缩编解码。“gzip”:压缩效率高,适合高内存、CPU;“snappy”:适合带宽敏感性,压缩力度大 producer
delete.retention.ms 保留删除消息压缩topic的删除标记的时间。此设置还给出消费者如果从offset 0开始读取并确保获得最终阶段的有效快照的时间范围(否则,在完成扫描之前可能已经回收了)。 86400000 (24 hours)
file.delete.delay.ms 从文件系统中删除文件之前等待的时间 60000
flush.messages 此设置允许指定我们强制fsync写入日志的数据的间隔。例如,如果这被设置为1,我们将在每个消息之后fsync; 如果是5,我们将在每五个消息之后fsync。一般,我们建议不要设置它,使用复制特性来保持持久性,并允许操作系统的后台刷新功能更高效。可以在每个topic的基础上覆盖此设置(请参阅每个主题的配置部分)。 10000
flush.ms 此设置允许我们强制fsync写入日志的数据的时间间隔。例如,如果这设置为1000,那么在1000ms过去之后,我们将fsync。 一般,我们建议不要设置它,并使用复制来保持持久性,并允许操作系统的后台刷新功能,因为它更有效率 1000
follower.replication.throttled.replicas follower复制限流列表。该列表应以[PartitionId]的形式描述一组副本:[BrokerId],[PartitionId]:[BrokerId]:…或者通配符’*’可用于限制此topic的所有副本。  
index.interval.bytes 此设置控制Kafka向其offset索引添加索引条目的频率。默认设置确保我们大致每4096个字节索引消息。 更多的索引允许读取更接近日志中的确切位置,但使索引更大。你不需要改变这个值。 4096
leader.replication.throttled.replicas 在leader方面进行限制的副本列表。该列表设置以[PartitionId]的形式描述限制副本:[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:…或使用通配符‘*’限制该topic的所有副本。  
max.message.bytes kafka允许的最大的消息批次大小。如果增加此值,并且消费者的版本比0.10.2老,那么消费者的提取的大小也必须增加,以便他们可以获取大的消息批次。
在最新的消息格式版本中,消息总是分组批量来提高效率。在之前的消息格式版本中,未压缩的记录不会分组批量,并且此限制仅适用于该情况下的单个消息。
10485760
message.format.version 指定消息附加到日志的消息格式版本。该值应该是一个有效的ApiVersion。例如:0.8.2, 0.9.0.0, 0.10.0,更多细节检查ApiVersion。通过设置特定的消息格式版本,并且磁盘上的所有现有消息都小于或等于指定版本。不正确地设置此值将导致消费者使用旧版本,因为他们将接收到“不认识”的格式的消息。 0.11.0-IV2
message.timestamp.difference.max.ms   9223372036854775807
message.timestamp.type   CreateTime
min.cleanable.dirty.ratio 此配置控制日志压缩程序将尝试清除日志的频率(假设启用了日志压缩)。默认情况下,我们将避免清理超过50%日志被压缩的日志。 该比率限制日志中浪费的最大空间重复(在最多50%的日志中可以是重复的50%)。更高的比率意味着更少,更有效的清洁,但意味着日志中的浪费更多。 0.5
min.compaction.lag.ms 消息在日志中保持不压缩的最短时间。仅适用于正在压缩的日志。 0
min.insync.replicas 当生产者设置应答为”all”(或“-1”)时,此配置指定了成功写入的副本应答的最小数。如果没满足此最小数,则生产者将引发异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。当min.insync.replicas和acks强制更大的耐用性时。典型的情况是创建一个副本为3的topic,将min.insync.replicas设置为2,并设置acks为“all”。如果多数副本没有收到写入,这将确保生产者引发异常。  1
preallocate 如果我们在创建新的日志段时在磁盘上预分配该文件,那么设为True。 false
retention.bytes 如果我们使用“删除”保留策略,则此配置将控制日志可以增长的最大大小,之后我们将丢弃旧的日志段以释放空间。默认情况下,没有设置大小限制则仅限于时间限制。 -1
retention.ms 如果我们使用“删除”保留策略,则此配置控制我们将保留日志的最长时间,然后我们将丢弃旧的日志段以释放空间。这代表SLA消费者必须读取数据的时间长度。  604800000
segment.bytes 此配置控制日志的段文件大小。一次保留和清理一个文件,因此较大的段大小意味着较少的文件,但对保留率的粒度控制较少。 104857600
segment.index.bytes 此配置控制offset映射到文件位置的索引的大小。我们预先分配此索引文件,并在日志滚动后收缩它。通常不需要更改此设置。 10485760
segment.jitter.ms 从计划的段滚动时间减去最大随机抖动,以避免异常的段滚动 0
segment.ms 此配置控制Kafka强制日志滚动的时间段,以确保保留可以删除或压缩旧数据,即使段文件未满 604800000
unclean.leader.election.enable 是否将不在ISR中的副本作为最后的手段选举为leader,即使这样做可能会导致数据丢失 false

 问题集合:apache kafka技术分享系列(目录索引)  

 

HBase学习笔记——客户端API

介绍HBase的Java API,参考:HBase读写的几种方式(一)java篇 和 Hbase–put、BufferedMutator、get

1.写HBase

1.单行put

HTable非线程安全,切较为低效

 

2.客户端的写缓冲区和List

一个put操作都是一个RPC操作,只适合小数据量的操作,HBase的API配置了客户端的写缓冲区,缓冲区负责收集put宝座,然后调用RPC操作一次性将put送往服务器。

默认情况下,客户端缓冲区是禁用的,可以通过将自动刷写(****autoflush=false)来激活缓冲区,如果(****autoflush=true),用户每次调用put方法是都会触发刷写。当激活缓冲区后,单行put不会产生RPC调用,因为存储的Put实例保存在客户端进程的内存中。只有flushCommits方法调用后才会将所有的修改传送到远程服务器。

默认的写缓冲区大小为2MB(2097152字节),可以在客户端中配置,也可以在habse-site.xml中的hbase.client.write.buffer 配置一个较大的预设值,比如20MB

显式刷写,即用户调用 flushCommits方法

隐式刷写,即会在调用put方法和setWriteBufferSize方法后比较超出缓冲区大小后调用 flushCommits方法

 

3.使用BufferedMutator

org.apache.hadoop.hbase.client.BufferedMutator主要用来对HBase的单个表进行操作。它和Put类的作用差不多,但是主要用来实现批量的异步写操作。

使用BufferedMutator进行批量异步插入的方式,效率更高,参考:HBase新版本Java APIHbase实战–数据读写解析

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.hadoop.hbase.client.BufferedMutator;
/** Helper function to create a table and return the rows that it created. */

private static void writeData(String tableId, int numRows) throws Exception {
Connection connection = admin.getConnection();
TableName tableName = TableName.valueOf(tableId);
BufferedMutator mutator = connection.getBufferedMutator(tableName);
List<Mutation> mutations = makeTableData(numRows);
mutator.mutate(mutations);
mutator.flush();
mutator.close();
}

全文 >>

Flink学习笔记——读写HBase

1.如果是csa(Cloudera Streaming Analytics)版本的高版本HBase

可以参考Cloudera官方例子,通过引入官方提供的flink-hbase来实现

1
2
3
4
5
6
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_2.11</artifactId>
<version>1.9.0-csa1.0.0.0</version>
</dependency>

要求flink最低版本1.9.0,hbase最低版本2.1.0-cdh6.3.0,然后就可以使用HBaseSinkFunction来写Hbase

1
2
https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-hbase-configuration.html

 

2.如果是低版本的HBase

则需要自行使用Java API来写HBase,比如我使用hbase版本为1.2.0-cdh5.1.6.2,可以参考:HBase读写的几种方式(三)flink篇Flink 消费kafka数据写入hbase

写HBase的时候注意开启Hbase regionserver的hbase.regionserver.port端口防火墙,默认为60020

Java API写Hbase的方式主要有2种:

1.一种是使用Table的put API

可以参考

1
2
https://github.com/phillip2019/flink-parent/blob/master/flink-connectors/flink-connector-hbase/src/main/java/com/aikosolar/bigdata/flink/connectors/hbase/SimpleHBaseTableSink.java

全文 >>

Hive学习笔记——metastore listener

除了使用hive hook来记录hive上用户的操作之外,还可以使用hive metastore listener来进行记录,参考:

1
2
https://towardsdatascience.com/apache-hive-hooks-and-metastore-listeners-a-tale-of-your-metadata-903b751ee99f

hive metastore的接口有3种,分别是

 

Property Abstract class
hive.metastore.pre.event.listeners org.apache.hadoop.hive.metastore.MetaStorePreEventListener
hive.metastore.end.function.listeners org.apache.hadoop.hive.metastore.MetaStoreEndFunctionListener
hive.metastore.event.listeners org.apache.hadoop.hive.metastore.MetaStoreEventListener

下面的代码中继承了MetaStoreEventListener,实现的功能是在建表和修改表的时候,日志输出一下table的元数据信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.bigdata.hive;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class MyListener extends MetaStoreEventListener {

private static final Logger LOGGER = LoggerFactory.getLogger(MyListener.class);
private static final ObjectMapper objMapper = new ObjectMapper();

public MyListener(Configuration config) {
super(config);
logWithHeader(" created ");
}

@Override
public void onCreateTable(CreateTableEvent event) {
logWithHeader(event.getTable());
}

@Override
public void onAlterTable(AlterTableEvent event) {
logWithHeader(event.getOldTable());
logWithHeader(event.getNewTable());
}

private void logWithHeader(Object obj) {
LOGGER.info("[CustomListener][Thread: " + Thread.currentThread().getName() + "] | " + objToStr(obj));
}

private String objToStr(Object obj) {
try {
return objMapper.writeValueAsString(obj);
} catch (IOException e) {
LOGGER.error("Error on conversion", e);
}
return null;
}

}

进行打包

1
2
mvn clean package

将jar包放到/var/lib/hive目录下,并修改用户组

全文 >>

mac下安装gradle7.3

gradle和maven类似,是一个构建工具

gradle安装和配置

1.mac安装gradle

1
2
brew install gradle

或者下载gradle的二进制安装包

1
2
https://gradle.org/releases/

然后在~/.bash_profile中配置

1
2
3
4
# gradle
export GRADLE_HOME=/Users/lintong/software/gradle-7.3
export PATH=$GRADLE_HOME/bin:$PATH

2.查看是否安装成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
gradle -v

------------------------------------------------------------
Gradle 7.3
------------------------------------------------------------

Build time: 2021-11-09 20:40:36 UTC
Revision: 96754b8c44399658178a768ac764d727c2addb37

Kotlin: 1.5.31
Groovy: 3.0.9
Ant: Apache Ant(TM) version 1.10.11 compiled on July 10 2021
JVM: 1.8.0_211 (Oracle Corporation 25.211-b12)
OS: Mac OS X 10.16 x86_64

使用gradle后,老版本的IDEA 2017.1 对gradle的支持较弱,建议升级到新版本 2021.1

全文 >>

Spark学习笔记——读写ScyllaDB

Scylla兼容cassandra API,所以可以使用spark读写cassandra的方法来进行读写

1.查看scyllaDB对应的cassandra版本

1
2
3
cqlsh:my_db> SHOW VERSION
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]

2.查看spark和cassandra对应的版本

参考:https://github.com/datastax/spark-cassandra-connector

3.写scyllaDB

dataset API写scyllaDB

1
2
3
4
5
6
ds2.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "my_tb", "keyspace" -> "my_db", "output.consistency.level" -> "ALL", "ttl" -> "8640000"))
.save()

RDD API写scyllaDB

1
2
3
4
5
import com.datastax.oss.driver.api.core.ConsistencyLevel
import com.datastax.spark.connector._

ds.rdd.saveToCassandra("my_db", "my_tb", writeConf = WriteConf(ttl = TTLOption.constant(8640000), consistencyLevel = ConsistencyLevel.ALL))

注意字段的数量和顺序需要和ScyllaDB表的顺序一致,可以使用下面方式select字段

1
2
3
4
5
6
7
8
9
10
11
val columns = Seq[String](
"a",
"b",
"c")
val colNames = columns.map(name => col(name))
val colRefs = columns.map(name => toNamedColumnRef(name))

val df2 = df.select(colNames: _*)
df2.rdd
.saveToCassandra(ks, table, SomeColumns(colRefs: _*), writeConf = WriteConf(ttl = TTLOption.constant(8640000), consistencyLevel = ConsistencyLevel.ALL))

全文 >>

DataGrip2017.1连接Hive

在使用低版本的DataGrip的时候,还没有hive的data source,需要自行添加数据源

1.下载hive driver,如果你使用的EMR的大数据集群的话,下载地址

1
2
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/HiveJDBCDriver.html

 添加一个自定义的Driver and Data source

起名为Amazon Hive,并保存

 

2.再添加一个Amazon Hive

全文 >>