tonglin0325的个人主页

Kafka学习笔记——Consumer API

参考kafka官方文档,版本1.0.x

1
2
http://kafka.apache.org/10/documentation.html#consumerapi

依赖,选择 Cloudera Rel 中的 1.0.1-kafka-3.1.0

1
2
3
4
5
6
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.1-kafka-3.1.0</version>
</dependency>

Kafka的消费者有2套API,一个是新版的Java API,在 org.apache.kafka.clients 包中

1
2
http://kafka.apache.org/10/documentation.html#newconsumerconfigs

一个是旧版的Scala API,在 kafka.consumer 包中

1
2
http://kafka.apache.org/10/documentation.html#oldconsumerconfigs

其中new consumer api中文含义参考

1
2
https://tonglin0325.github.io/xml/kafka/1.0.1-kafka-3.1.0/new-consumer-api.xml

全文 >>

zigzag编码原理

在Thrift,Protobuf和avro序列化框架中,不约而同使用了zigzag编码来对数字进行编码,从而达到减少数据传输量的目的。

zigzag算法的核心主要是去除二进制数字中的前导0,因为在绝大多数情况下,我们使用到的整数,往往是比较小的。

参考:小而巧的数字压缩算法:zigzag

在avro编码中,对于字符串Martin,长度为6,而6的二进制为0000 0110,其中首位置的0为符号位,在zigzag编码中,正数的符号位会移动到末尾,其它位往前移动一位,所以会变成0000 1100,即0c,再后面的字节是字符串UTF-8编码后的结果

在protobuf编码中,对于字符串的Martin,刚开始的字节表示其id和数据类型,下一个字节表示其长度,后面的字节是字符串UTF-8编码后的结果

参考:《数据密集型应用系统设计》的 Schema evolution in Avro, Protocol Buffers and Thrift

Avro,Protocol Buffer和Thrift中的模式演化(译)

全文 >>

存储底层数据结构对比

该文章对比了常用的一些存储底层所使用的数据结构。

1.B+树

MySQL,MongoDB的索引使用的就是B+树

B+树在多读少写(相对而言)的情境下比较有优势。

B+树的主要优点:

  • 1.结构比较扁平,高度低(一般不超过4层),随机寻道次数少;
  • 2.数据存储密度大,且都位于叶子节点,查询稳定,遍历方便;
  • 3.叶子节点形成有序链表,范围查询转化为顺序读,效率高。相对而言B树必须通过中序遍历才能支持范围查询。

B+树的缺点:

  • 1.如果写入的数据比较离散,那么寻找写入位置时,子节点有很大可能性不会在内存中,最终会产生大量的随机写,性能下降。
  • 2.如果B+树已经运行了很长时间,写入了很多数据,随着叶子节点分裂,其对应的块会不再顺序存储,而变得分散。这时执行范围查询也会变成随机读,效率降低了。

参考:从B+树到LSM树,及LSM树在HBase中的应用

2.LSM树(Log-structured Merge-Tree/日志结构的合并树)

RocksDB,HBase,Cassandra,Kudu底层使用的是LSM树

LSM树的优点

  • 1.LSM树由于数据按顺序存储,因此可以有效地执行区间查询(从最小值到最大值扫描所有的键),并且由于磁盘是顺序写入的(以顺序方式写入紧凑的SSTable文件,而不必重写树中的多个页),所以LSM-Tree可以支持非常高的写入吞吐量。

LSM树的缺点

  • 1.压缩过程中有时会干扰正在进行的读写操作。即使存储引擎尝试增量地执行压缩,并且不影响并发访问,但由于磁盘的并发资源有限,所以当磁盘执行昂贵的压缩操作时,很容易发生读写请求等待的情况。而B-tree的响应延迟则更具确定性。

参考:LSM树由来、设计思想以及应用到HBase的索引 和 数据密集型应用系统设计第3章

3.跳表

Redis底层使用的是SkipTable

Mysql的索引为什么使用B+树而不使用跳表?

B+树是多叉树结构,每个结点都是一个16k的数据页,能存放较多索引信息,所以扇出很高。三层左右就可以存储2kw左右的数据(知道结论就行,想知道原因可以看之前的文章)。也就是说查询一次数据,如果这些数据页都在磁盘里,那么最多需要查询三次磁盘IO。

跳表是链表结构,一条数据一个结点,如果最底层要存放2kw数据,且每次查询都要能达到二分查找的效果,2kw大概在2的24次方左右,所以,跳表大概高度在24层左右。最坏情况下,这24层数据会分散在不同的数据页里,也即是查一次数据会经历24次磁盘IO。

因此存放同样量级的数据,B+树的高度比跳表的要少,如果放在mysql数据库上来说,就是磁盘IO次数更少,因此B+树查询更快。

全文 >>

ElasticSearch学习笔记——插件开发

参考

1
2
3
4
5
6
https://dzone.com/articles/elasticsearch5-how-to-build-a-plugin-and-add-a-lis
https://github.com/chrisshayan/es-changes-feed-plugin
https://blog.csdn.net/qq_16164711/article/details/87872383
https://blog.gaiaproject.club/es-develop-plugin/
https://blog.51cto.com/13755625/2117995

Es的插件主要有如下几种类型,参考

1
2
https://github.com/elastic/elasticsearch/tree/master/server/src/main/java/org/elasticsearch/plugins

 

API Extension Plugins API拓展插件:API extension plugins add new functionality to Elasticsearch by adding new APIs or features, usually to do with search or mapping.

Analysis Plugins 解析器插件:Analysis plugins extend Elasticsearch by adding new analyzers, tokenizers, token filters, or character filters to Elasticsearch.

Alerting Plugins 告警插件:Alerting plugins allow Elasticsearch to monitor indices and to trigger alerts when thresholds are breached.

Discovery Plugins 发现插件:Discovery plugins extend Elasticsearch by adding new discovery mechanisms that can be used instead of Zen Discovery.

Ingest Plugins 摄取插件:The ingest plugins extend Elasticsearch by providing additional ingest node capabilities.

全文 >>

HBase学习笔记——rowkey

1.Airbnb rowkey设计案例

在Airbnb的rowkey设计案例中,使用了hash法避免了写入热点问题,其中

Event_key标识了一条日志的唯一性,用于将来自Kafka的日志数据进行去重;

Shard_id是将Event_key进行hash(可以参考es的路由哈希算法Hashing.murmur3_128)之后,对Shard_num进行取余后的结果,Shard_num感觉应该是当前hbase表region server的总数,由于airbnb在hbase中存储的是实时日志数据,并开启了Hbase的TTL,所以当前hbase表中的数据总量应该是可预测的,即region server数量不会无限增加

Shard_key应该就是当前业务的region_start_keys+shard_id,比如当前业务分配的前缀为00000,同时规划了100个table regions给这个业务,即00-99,那么Shard_key的范围就是0000000-0000099

rowkey就是Shard_key.Event_key,比如0000000.air_events.canaryevent.016230-a3db-434e

参考:

Reliable and Scalable Data Ingestion at Airbnb

Apache HBase at Airbnb

Airbnb软件工程师丁辰 - Airbnb的Streaming ETL

2.rowkey设计原则

1.rowkey长度原则:rowkey是一个二进制码流,可以是任意字符串,最大长度 64kb ,实际应用中一般为10-100bytes,以 byte[] 形式保存,一般设计成定长。

建议越短越好,不要超过16个字节,原因如下:

  • 数据的持久化文件HFile中是按照KeyValue存储的,如果rowkey过长,比如超过100字节,1000w行数据,光rowkey就要占用100*1000w=10亿个字节,将近1G数据,这样会极大影响HFile的存储效率;
  • MemStore将缓存部分数据到内存,如果rowkey字段过长,内存的有效利用率就会降低,系统不能缓存更多的数据,这样会降低检索效率。
  • 目前操作系统都是64位系统,内存8字节对齐,控制在16个字节,8字节的整数倍利用了操作系统的最佳特性。
  • 2.rowkey散列原则:如果rowkey按照时间戳的方式递增,不要将时间放在二进制码的前面,建议将rowkey的高位作为散列字段,由程序随机生成,低位放时间字段,这样将提高数据均衡分布在每个RegionServer,以实现负载均衡的几率。

    如果没有散列字段,首字段直接是时间信息,所有的数据都会集中在一个RegionServer上,这样在数据检索的时候负载会集中在个别的RegionServer上,造成热点问题,会降低查询效率。

    全文 >>

    Hive学习笔记——fetch

    在美团点评的文章中,介绍了HiveSQL转化为MapReduce的过程

    1
    2
    3
    4
    5
    6
    7
    1、Antlr定义SQL的语法规则,完成SQL词法,语法解析,将SQL转化为抽象语法树AST Tree
    2、遍历AST Tree,抽象出查询的基本组成单元QueryBlock
    3、遍历QueryBlock,翻译为执行操作树OperatorTree
    4、逻辑层优化器进行OperatorTree变换,合并不必要的ReduceSinkOperator,减少shuffle数据量
    5、遍历OperatorTree,翻译为MapReduce任务
    6、物理层优化器进行MapReduce任务的变换,生成最终的执行计划

    参考:Hive SQL的编译过程

    但是不是所有的SQL都有必要转换为MR来执行,比如

    1
    2
    select * from xx.xx limit 1

    Hive只需要直接读取文件,并传输到控制台即可

     

    在hive-default.xml配置文件中,有2个参数,hive.fetch.task.conversion和hive.fetch.task.conversion.threshold

    hive.fetch.task.conversion属性修改为more以后,在全局查找、字段查找、limit查找等都不走mapreduce

    hive.fetch.task.conversion.threshold属性表示在输入大小为多少以内的时候fetch task生效,默认1073741824 byte = 1G

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    <property>
    <name>hive.fetch.task.conversion</name>
    <value>more</value>
    <description>
    Expects one of [none, minimal, more].
    Some select queries can be converted to single FETCH task minimizing latency.
    Currently the query should be single sourced not having any subquery and should not have any aggregations or distincts (which incurs RS), lateral views and joins.
    0. none : disable hive.fetch.task.conversion
    1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
    2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
    </description>
    </property>

    <property>
    <name>hive.fetch.task.conversion.threshold</name>
    <value>1073741824</value>
    <description>
    Input threshold for applying hive.fetch.task.conversion. If target table is native, input length
    is calculated by summation of file lengths. If it's not native, storage handler for the table
    can optionally implement org.apache.hadoop.hive.ql.metadata.InputEstimator interface.
    </description>
    </property>

    参考:

    全文 >>

    Flink学习笔记——用户自定义Functions

    Flink支持用户自定义 Functions,方法有2个

    Ref

    1
    2
    https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/user_defined_functions.html

    1. 实现 MapFunction接口
    1
    2
    3
    4
    5
    class MyMapFunction implements MapFunction<String, Integer> {
    public Integer map(String value) { return Integer.parseInt(value); }
    };
    data.map(new MyMapFunction());

    1. 继承 RichMapFunction
    1
    2
    3
    4
    class MyMapFunction extends RichMapFunction<String, Integer> {
    public Integer map(String value) { return Integer.parseInt(value); }
    };

     

    累加器和计数器

    这个应该和Hadoop和Spark的counter类似,参考

    1
    2
    https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/user_defined_functions.html#%E7%B4%AF%E5%8A%A0%E5%99%A8%E5%92%8C%E8%AE%A1%E6%95%B0%E5%99%A8

    全文 >>

    Flink学习笔记——Execution Mode

    Flink有3中运行模式,分别是STREAMING,BATCH和AUTOMATIC

    Ref

    1
    2
    https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/datastream_execution_mode.html

    1.STREAMING运行模式 是DataStream默认的运行模式

    2.BATCH运行模式 也可以在DataStream API上运行

    3.AUTOMATIC运行模式 是让系统根据source类型自动选择运行模式

    可以通过命令行来配置运行模式

    1
    2
    bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar

    也可以在代码中配置

    1
    2
    3
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

     

    STREAMING运行模式中,Flink使用StateBackend来控制状态存储和checkpoint的工作,RocksDBStateBackend支持增量Checkpoint,其他2个不支持

    BATCH****运行模式中,statebackend是被忽略的,batch模式不支持checkpoint

    全文 >>

    Flink学习笔记——DataSet API

    Flink中的DataSet任务用于实现data sets的转换,data set通常是固定的数据源,比如可读文件,或者本地集合等。

    Ref

    1
    2
    https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/batch/

     使用DataSet API需要使用 批处理 env

    1
    2
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet支持的Data Source有:File-based,Collection-based,Generic

    1.File-based

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

    readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.

    readCsvFile(path) / CsvInputFormat - Parses files of comma (or another char) delimited fields. Returns a DataSet of tuples or POJOs. Supports the basic java types and their Value counterparts as field types.

    readFileOfPrimitives(path, Class) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer.

    readFileOfPrimitives(path, delimiter, Class) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer using the given delimiter.

    2.Collection-based

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    fromCollection(Collection) - Creates a data set from a Java.util.Collection. All elements in the collection must be of the same type.

    fromCollection(Iterator, Class) - Creates a data set from an iterator. The class specifies the data type of the elements returned by the iterator.

    fromElements(T ...) - Creates a data set from the given sequence of objects. All objects must be of the same type.

    fromParallelCollection(SplittableIterator, Class) - Creates a data set from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.

    generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.

    3.Generic

    1
    2
    3
    4
    readFile(inputFormat, path) / FileInputFormat - Accepts a file input format.

    createInput(inputFormat) / InputFormat - Accepts a generic input format.

     

    全文 >>