tonglin0325's Homepage

Flink Study Notes: WordCount

Based on the official Flink example:

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java

Maven dependencies (pom.xml):

<!--flink-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.8.0</version>
</dependency>

Code:

package com.xxx.xx.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/**
 * Created by lintong on 20-3-13.
 */
public class WordCount {

    public static class WordCountData {

        public static final String[] WORDS = new String[]{
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,",
            "And by opposing end them?--To die,--to sleep,--",
            "No more; and by a sleep to say we end",
            "The heartache, and the thousand natural shocks",
            "That flesh is heir to,--'tis a consummation",
            "Devoutly to be wish'd. To die,--to sleep;--",
            "To sleep! perchance to dream:--ay, there's the rub;",
            "For in that sleep of death what dreams may come,",
            "When we have shuffled off this mortal coil,",
            "Must give us pause: there's the respect",
            "That makes calamity of so long life;",
            "For who would bear the whips and scorns of time,",
            "The oppressor's wrong, the proud man's contumely,",
            "The pangs of despis'd love, the law's delay,",
            "The insolence of office, and the spurns",
            "That patient merit of the unworthy takes,",
            "When he himself might his quietus make",
            "With a bare bodkin? who would these fardels bear,",
            "To grunt and sweat under a weary life,",
            "But that the dread of something after death,--",
            "The undiscover'd country, from whose bourn",
            "No traveller returns,--puzzles the will,",
            "And makes us rather bear those ills we have",
            "Than fly to others that we know not of?",
            "Thus conscience does make cowards of us all;",
            "And thus the native hue of resolution",
            "Is sicklied o'er with the pale cast of thought;",
            "And enterprises of great pith and moment,",
            "With this regard, their currents turn awry,",
            "And lose the name of action.--Soft you now!",
            "The fair Ophelia!--Nymph, in thy orisons",
            "Be all my sins remember'd."
        };
    }

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text = null;
        if (params.has("input")) {
            text = env.readTextFile(params.get("input"));
            Preconditions.checkNotNull(text, "Input DataStream should not be null.");
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            text = env.fromElements(WordCountData.WORDS);
        }

        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(0).sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
        // execute program
        env.execute("Streaming WordCount");
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a
     * user-defined FlatMapFunction. The function takes a line (String) and
     * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
     * Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

}

Run parameters (the original screenshot is not preserved)


Results (the original screenshot is not preserved)
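As a rough sketch of what the run parameters look like, the job could be submitted with the flink CLI as below. The jar name and file paths are illustrative assumptions; --input and --output are the flags the code above actually parses, and without --output the (word, count) tuples are printed to stdout.

# Submit the packaged job; the jar name and paths are hypothetical.
flink run -c com.xxx.xx.flink.WordCount target/wordcount.jar \
    --input /tmp/hamlet.txt \
    --output /tmp/wordcount_result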


CDH Study Notes: Cloudera Manager API

You can query information about a CDH cluster through the API that Cloudera Manager provides:

http://cloudera.github.io/cm_api/

API documentation for 7.0.3:

https://archive.cloudera.com/cm7/7.0.3/generic/jar/cm_api/apidocs/index.html

API for querying Impala queries:

https://archive.cloudera.com/cm7/7.0.3/generic/jar/cm_api/apidocs/json_ApiImpalaQuery.html

For example:

https://xxxx:7180/api/v9/clusters/dev-cdh/services/impala/impalaQueries?from=2020-03-10T06:26:01.927Z

The supported parameters are shown in the figure (not preserved here).
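A request like the one above can be issued with curl over HTTP basic auth, which is what the CM API uses; the credentials below are placeholders:

# Query recent Impala queries through the CM REST API (admin:admin is a placeholder).
curl -u admin:admin \
  "https://xxxx:7180/api/v9/clusters/dev-cdh/services/impala/impalaQueries?from=2020-03-10T06:26:01.927Z"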

API for querying YARN applications:

https://archive.cloudera.com/cm7/7.0.3/generic/jar/cm_api/apidocs/resource_YarnApplicationsResource.html


Installing Kitematic on Ubuntu 16.04

1. Download the installer:

https://github.com/docker/kitematic/releases

Extract it, then install:

sudo dpkg -i ./Kitematic-0.17.10_amd64.deb
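If dpkg reports unmet dependencies, a common follow-up (not from the original post) is:

# Pull in any missing dependencies and complete the interrupted install.
sudo apt-get install -f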

Launch it; you can then start containers from the UI.


You can also edit port mappings.

If Kitematic's "My Images" tab shows nothing, it is a version problem: it displays correctly in 0.17.3, but not in later versions.


Installing flink-1.10.0 on Ubuntu 16.04

I originally wanted to integrate Flink into CDH, but my CDH version is 5.16.2; judging from the issues below, the CDH version may be too low, and at least CDH 6 is required:

https://github.com/pkeropen/flink-parcel/issues

So I installed it standalone instead:

wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz

Installation path:

/home/lintong/software/apache/flink-1.10.0

Add the following to /etc/profile, then run source /etc/profile:

#flink
export FLINK_HOME=/home/lintong/software/apache/flink-1.10.0
export PATH=${FLINK_HOME}/bin:$PATH

Download flink-shaded-hadoop-2-uber-2.7.5-7.0.jar and place it in Flink's lib directory:

wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-7.0/flink-shaded-hadoop-2-uber-2.7.5-7.0.jar
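After that, the standalone scripts shipped with the distribution can be used to verify the installation; a minimal sketch:

# Start a local standalone cluster (bin/ is on PATH via FLINK_HOME).
start-cluster.sh
# Run the bundled streaming WordCount example as a smoke test.
flink run ${FLINK_HOME}/examples/streaming/WordCount.jar
# The web UI should then be reachable at http://localhost:8081 by default.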


Yarn Study Notes: MR Jobs

1. After a Hive SQL statement is submitted to YARN, it executes as an MR job.

URL for viewing the application page of a running MR job; the URL may differ for other job types such as Spark or Flink:

http://xxxx:8088/cluster/app/application_158225xxxxx_0316


URL for viewing a finished MR job:

http://xxxx:19888/jobhistory/job/job_1582255xxxx_0316


Details such as the user who actually ran the Hive SQL and the SQL text itself can be found in the job configuration:

http://xxxx:19888/ws/v1/history/mapreduce/jobs/job_15822552xxxxx_0298/conf
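For example, the configuration can be fetched as JSON and searched for hive.query.string, the property that carries the submitted SQL; the grep pattern is only a rough sketch:

# Fetch the job conf from the JobHistory REST API and pick out the Hive query text.
curl -s "http://xxxx:19888/ws/v1/history/mapreduce/jobs/job_15822552xxxxx_0298/conf" \
  | grep -o '"hive.query.string[^}]*'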


Yarn Study Notes: Common Commands

1. yarn top: view resource usage on YARN.

2. Queue status:

yarn queue -status root.xxx_common
Queue Information :
Queue Name : root.xxx_common
State : RUNNING
Capacity : 100.0%
Current Capacity : 21.7%
Maximum Capacity : -100.0%
Default Node Label expression :
Accessible Node Labels :

3. List the jobs running on YARN. If the cluster uses Kerberos authentication, run kinit first; once authenticated you can see all running applications.

yarn application -list

Output:

Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):12
Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
application_15771778xxxxx_0664 xx-flink-test Apache Flink xxx-xx root.xxx_common RUNNING UNDEFINED 100% http://xxx-76:35437
application_15771778xxxxx_0663 xx-flink-debug Apache Flink xx root.xxx_common RUNNING UNDEFINED 100% http://xxx-79:42443
application_15771778xxxxx_0641 xxx-flink Apache Flink xxx-xx root.xxx_common RUNNING UNDEFINED 100% http://xxx-76:38067
application_15771778xxxxx_0182 common_flink Apache Flink xx root.xxx_common RUNNING UNDEFINED 100% http://xxx-79:38583
application_15822552xxxxx_0275 testjar XXX-FLINK xxx root.xxx_common RUNNING UNDEFINED 100% http://xxx-78:36751
application_15822552xxxxx_0259 flinksql XXX-FLINK hdfs root.xxx_common RUNNING UNDEFINED 100% http://xxx-77:37127
application_15822552xxxxx_0026 kudu-test Apache Flink hdfs root.xxx_common RUNNING UNDEFINED 100% http://xxx-78:43071
application_15822552xxxxx_0307 xxx_statistic XXX Flink xxx root.xxx_common RUNNING UNDEFINED 100% http://xxx:18000
application_15822552xxxxx_0308 xxx-statistic XXX Flink xxx root.xxx_common ACCEPTED UNDEFINED 0% N/A
application_15810489xxxxx_0003 xxx-flink Apache Flink xx root.xxx_common RUNNING UNDEFINED 100% http://xxx-78:8081
application_15810489xxxxx_0184 common_flink Apache Flink xx root.xxx_common RUNNING UNDEFINED 100% http://xxx-76:35659
application_15810489xxxxx_0154 Flink session cluster Apache Flink hdfs root.xxx_common RUNNING UNDEFINED 100% http://xxx-80:38797

Filter by application state:

yarn application -list -appStates RUNNING
Total number of applications (application-types: [] and states: [RUNNING]):12
Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
application_157717780xxxx_0664 xx-flink-test Apache Flink xxx-xx root.xxx_common RUNNING UNDEFINED 100% http://xxxxx-xx:35437


Elasticsearch Shards and Segments

What is a shard?

It is described in the following documentation:

https://www.elastic.co/guide/cn/elasticsearch/guide/current/kagillion-shards.html

1. A shard is, under the hood, a Lucene index, and it consumes file handles, memory, and CPU cycles.

2. Every search request must hit every shard of the index. That is fine when the shards sit on different nodes, but it becomes a problem when many shards compete for the same resources on the same node (see the sketch after this list).

3. The term statistics used to compute relevance are kept per shard. Many shards that each hold only a little data lead to poor relevance.
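To see how the shards of a cluster are spread across nodes (the concern in point 2), the _cat API can be consulted; the host is a placeholder:

# List every shard with its index name, primary/replica flag (prirep), state, and node.
curl "http://localhost:9200/_cat/shards?v"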


The official documentation describes the relationship between clusters, nodes, and shards:

https://www.elastic.co/guide/en/elasticsearch/reference/current/scalability.html

1. An Elasticsearch index is really a grouping of one or more physical shards.

2. There are two kinds of shards: primaries and replicas. Every document in an index belongs to one primary shard, and a replica shard is a copy of a primary shard, as sketched below.
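For illustration, shard counts are fixed when an index is created; a hypothetical index with three primary shards and one replica per primary:

# Create an index with 3 primaries and 1 replica each (index name and host are placeholders).
curl -X PUT "http://localhost:9200/my_index" \
  -H 'Content-Type: application/json' \
  -d '{"settings": {"number_of_shards": 3, "number_of_replicas": 1}}'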


Remapping keys on mac with karabiner-elements

On the latest macOS Catalina, the tool has been renamed from Karabiner to Karabiner-Elements; the version installed here is Karabiner-Elements-12.9.0.

Download: https://pqrs.org/osx/karabiner/


After installation there are two applications, Karabiner-Elements and Karabiner-EventViewer; key remapping is done in Karabiner-Elements.

During installation you will be prompted to grant permissions (screenshots not preserved).


After installation you can import the key-combination schemes you want from the rules site; key combinations are configured as complex_modifications, while single keys are simple_modifications.


Changing the Homebrew source on mac

Reference: https://juejin.im/post/5daec26a51882575d50cd0aa

1. Check brew's current remotes:

git -C "$(brew --repo)" remote -v
origin https://github.com/Homebrew/brew (fetch)
origin https://github.com/Homebrew/brew (push)

2. Switch to the Tsinghua mirrors:

git -C "$(brew --repo)" remote set-url origin https://mirrors.tuna.tsinghua.edu.cn/git/homebrew/brew.git
git -C "$(brew --repo homebrew/core)" remote set-url origin https://mirrors.tuna.tsinghua.edu.cn/git/homebrew/homebrew-core.git
git -C "$(brew --repo homebrew/cask)" remote set-url origin https://mirrors.tuna.tsinghua.edu.cn/git/homebrew/homebrew-cask.git
brew update

The remotes now point at the Tsinghua mirrors:

git -C "$(brew --repo)" remote -v
origin https://mirrors.tuna.tsinghua.edu.cn/git/homebrew/brew.git (fetch)
origin https://mirrors.tuna.tsinghua.edu.cn/git/homebrew/brew.git (push)

Or switch to the USTC mirrors:

# Replace Homebrew
git -C "$(brew --repo)" remote set-url origin https://mirrors.ustc.edu.cn/brew.git
# Replace Homebrew Core
git -C "$(brew --repo homebrew/core)" remote set-url origin https://mirrors.ustc.edu.cn/homebrew-core.git
# Replace Homebrew Cask
git -C "$(brew --repo homebrew/cask)" remote set-url origin https://mirrors.ustc.edu.cn/homebrew-cask.git
# Update
brew update

To restore the defaults:
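A sketch of the reset, pointing the remotes back at the stock GitHub repositories (the brew remote is shown in step 1; the core and cask defaults follow the same pattern):

# Restore the official remotes, then update.
git -C "$(brew --repo)" remote set-url origin https://github.com/Homebrew/brew.git
git -C "$(brew --repo homebrew/core)" remote set-url origin https://github.com/Homebrew/homebrew-core.git
git -C "$(brew --repo homebrew/cask)" remote set-url origin https://github.com/Homebrew/homebrew-cask.git
brew update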


Fixing environment variables not taking effect on mac

1. Add the following to ~/.zshrc:

source ~/.bash_profile 

Reference: https://blog.csdn.net/qq_18505715/article/details/83276208


2. For example, if git command completion does not work on mac, you can set it up as described in the following article:

https://blog.csdn.net/WinWill2012/article/details/71774461

source ~/.git-completion.bash
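For reference, the script that line sources can be fetched from git's own repository (a sketch; the URL is the script's upstream location in the git source tree):

# Download git's bash completion script to the location sourced above.
curl -o ~/.git-completion.bash \
  https://raw.githubusercontent.com/git/git/master/contrib/completion/git-completion.bash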


3. Adjust the mac terminal colors and show the full path in the prompt:

export PS1="%n@%m %0~ $ "

export CLICOLOR=1
export LSCOLORS=ExFxBxDxCxegedabagacad
alias ls='ls -GFh'
