tonglin0325's Homepage

Flink Study Notes: DataStream API

DataStream programs in Flink implement transformations on data streams. A data stream can originate from a variety of sources, such as message queues, sockets, and files.

Ref 

https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/datastream_api.html

Using the DataStream API requires a stream execution environment:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

 

The data sources supported by DataStream are: file-based, socket-based, collection-based, and custom.

1. File-based

readTextFile(path) - Reads text files, i.e. files that respect the TextInputFormat specification, line-by-line and returns them as Strings.

readFile(fileInputFormat, path) - Reads (once) files as dictated by the specified file input format.

readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - This is the method called internally by the two previous ones. It reads files in the path based on the given fileInputFormat. Depending on the provided watchType, this source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process once the data currently in the path and exit (FileProcessingMode.PROCESS_ONCE). Using the pathFilter, the user can further exclude files from being processed.
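
Putting the file-based sources to work is straightforward; below is a minimal sketch (paths, interval, and class name are illustrative, not from the original post) contrasting a one-shot read with a continuously monitored directory:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class FileSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // One-shot read: process the file once, then the source finishes
        DataStream<String> once = env.readTextFile("file:///tmp/input.txt");

        // Continuous read: re-scan the path for new data every 10 seconds
        String dir = "file:///tmp/watched-dir";
        DataStream<String> continuous = env.readFile(
                new TextInputFormat(new Path(dir)), dir,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);

        once.print();
        continuous.print();
        env.execute("file-source-demo");
    }
}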

2. Socket-based


Flink Study Notes: Environment

Flink provides the following kinds of environment:

1. Batch environment: ExecutionEnvironment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

2. Stream environment: StreamExecutionEnvironment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

3. Local environment: LocalEnvironment
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

4. Java collection environment: CollectionEnvironment
ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
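
A quick way to sanity-check the collection environment is to run a tiny DataSet program entirely inside the JVM; a minimal sketch (class name is illustrative):

import org.apache.flink.api.java.ExecutionEnvironment;

public class CollectionEnvDemo {
    public static void main(String[] args) throws Exception {
        // Runs on Java collections in the local JVM; useful for unit tests
        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
        env.fromElements("a", "b", "c").print(); // print() triggers execution in the DataSet API
    }
}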

Ref


Flink Study Notes: Configuration

A Flink job often needs to load external configuration parameters. As described in the Flink development docs, Flink provides a utility called ParameterTool to solve this problem.

Flink development docs:

https://github.com/apache/flink/blob/master/docs/dev/application_parameters.zh.md

There are three ways to load configuration:

1. From .properties files

String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);

2. From the command line arguments

Add to the program arguments:

--input hdfs:///mydata --elements 42

Then read them in the code.
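
A minimal sketch of parsing those two arguments with ParameterTool (class name is illustrative):

import org.apache.flink.api.java.utils.ParameterTool;

public class ParamDemo {
    public static void main(String[] args) {
        // Parses "--input hdfs:///mydata --elements 42" style arguments
        ParameterTool parameter = ParameterTool.fromArgs(args);
        String input = parameter.get("input");       // "hdfs:///mydata"
        int elements = parameter.getInt("elements"); // 42
        System.out.println(input + " / " + elements);
    }
}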


Paper Reading: Twitter's Logging System

1. Data platform scale at industry companies

1.1 Twitter

Twitter has published the following two papers on its logging system:

"The Unified Logging Infrastructure for Data Analytics at Twitter" and "Scaling Big Data Mining Infrastructure: The Twitter Experience"

https://vldb.org/pvldb/vol5/p1771_georgelee_vldb2012.pdf
https://www.kdd.org/exploration_files/V14-02-02-Lin.pdf

Related slides:

https://www.slideshare.net/Hadoop_Summit/scaling-big-data-mining-infrastructure-twitter-experience
https://slideplayer.com/slide/12451118/
https://blog.twitter.com/engineering/en_us/topics/insights/2016/discovery-and-consumption-of-analytics-data-at-twitter.html
https://www.slideshare.net/kevinweil/hadoop-at-twitter-hadoop-summit-2010
https://www.slideshare.net/kevinweil/protocol-buffers-and-hadoop-at-twitter/1-Twitter_Open_Source_Coming_soon

Among these, "The Unified Logging Infrastructure for Data Analytics at Twitter", a 2012 Twitter paper, describes Twitter's production logging infrastructure and the evolution from application-specific logs to a unified "client events" log format, in which every message is a Thrift message.

 

At the time (2012), Twitter's Hadoop cluster consisted of several hundred machines.


Spring Boot Study Notes: RedisTemplate

Spring Boot can interact with Redis through RedisTemplate; usage is described below.

 

You can refer to this series of articles:

【快学springboot】11. Integrate Redis for session sharing

【快学springboot】13. Operating Redis: the String data structure

【快学springboot】14. Operating Redis: lists

There is also Redis in Action (Redis实战), whose examples are in Python:

https://github.com/7-sevens/Developer-Books/blob/master/Redis/Redis%E5%AE%9E%E6%88%98.pdf

Add the dependency to the pom:

<!-- redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
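
With the starter on the classpath, Spring Boot auto-configures a StringRedisTemplate that can be injected directly. A minimal sketch covering the String and list operations mentioned above (class, key, and value names are illustrative):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class RedisDemoService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    public void demo() {
        // String structure: set then get
        redisTemplate.opsForValue().set("greeting", "hello");
        String value = redisTemplate.opsForValue().get("greeting");

        // List structure: push then pop
        redisTemplate.opsForList().rightPush("queue", "job-1");
        String job = redisTemplate.opsForList().leftPop("queue");
    }
}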


Hadoop Study Notes: Configuration Files

Download the vanilla Hadoop release, version 2.6.0, from:

https://archive.apache.org/dist/hadoop/common/hadoop-2.6.0/hadoop-2.6.0.tar.gz

After extracting, you can see the directory layout; the configuration files are in the etc/hadoop directory of the distribution.

The configuration files shipped by default are essentially empty and need to be filled in yourself. The purpose of each file is described below.

The most important are the first four: core-site.xml, hdfs-site.xml, mapred-site.xml, and yarn-site.xml.
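
As an illustration, a minimal core-site.xml for a single-node setup typically just sets the default filesystem URI (the host and port here are placeholders, not from the original post):

<configuration>
    <property>
        <!-- URI that clients and daemons use as the default filesystem -->
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>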


Hive Study Notes: SerDe

SerDe is short for Serializer and Deserializer; it is the mechanism Hive uses to read and write data in various formats.

Amazon Athena can be viewed as Amazon's counterpart to Hive; its documentation introduces SerDes as follows:

https://docs.aws.amazon.com/zh_cn/athena/latest/ug/serde-about.html

The SerDes commonly used in Hive are listed below, with reference to:

Hive_10. Commonly used SerDes in Hive and the current state of the community

1. LazySimpleSerDe, for text files: TEXTFILE

https://github.com/apache/hive/blob/master/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java

2. RegexSerDe, which parses text files using Java regular expressions: TEXTFILE

https://github.com/apache/hive/blob/master/serde/src/java/org/apache/hadoop/hive/serde2/RegexSerDe.java

3. JSONSerDe, for JSON-formatted text files: TEXTFILE
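
A SerDe is selected per table with the ROW FORMAT SERDE clause. A minimal sketch using RegexSerDe (table name, columns, and regex are illustrative):

CREATE TABLE access_log (
    host    STRING,
    request STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
    "input.regex" = "([^ ]*) (.*)"  -- one capture group per column
)
STORED AS TEXTFILE;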


OutputFormat in MapReduce

In the Hadoop source, OutputFormat is an abstract class (public abstract class OutputFormat<K, V>) that defines the output format of a reduce task:

https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputFormat.java

See also:

MapReduce Quick Start Series (12) | OutputFormat in MapReduce

 

The commonly used OutputFormat implementations can be browsed in the source:

https://github.com/apache/hadoop/tree/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output

1. Text output: TextOutputFormat, Hadoop's default OutputFormat implementation (see the driver sketch after the link below).

https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java
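
Wiring an OutputFormat into a job happens on the Job object. A minimal driver sketch (paths, key/value types, and class name are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OutputFormatDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "outputformat-demo");
        job.setJarByClass(OutputFormatDemo.class);

        // TextOutputFormat is already the default; set explicitly for clarity
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path("/input"));
        FileOutputFormat.setOutputPath(job, new Path("/output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}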


Filebeat's http_endpoint input

Filebeat inputs finally support HTTP: you can send data to a Filebeat input with POST requests, although the feature is still in beta.

 

Reference:

https://www.elastic.co/guide/en/beats/filebeat/7.x/filebeat-input-http_endpoint.html

Download the latest Filebeat release and extract it:

wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.9.2-linux-x86_64.tar.gz

Configure filebeat_http.yml:

filebeat.inputs:
- type: http_endpoint
  enabled: true
  listen_address: localhost
  listen_port: 19080

#----------------------------------File output--------------------------------#
output.file:
  path: "/tmp/filebeat"
  filename: filebeat
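
Once Filebeat is running with this config, events can be posted to the endpoint; a quick smoke test with curl (the JSON body is an arbitrary example):

curl -X POST "http://localhost:19080" \
  -H "Content-Type: application/json" \
  -d '{"message": "hello filebeat"}'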

Set the proper permissions on the configuration file


Packaging a mixed Scala + Java project with Maven

When mixing Scala and Java code, some extra configuration must be added to the pom so that the classes compiled from the Scala sources make it into the final jar.

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <!-- includes lists the source patterns to compile -->
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
</build>

The packaging command:

mvn clean scala:compile compile package assembly:single -Pproduction -Dmaven.test.skip=true

 

Or:

<build>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <id>scala-compile</id>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                    <configuration>
                        <!-- includes lists the source patterns to compile -->
                        <includes>
                            <include>**/*.scala</include>
                        </includes>
                    </configuration>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.7.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

To package:

mvn package assembly:single