tonglin0325's Homepage

ZooKeeper Study Notes: Connecting to ZooKeeper

Reference: From Paxos to ZooKeeper: Distributed Consistency Principles and Practice

The ZooKeeper dependency used is version 3.4.5 from CDH 5.16.2:

<!-- zookeeper -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.5-cdh5.16.2</version>
</dependency>

The code is below; for background on the CountDownLatch counter, see: Java Multithreading: the CyclicBarrier, CountDownLatch, and Exchanger utility classes

The latch is initialized to 1. When the zk client connects to the zk cluster and the cluster reports the SyncConnected state, the watcher decrements the counter to 0 and the blocked await() call returns normally (an InterruptedException is only thrown if the waiting thread is interrupted, not when the count reaches zero).

package com.bigdata.zookeeper;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class ZkExample implements Watcher {

    // initialized to 1; counted down once the session is established
    public static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("master:2181", 5000, new ZkExample());
        System.out.println(zk.getState()); // still CONNECTING at this point
        connectedSemaphore.await(); // block until the watcher sees SyncConnected
        System.out.println("Zk session established");
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println(watchedEvent);
        if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
            connectedSemaphore.countDown(); // 1 -> 0: releases await()
        }
    }
}
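To isolate the latch behavior from ZooKeeper itself, here is a minimal self-contained sketch using only the JDK (the class and method names are made up for illustration); the second thread plays the role of the watcher callback:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchDemo {

    // Waits on a latch that another thread counts down; returns true once released.
    static boolean connectAndWait() {
        CountDownLatch latch = new CountDownLatch(1); // initialized to 1
        new Thread(() -> {
            // simulates the watcher thread receiving SyncConnected
            latch.countDown(); // 1 -> 0: await() is released normally
        }).start();
        try {
            // returns true when the count reaches 0 (no exception involved)
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // only reached if this waiting thread is interrupted
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(connectAndWait() ? "Zk session established" : "timed out");
    }
}
```

Note that `await()` returning is the normal path; the original catch-block printing "Zk session established" inside `InterruptedException` would only fire if the main thread were interrupted.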

Output

 

Uninstalling the XiaoAI Assistant on a Xiaomi Phone

1. Root the system

The phone's MIUI version must be the developer edition and the phone must be rooted; only then can you obtain the permission to uninstall preinstalled system apps.

The XiaoAI voice assistant in particular consumes excessive system resources and makes the phone laggy, so uninstalling it is recommended.

Uninstalling it through the Security Center does not remove it completely; the system will reinstall it in the background. The only way is to obtain root privileges and delete it with them.

To obtain root privileges you need to flash a developer-edition ROM; download ROMs from:

https://xiaomirom.com/

2. Install the Termux app on the phone

This app is used to run terminal commands on the phone:

https://github.com/termux/termux-app/releases

3. Install termux-adb on the phone

This is used to connect to the phone's adb from within Termux and enable debugging.

Reference: https://github.com/MasterDevX/Termux-ADB

apt update && apt install wget && wget https://github.com/MasterDevX/Termux-ADB/raw/master/RemoveTools.sh && bash RemoveTools.sh

Full text >>

Flink Study Notes: Flink on YARN

A Flink cluster can be deployed in the following ways; this post focuses on Flink on YARN:

Yarn
Mesos
Docker/Kubernetes
Standalone

 

Reference:

https://www.slideshare.net/tillrohrmann/redesigning-apache-flinks-distributed-architecture-flink-forward-2017

 

Flink on YARN has two run modes: a long-running YARN session shared by multiple jobs, and a per-job mode in which each job starts its own cluster:

 

Flink architecture

Full text >>

Accessing Services on an Intranet Machine with frp + ss

1. Building on the frp intranet-tunneling setup, add the following to frpc.ini on the intranet machine:

[web]
type = tcp
local_ip = master
local_port = <intranet port>
remote_port = <public-server port>

Start it:

./frpc -c frpc.ini

2. Start ss-server on the intranet machine

Modify the config /etc/shadowsocks-libev/config.json; the server address must be 0.0.0.0:

{
    "server": "0.0.0.0",
    "server_port": <intranet port>,
    "local_port": 1080,
    "password": "<password>",
    "timeout": 60,
    "method": "chacha20-ietf-poly1305"
}

Start it:

ss-server -c /etc/shadowsocks-libev/config.json

3. Configure the proxy

Full text >>

Intranet Tunneling with frp

1. Prerequisites: one server with a public IP (1 core / 1 GB RAM) and one server inside the intranet (16 GB RAM)

2. Install frp on the public machine and start the frp server

Download and extract:

wget https://github.com/fatedier/frp/releases/download/v0.33.0/frp_0.33.0_linux_amd64.tar.gz
tar -zxvf frp_0.33.0_linux_amd64.tar.gz

Configuration file frps.ini:

[common]
bind_port = xxxx
token = ssssss

Here bind_port is the port used to communicate with the client side, and token is the password. vhost_http_port is the port for reaching a client-side web HTTP service through the server, and vhost_https_port is the equivalent for web HTTPS services.
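A sketch of an frps.ini that also sets the vhost ports (the port numbers below are placeholders, not values from the original setup):

```ini
[common]
bind_port = xxxx
token = ssssss
# only needed when a client exposes web services:
vhost_http_port = 8080
vhost_https_port = 8443
```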

Start it:

./frps -c frps.ini

Full text >>

Changing a Machine's IP in CDH

If a machine's IP changes, CDH will not start properly; the IP address needs to be updated in two places.

1. In the HOSTS table of the scm database, change the IP_ADDRESS field to the new IP address.
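For example, assuming the scm database is backed by MySQL (the addresses below are placeholders):

```sql
-- scm database: point the host record at the new address
UPDATE HOSTS SET IP_ADDRESS = '<new ip>' WHERE IP_ADDRESS = '<old ip>';
```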

 

2. Update the IP address in /etc/cloudera-scm-agent/config.ini.

 

Then restart.

 

Hive Study Notes: Hive Hooks

Hive hooks are callback functions that can be embedded into the HQL execution process, for example in the situations below.

Reference:

https://www.slideshare.net/julingks/apache-hive-hooksminwookim130813

With hooks you can implement features such as blocking illegal SQL, and SQL collection and auditing. For an industry example, see Airbnb's reair:

https://github.com/airbnb/reair

That project uses a Hive hook to implement an audit-log hook, which collects the queries submitted to HiveServer2 by writing them into MySQL:

https://github.com/airbnb/reair/blob/master/hive-hooks/src/main/java/com/airbnb/reair/hive/hooks/CliAuditLogHook.java
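Hooks are registered through Hive configuration properties; for example, a post-execution hook (the class name below is hypothetical) can be wired up in hive-site.xml like this:

```xml
<!-- hive-site.xml: run com.example.MyAuditHook after each query -->
<property>
    <name>hive.exec.post.hooks</name>
    <value>com.example.MyAuditHook</value>
</property>
```

There are matching properties for other stages, such as hive.exec.pre.hooks for pre-execution hooks.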

 

The order in which these hook functions execute during a Hive SQL run:

Full text >>

Flink Study Notes: Reading and Writing Kafka

Flink's Kafka connector documentation:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html

When writing to Kafka, Flink requires implementing serialization and deserialization schemas.

Parts of the code reference:

https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java

as well as:

https://juejin.im/post/5d844d11e51d4561e0516bbd
https://developer.aliyun.com/article/686809

1. Dependencies, where:

flink-java provides Flink's Java API, including the DataSet execution environment, formats, and some operators:

https://github.com/apache/flink/tree/master/flink-java/src/main/java/org/apache/flink/api/java

flink-streaming-java provides Flink's Java streaming API, including the stream execution environment and some operators:

https://github.com/apache/flink/tree/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api

flink-connector-kafka provides the Kafka connector:

https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kafka
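Put together, the three dependencies above might look like this in a pom.xml (the version and Scala suffix are illustrative; match them to your own Flink and Scala versions):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
```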

Full text >>

Flink Study Notes: scala shell

Flink also has an interactive development mode similar to spark-shell:

bin/start-scala-shell.sh yarn
Starting Flink Shell:
20/03/14 14:34:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.rpc.address, localhost
20/03/14 14:34:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.rpc.port, 6123
20/03/14 14:34:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.heap.size, 1024m
20/03/14 14:34:07 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.memory.process.size, 1568m
20/03/14 14:34:07 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.numberOfTaskSlots, 1
20/03/14 14:34:07 INFO configuration.GlobalConfiguration: Loading configuration property: parallelism.default, 1
20/03/14 14:34:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.execution.failover-strategy, region
20/03/14 14:34:07 INFO cli.FlinkYarnSessionCli: Found Yarn properties file under /tmp/.yarn-properties-lintong.
20/03/14 14:34:07 WARN cli.FlinkYarnSessionCli: The configuration directory ('/home/lintong/software/apache/flink-1.10.0/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.

Connecting to Flink cluster (host: localhost, port: 6123).


▒▓██▓██▒
▓████▒▒█▓▒▓███▓▒
▓███▓░░ ▒▒▒▓██▒ ▒
░██▒ ▒▒▓▓█▓▓▒░ ▒████
██▒ ░▒▓███▒ ▒█▒█▒
░▓█ ███ ▓░▒██
▓█ ▒▒▒▒▒▓██▓░▒░▓▓█
█░ █ ▒▒░ ███▓▓█ ▒█▒▒▒
████░ ▒▓█▓ ██▒▒▒ ▓███▒
░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░
▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒
███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒
░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒
███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░
██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓
▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒
▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒
▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█
██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █
▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓
█▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓
██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓
▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒
██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒
▓█ ▒█▓ ░ █░ ▒█ █▓
█▓ ██ █░ ▓▓ ▒█▓▓▓▒█░
█▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█
██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓
▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██
░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓
░▓██▒ ▓░ ▒█▓█ ░░▒▒▒
▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░

F L I N K - S C A L A - S H E L L

Reading a file:

scala> val dataSet = benv.readTextFile("hdfs://master:8020/user/lintong/logs/test/test.log")
20/03/14 14:49:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.rpc.address, localhost
20/03/14 14:49:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.rpc.port, 6123
20/03/14 14:49:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.heap.size, 1024m
20/03/14 14:49:07 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.memory.process.size, 1568m
20/03/14 14:49:07 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.numberOfTaskSlots, 1
20/03/14 14:49:07 INFO configuration.GlobalConfiguration: Loading configuration property: parallelism.default, 1
20/03/14 14:49:07 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.execution.failover-strategy, region
dataSet: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@13e5b262

Printing:

scala> dataSet.print()
20/03/14 14:49:10 INFO java.ExecutionEnvironment: The job has 0 registered types and 0 default Kryo serializers
20/03/14 14:49:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/03/14 14:49:11 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.2.105:8032
20/03/14 14:49:11 INFO yarn.YarnClusterDescriptor: No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
20/03/14 14:49:11 INFO yarn.YarnClusterDescriptor: Found Web Interface master:36441 of application 'application_1584163852090_0002'.
1
2
3
4

Quitting:

scala> :q
good bye ..