tonglin0325的个人主页

Filebeat采集文本文件内容发送到kafka

  1. 安装filebeat,选择tar包安装方式,下载并解压
1
2
3
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.7.0-linux-x86_64.tar.gz
tar -zxvf filebeat-7.7.0-linux-x86_64.tar.gz

  1. 修改配置文件
1
2
3
mv filebeat.yml filebeat.yml.bak
touch filebeat.yml

配置文件内容 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
logging.level: info
logging.to_files: true
logging.files:
path: /data/log_path/filebeat
name: filebeat
keepfiles: 7
permissions: 0644


filebeat.inputs:
- type: log
enabled: true
paths:
- /your_file_name
scan_frequency: 1s
backoff: 1s
max_backoff: 1s
ignore_older: 24h
close_inactive: 30m
close_timeout: 24h
clean_inactive: 720h
encoding: utf-8

output.kafka:
version: "2.0.0"
enabled: true
hosts: your_kafka_ip:tour_kafka_port
topic: your_kafka_topic # topic name
username: xxxxxx # kafka username
password: xxxxxx # kafka password
required_acks: 1
compression: gzip
max_message_bytes: 1000000
codec.format:
string: '%{[message]}'

  1. 启动脚本

 

Java观察者设计模式

在java.util包中提供了Observable类Observer接口,使用它们即可完成观察者模式

 

 

多个观察者都在关注着价格的变化,只要价格一有变化,则所有的观察者会立即有所行动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.util.Observable;
import java.util.Observer;

class House extends Observable{
private float price;

public House(float price) {
super();
this.price = price;
}

public float getPrice() {
return price;
}

public void setPrice(float price) {
super.setChanged(); //设置变化点
super.notifyObservers(price); //通知所有观察者价格改变
this.price = price;
}

@Override
public String toString() {
return "房子价格为:" + this.price;
}

}

class HouseObserver implements Observer{
private String name; //观察者的名字

public HouseObserver(String name) {
super();
this.name = name;
}

@Override
public void update(Observable o, Object arg) {
// TODO 自动生成的方法存根
if(arg instanceof Float){ //判断参数类型
System.out.println(this.name+"观察到价格改变为");
System.out.println(((Float)arg).floatValue());
}
}

}

public class Observer_demo {

public static void main(String[] args) {
// TODO 自动生成的方法存根
House h = new House(1000000);
HouseObserver ho1 = new HouseObserver("购房者1");
HouseObserver ho2 = new HouseObserver("购房者2");
HouseObserver ho3 = new HouseObserver("购房者3");
h.addObserver(ho1); //加入观察者
h.addObserver(ho2);
h.addObserver(ho3);
System.out.println(h); //输出房子的价格
h.setPrice(666); //修改房子的价格
System.out.println(h); //输出房子的价格
}

}

 

SpringBoot学习笔记——动态代理

代理模式是一种设计模式,提供了对目标对象额外的访问方式,即通过代理对象访问目标对象,这样可以在不修改原目标对象的前提下,提供额外的功能操作,扩展目标对象的功能。

 

1.静态代理: 在编译时就已经实现,编译完成后代理类是一个实际的class文件

静态代理的方式:创建一个接口,然后创建被代理的类实现该接口并且实现该接口中的抽象方法。之后再创建一个代理类,同时使其也实现这个接口。在代理类中持有一个被代理对象的引用,而后在代理类方法中调用该对象的方法。

参考:什么是动态代理? 

2.动态代理: 在运行时动态生成的,即编译完成后没有实际的class文件,而是在运行时动态生成类字节码,并加载到JVM中

动态代理的方式:JDK动态代理 和 CGLIB动态代理

在springboot 2.x中默认使用的CGLIB动态代理,如果要强制使用JDK代理的话,需要添加配置

1
2
spring.aop.proxy-target-class=false

参考:springboot2.x默认使用的代理是cglib代理

 

全文 >>

ubuntu16.04安装haproxy

清华镜像站

1
2
https://mirrors.tuna.tsinghua.edu.cn/ubuntu/pool/main/h/haproxy/

下载haproxy

1
2
https://mirrors.tuna.tsinghua.edu.cn/ubuntu/pool/main/h/haproxy/haproxy_1.6.3-1ubuntu0.3_amd64.deb

安装haproxy

1
2
sudo dpkg -i ./haproxy_1.6.3-1ubuntu0.3_amd64.deb

编辑配置文件

1
2
vim /etc/haproxy/haproxy.cfg

其默认配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
global
log /dev/log local0
log /dev/log local1 notice
chroot /var/lib/haproxy
stats socket /run/haproxy/admin.sock mode 660 level admin
stats timeout 30s
user haproxy
group haproxy
daemon

# Default SSL material locations
ca-base /etc/ssl/certs
crt-base /etc/ssl/private

# Default ciphers to use on SSL-enabled listening sockets.
# For more information, see ciphers(1SSL). This list is from:
# https://hynek.me/articles/hardening-your-web-servers-ssl-ciphers/
ssl-default-bind-ciphers ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+3DES:!aNULL:!MD5:!DSS
ssl-default-bind-options no-sslv3

defaults
log global
mode http
option httplog
option dontlognull
timeout connect 5000
timeout client 50000
timeout server 50000
errorfile 400 /etc/haproxy/errors/400.http
errorfile 403 /etc/haproxy/errors/403.http
errorfile 408 /etc/haproxy/errors/408.http
errorfile 500 /etc/haproxy/errors/500.http
errorfile 502 /etc/haproxy/errors/502.http
errorfile 503 /etc/haproxy/errors/503.http
errorfile 504 /etc/haproxy/errors/504.http

全文 >>

yarn学习笔记——yarn api

参考:Yarn 监控 - 监控任务运行状态 (包括Spark,MR 所有在Yarn中运行的任务)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
    //获取任务的applicationId
public static String getAppId(String jobName) throws IOException {

Configuration conf = new Configuration();
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
conf.set("hadoop.security.authentication", "Kerberos");
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab("hdfs@XXXX", "/home/xxxx/hdfs.keytab");

YarnClient client = YarnClient.createYarnClient();
client.init(conf);
client.start();
EnumSet<YarnApplicationState> appStates = EnumSet.noneOf(YarnApplicationState.class);

if (appStates.isEmpty()) {
appStates.add(YarnApplicationState.RUNNING);
appStates.add(YarnApplicationState.ACCEPTED);
appStates.add(YarnApplicationState.SUBMITTED);
}

List<ApplicationReport> appsReport = null;
try {
// 返回EnumSet<YarnApplicationState>中个人任务状态的所有任务
appsReport = client.getApplications(appStates);
} catch (YarnException | IOException e) {
e.printStackTrace();
}

assert appsReport != null;

for (ApplicationReport appReport : appsReport) {
System.out.println(appReport);
// 获取任务名
String jn = appReport.getName();
String applicationType = appReport.getApplicationType();
if (jn.equals(jobName)) { // &amp;&amp; "Apache Flink".equals(applicationType)) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
return appReport.getApplicationId().toString();
}
}
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

// 根据任务的applicationId去获取任务的状态
public static YarnApplicationState getState(String appId) throws IOException {

Configuration conf = new Configuration();
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
conf.set("hadoop.security.authentication", "Kerberos");
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab("hdfs@XXXXX", "/home/xxxx/hdfs.keytab");

YarnClient client = YarnClient.createYarnClient();
client.init(conf);
client.start();
ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
YarnApplicationState yarnApplicationState = null;
try {
ApplicationReport applicationReport = client.getApplicationReport(applicationId);
yarnApplicationState = applicationReport.getYarnApplicationState();
} catch (YarnException | IOException e) {
e.printStackTrace();
}
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
return yarnApplicationState;
}

public static void main(String[] args) throws IOException, InterruptedException {
String state = getAppId("job_xxxxx");
System.out.println(state);
// System.out.println(state == YarnApplicationState.RUNNING);

}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
applicationId { id: 279 cluster_timestamp: 1620359479641 } 
user: "dl"
queue: "root.xxxx"
name: "xxx-flink"
host: "xxxxx"
rpc_port: 18000
client_to_am_token { identifier: "xxxx@XXXX" password: ";xxxxx" kind: "YARN_CLIENT_TOKEN" service: "" }
yarn_application_state: RUNNING
trackingUrl: "http://xxxxx:8088/proxy/application_xxxxx/"
diagnostics: ""
startTime: 1620391776339
finishTime: 0
final_application_status: APP_UNDEFINED
app_resource_Usage { num_used_containers: 4 num_reserved_containers: 0 used_resources { memory: 8192 virtual_cores: 7 } reserved_resources { memory: 0 virtual_cores: 0 } needed_resources { memory: 8192 virtual_cores: 7 } memory_seconds: 12703546778 vcore_seconds: 10855065 }
originalTrackingUrl: "http://xxxx:18000" currentApplicationAttemptId { application_id { id: 279 cluster_timestamp: 1620359479641 } attemptId: 1 } progress: 1.0
applicationType: "XXXX Flink"
log_aggregation_status: LOG_NOT_START

  

 

Mybatis学习笔记——通用mapper

在使用mybatis-generator自动生成mapper代码的时候,对于基本的增删改查方法可以通过继承通用mapper的方式进行简化,参考:MyBatis 通用 Mapper  5. 高级用法

mapper接口,参考:Mapper 接口大全

1.依赖

1
2
3
4
5
6
7
8
9
10
11
12
<!-- mybatis -->
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper-spring-boot-starter</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper</artifactId>
<version>3.3.9</version>
</dependency>

2.由于需要使用增删改查的方法,所以通用的MyMapper类为

1
2
3
4
5
6
import tk.mybatis.mapper.common.Mapper;
import tk.mybatis.mapper.common.MySqlMapper;

public interface MyMapper<T> extends Mapper<T>, MySqlMapper<T> {
}

3.对于具体mapper,只主要继承MyMapper即可,如

1
2
public interface UserMapper extends MyMapper<User> {
}

对于UserMapper接口,需要添加 @Mapper 注解,或者使用 @MapperScan(basePackages = “com.example.demo.mapper”) 的方式,为其生成动态代理类

代理模式是一种设计模式,提供了对目标对象额外的访问方式,即通过代理对象访问目标对象,这样可以在不修改原目标对象的前提下,提供额外的功能操作,扩展目标对象的功能。

动态代理,参考:什么是动态代理? 

Java动态代理之InvocationHandler最简单的入门教程

全文 >>

SpringBoot学习笔记——spring security

Spring Security是提供了认证,鉴权以及其他的安全特性的java框架,下面是Spring Security的使用教程

1.引入依赖

1
2
3
4
5
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>

引入依赖用会发现请求所有的接口都会跳转到 /login,要求你进行账号密码的认证

 

其默认的用户是user,密码会在日志中打印出来,Using generated security password: xxxxxxxx

账号密码正确后,接口就可以正常请求,且一般情况下同一个电脑同一个浏览器下的session是共享的,比如同个浏览器下多个窗口的session id是相同的

如果想自定义认证的方式的话,可以通过继承 WebSecurityConfigurerAdapter 的方式,重写configure(HttpSecurity http) 方法

全文 >>

docker学习笔记——网络模式

查看容器的网络模式

1
2
3
4
5
6
7
8
9
10
docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
3f6822d8f262 confluentinc/cp-schema-registry:latest "/etc/confluent/dock&hellip;" 13 minutes ago Up 13 minutes schema-registry

docker inspect 3f6822d8f262 | grep -i "network"
"NetworkMode": "host",
"NetworkSettings": {
"Networks": {
"NetworkID": "5d40a7d178679339f87cc31965ba9a1c662c74ccea853945967d4303e4f9acc0",

docker总共有4种网络模式,从上到下隔离度下降:

1.Close容器,即none模式,运行在Close容器中的进程只能访问本地回环接口,隔离度最高

2.Bridge容器,即bridge模式,当容器中的进程需要访问外部网络的时候应该使用,且bridge是docker的default网络模式。

 

bridge容器拥有2个接口,一个是私有的本地回环接口,另一个是私有接口通过网桥连接到主机的其他容器。

3.Joined容器,

4.Open容器,即host模式,该模式在启动docker的时候加上 –net=host 参数,比如

1
2
docker run  --network=host xxxx:v1.0.0dev

该模式容器会占用宿主机的对应端口,且可以访问宿主机的host

全文 >>