tonglin0325's Personal Homepage

Flink Study Notes: Reading and Writing Kafka

Documentation for Flink's Kafka connector:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html

When writing to Kafka, Flink needs a schema that implements serialization and deserialization of the records.
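A minimal sketch of such a schema, assuming plain UTF-8 string payloads: it mirrors the behavior of Flink's built-in SimpleStringSchema, and the class name Utf8StringSchema is purely illustrative.

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Hypothetical example schema: converts between String and UTF-8 bytes
public class Utf8StringSchema implements SerializationSchema<String>, DeserializationSchema<String> {

    @Override
    public byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // a Kafka topic is an unbounded stream
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}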

Some of the code references:

https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java

as well as:

https://juejin.im/post/5d844d11e51d4561e0516bbd
https://developer.aliyun.com/article/686809

1. Dependencies:

flink-java provides Flink's Java API, including the DataSet execution environment, formats, and some operators:

https://github.com/apache/flink/tree/master/flink-java/src/main/java/org/apache/flink/api/java

flink-streaming-java provides Flink's Java streaming API, including the stream execution environment and some operators:

https://github.com/apache/flink/tree/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api

flink-connector-kafka provides the Kafka connector:

https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kafka
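To illustrate the division of labor between the first two modules, here is a minimal sketch (class name and data are illustrative) that builds a batch job with the DataSet API from flink-java and a streaming job with the DataStream API from flink-streaming-java:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvExample {
    public static void main(String[] args) throws Exception {
        // DataSet (batch) API from flink-java
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> batch = batchEnv.fromElements("a", "b", "c");
        batch.print(); // print() triggers execution of the batch job

        // DataStream API from flink-streaming-java
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = streamEnv.fromElements("a", "b", "c");
        stream.print();
        streamEnv.execute("streaming job"); // streaming jobs need an explicit execute()
    }
}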

The pom file dependencies:

<!-- log4j -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    <scope>runtime</scope>
</dependency>
<!--flink-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

2. Kafka Consumer

For a Kafka consumer there are several important configuration parameters.
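As a minimal sketch of how such parameters are wired into a FlinkKafkaConsumer (the broker address, group id, and topic name below are assumptions):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaReadExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "flink-test-group");        // consumer group id
        props.setProperty("auto.offset.reset", "latest");         // used when no committed offset exists

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("test-topic", new SimpleStringSchema(), props);
        consumer.setStartFromGroupOffsets(); // the default start position

        DataStream<String> stream = env.addSource(consumer);
        stream.print();

        env.execute("Kafka read example");
    }
}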

Full article >>

Flink Study Notes: Scala Shell

Flink also has an interactive development mode similar to spark-shell:

bin/start-scala-shell.sh yarn
Starting Flink Shell:
20/03/14 14:34:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.rpc.address, localhost
20/03/14 14:34:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.rpc.port, 6123
20/03/14 14:34:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.heap.size, 1024m
20/03/14 14:34:07 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.memory.process.size, 1568m
20/03/14 14:34:07 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.numberOfTaskSlots, 1
20/03/14 14:34:07 INFO configuration.GlobalConfiguration: Loading configuration property: parallelism.default, 1
20/03/14 14:34:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.execution.failover-strategy, region
20/03/14 14:34:07 INFO cli.FlinkYarnSessionCli: Found Yarn properties file under /tmp/.yarn-properties-lintong.
20/03/14 14:34:07 WARN cli.FlinkYarnSessionCli: The configuration directory ('/home/lintong/software/apache/flink-1.10.0/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.

Connecting to Flink cluster (host: localhost, port: 6123).


▒▓██▓██▒
▓████▒▒█▓▒▓███▓▒
▓███▓░░ ▒▒▒▓██▒ ▒
░██▒ ▒▒▓▓█▓▓▒░ ▒████
██▒ ░▒▓███▒ ▒█▒█▒
░▓█ ███ ▓░▒██
▓█ ▒▒▒▒▒▓██▓░▒░▓▓█
█░ █ ▒▒░ ███▓▓█ ▒█▒▒▒
████░ ▒▓█▓ ██▒▒▒ ▓███▒
░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░
▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒
███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒
░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒
███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░
██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓
▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒
▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒
▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█
██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █
▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓
█▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓
██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓
▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒
██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒
▓█ ▒█▓ ░ █░ ▒█ █▓
█▓ ██ █░ ▓▓ ▒█▓▓▓▒█░
█▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█
██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓
▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██
░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓
░▓██▒ ▓░ ▒█▓█ ░░▒▒▒
▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░

F L I N K - S C A L A - S H E L L

Read a file:

scala> val dataSet = benv.readTextFile("hdfs://master:8020/user/lintong/logs/test/test.log")
20/03/14 14:49:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.rpc.address, localhost
20/03/14 14:49:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.rpc.port, 6123
20/03/14 14:49:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.heap.size, 1024m
20/03/14 14:49:07 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.memory.process.size, 1568m
20/03/14 14:49:07 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.numberOfTaskSlots, 1
20/03/14 14:49:07 INFO configuration.GlobalConfiguration: Loading configuration property: parallelism.default, 1
20/03/14 14:49:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.execution.failover-strategy, region
dataSet: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@13e5b262

Print:

scala> dataSet.print()
20/03/14 14:49:10 INFO java.ExecutionEnvironment: The job has 0 registered types and 0 default Kryo serializers
20/03/14 14:49:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/03/14 14:49:11 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.2.105:8032
20/03/14 14:49:11 INFO yarn.YarnClusterDescriptor: No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
20/03/14 14:49:11 INFO yarn.YarnClusterDescriptor: Found Web Interface master:36441 of application 'application_1584163852090_0002'.
1
2
3
4

Exit:

scala> :q
good bye ..

Full article >>

Flink Study Notes: WordCount

Refer to the official Flink example:

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java

pom

<!--flink-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.8.0</version>
</dependency>

Code:

package com.xxx.xx.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/**
 * Created by lintong on 20-3-13.
 */
public class WordCount {

    public static class WordCountData {

        public static final String[] WORDS = new String[]{
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,",
                "And by opposing end them?--To die,--to sleep,--",
                "No more; and by a sleep to say we end",
                "The heartache, and the thousand natural shocks",
                "That flesh is heir to,--'tis a consummation",
                "Devoutly to be wish'd. To die,--to sleep;--",
                "To sleep! perchance to dream:--ay, there's the rub;",
                "For in that sleep of death what dreams may come,",
                "When we have shuffled off this mortal coil,",
                "Must give us pause: there's the respect",
                "That makes calamity of so long life;",
                "For who would bear the whips and scorns of time,",
                "The oppressor's wrong, the proud man's contumely,",
                "The pangs of despis'd love, the law's delay,",
                "The insolence of office, and the spurns",
                "That patient merit of the unworthy takes,",
                "When he himself might his quietus make",
                "With a bare bodkin? who would these fardels bear,",
                "To grunt and sweat under a weary life,",
                "But that the dread of something after death,--",
                "The undiscover'd country, from whose bourn",
                "No traveller returns,--puzzles the will,",
                "And makes us rather bear those ills we have",
                "Than fly to others that we know not of?",
                "Thus conscience does make cowards of us all;",
                "And thus the native hue of resolution",
                "Is sicklied o'er with the pale cast of thought;",
                "And enterprises of great pith and moment,",
                "With this regard, their currents turn awry,",
                "And lose the name of action.--Soft you now!",
                "The fair Ophelia!--Nymph, in thy orisons",
                "Be all my sins remember'd."
        };
    }

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text = null;
        if (params.has("input")) {
            text = env.readTextFile(params.get("input"));
            Preconditions.checkNotNull(text, "Input DataStream should not be null.");
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            text = env.fromElements(WordCountData.WORDS);
        }

        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(0).sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
        // execute program
        env.execute("Streaming WordCount");
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a
     * user-defined FlatMapFunction. The function takes a line (String) and
     * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
     * Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

}

Run parameters

Full article >>

CDH Study Notes: Cloudera Manager API

You can use the API provided by Cloudera Manager (CM) to query information about the CDH cluster:

http://cloudera.github.io/cm_api/

The API docs for 7.0.3:

https://archive.cloudera.com/cm7/7.0.3/generic/jar/cm_api/apidocs/index.html

The API for querying Impala queries:

https://archive.cloudera.com/cm7/7.0.3/generic/jar/cm_api/apidocs/json_ApiImpalaQuery.html

For example:

https://xxxx:7180/api/v9/clusters/dev-cdh/services/impala/impalaQueries?from=2020-03-10T06:26:01.927Z
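A minimal sketch of calling this endpoint from Java. The CM REST API authenticates with HTTP basic auth; the host, cluster name, and credentials below are placeholders:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical client: endpoint and credentials are placeholders, not real values
public class CmApiExample {
    public static void main(String[] args) throws Exception {
        String endpoint = "https://xxxx:7180/api/v9/clusters/dev-cdh/services/impala/impalaQueries"
                + "?from=2020-03-10T06:26:01.927Z";

        String credentials = Base64.getEncoder()
                .encodeToString("admin:admin".getBytes(StandardCharsets.UTF_8)); // assumed CM account

        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("GET");
        conn.setRequestProperty("Authorization", "Basic " + credentials);

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // JSON document listing the Impala queries
            }
        }
    }
}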

The supported parameters are shown in the figure.

The API for querying applications on YARN:

https://archive.cloudera.com/cm7/7.0.3/generic/jar/cm_api/apidocs/resource_YarnApplicationsResource.html

For example:

https://xxxx:7180/api/v9/clusters/dev-cdh/services/yarn/yarnApplications

The supported parameters are shown in the figure; they are the same as for Impala.

Full article >>

Installing flink-1.10.0 on Ubuntu 16.04

I originally wanted to integrate Flink into CDH, but my CDH version is 5.16.2. Judging from the issues below, that version is probably too low; at least CDH 6 is required.

https://github.com/pkeropen/flink-parcel/issues

So install it standalone instead:

wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz

Installation path:

/home/lintong/software/apache/flink-1.10.0

Add the following to /etc/profile, then run source /etc/profile:

#flink
export FLINK_HOME=/home/lintong/software/apache/flink-1.10.0
export PATH=${FLINK_HOME}/bin:$PATH

Download the flink-shaded-hadoop-2-uber-2.7.5-7.0.jar package and put it into Flink's lib directory:

wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-7.0/flink-shaded-hadoop-2-uber-2.7.5-7.0.jar

Otherwise, Flink on YARN will report:

Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more

Start a yarn-session:

yarn-session.sh -n 3 -s 5 -jm 1024 -tm 4096 -d

yarn-session parameters:

-n: number of TaskManagers;
-d: run in detached mode;
-id: specify the YARN application ID;
-j: path to the Flink jar file;
-jm: memory for the JobManager container (default unit: MB);
-nl: specify YARN node labels for the YARN application;
-nm: set a custom name for the application on YARN;
-q: display the available YARN resources (memory, cores);
-qu: specify a YARN queue;
-s: number of slots per TaskManager;
-st: start Flink in streaming mode;
-tm: memory per TaskManager container (default unit: MB);
-z: namespace under which to create Zookeeper sub-paths for high-availability mode;

Check on CDH: the first application is running, the second has finished.

Click the application id to go to the YARN app page.

Full article >>

Yarn Study Notes: MR Jobs

1. After Hive SQL is submitted to YARN for execution, it runs as an MR job.

The URL for viewing a running MR job's application; the URL may differ for other job types, such as Spark, Flink, etc.:

http://xxxx:8088/cluster/app/application_158225xxxxx_0316

Full article >>

Yarn Study Notes: Common Commands

1. yarn top: view resource usage on YARN.

2. Queue usage status:

yarn queue -status root.xxx_common
Queue Information :
Queue Name : root.xxx_common
State : RUNNING
Capacity : 100.0%
Current Capacity : 21.7%
Maximum Capacity : -100.0%
Default Node Label expression :
Accessible Node Labels :

3. View the list of applications running on YARN. If the cluster uses Kerberos authentication, you need to kinit first; after authenticating you can see all running applications.

yarn application -list

Result:

Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):12
Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
application_15771778xxxxx_0664 xx-flink-test Apache Flink xxx-xx root.xxx_common RUNNING UNDEFINED 100% http://xxx-76:35437
application_15771778xxxxx_0663 xx-flink-debug Apache Flink xx root.xxx_common RUNNING UNDEFINED 100% http://xxx-79:42443
application_15771778xxxxx_0641 xxx-flink Apache Flink xxx-xx root.xxx_common RUNNING UNDEFINED 100% http://xxx-76:38067
application_15771778xxxxx_0182 common_flink Apache Flink xx root.xxx_common RUNNING UNDEFINED 100% http://xxx-79:38583
application_15822552xxxxx_0275 testjar XXX-FLINK xxx root.xxx_common RUNNING UNDEFINED 100% http://xxx-78:36751
application_15822552xxxxx_0259 flinksql XXX-FLINK hdfs root.xxx_common RUNNING UNDEFINED 100% http://xxx-77:37127
application_15822552xxxxx_0026 kudu-test Apache Flink hdfs root.xxx_common RUNNING UNDEFINED 100% http://xxx-78:43071
application_15822552xxxxx_0307 xxx_statistic XXX Flink xxx root.xxx_common RUNNING UNDEFINED 100% http://xxx:18000
application_15822552xxxxx_0308 xxx-statistic XXX Flink xxx root.xxx_common ACCEPTED UNDEFINED 0% N/A
application_15810489xxxxx_0003 xxx-flink Apache Flink xx root.xxx_common RUNNING UNDEFINED 100% http://xxx-78:8081
application_15810489xxxxx_0184 common_flink Apache Flink xx root.xxx_common RUNNING UNDEFINED 100% http://xxx-76:35659
application_15810489xxxxx_0154 Flink session cluster Apache Flink hdfs root.xxx_common RUNNING UNDEFINED 100% http://xxx-80:38797

Filter by state:

yarn application -list -appStates RUNNING
Total number of applications (application-types: [] and states: [RUNNING]):12
Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
application_157717780xxxx_0664 xx-flink-test Apache Flink xxx-xx root.xxx_common RUNNING UNDEFINED 100% http://xxxxx-xx:35437

4. View application status information:

yarn application -status application_1582255xxxx_0314
Application Report :
Application-Id : application_1582255xxxx_0314
Application-Name : select count(*) from tb1 (Stage-1)
Application-Type : MAPREDUCE
User : hive
Queue : root.xxxx_common
Start-Time : 1583822835423
Finish-Time : 1583822860082
Progress : 100%
State : FINISHED
Final-State : SUCCEEDED
Tracking-URL : http://xxx-xxxx-xx:19888/jobhistory/job/job_15822552xxxx_0314
RPC Port : 32829
AM Host : xxxx-xxxx-xx
Aggregate Resource Allocation : 162810 MB-seconds, 78 vcore-seconds
Log Aggregation Status : SUCCEEDED
Diagnostics :

Full article >>

Elasticsearch Shards and Segments

What is a shard?

It is introduced in the following documentation:

https://www.elastic.co/guide/cn/elasticsearch/guide/current/kagillion-shards.html

1. A shard is, at bottom, a Lucene index, and it consumes a certain amount of file handles, memory, and CPU cycles.
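For context, the number of shards is fixed when an index is created. A minimal sketch using the Elasticsearch high-level REST client (a 7.x client is assumed; host, index name, and shard counts are illustrative):

import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.settings.Settings;

// Hypothetical example: create an index with an explicit shard count
public class CreateIndexExample {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            CreateIndexRequest request = new CreateIndexRequest("test-index");
            request.settings(Settings.builder()
                    .put("index.number_of_shards", 3)     // each primary shard is one Lucene index
                    .put("index.number_of_replicas", 1)); // one replica per primary
            client.indices().create(request, RequestOptions.DEFAULT);
        }
    }
}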

全文 >>