tonglin0325的个人主页

Java多线程——Executors和线程池

线程池的概念与Executors类的应用

**  1.创建固定大小的线程池**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package java_thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {

/**
* @param args
*/
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
// ExecutorService threadPool = Executors.newCachedThreadPool();
// ExecutorService threadPool = Executors.newSingleThreadExecutor();
for(int i=1;i<=10;i++){
final int task = i;
threadPool.execute(new Runnable(){
@Override
public void run() {
for(int j=1;j<=4;j++){
try {
Thread.sleep(20);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " is looping of " + j + " for task of " + task);
}
}
});
}
System.out.println("all of 10 tasks have committed! ");
//threadPool.shutdownNow();

// Executors.newScheduledThreadPool(3).scheduleAtFixedRate(
// new Runnable(){
// @Override
// public void run() {
// System.out.println("bombing!");
//
// }},
// 6,
// 2,
// TimeUnit.SECONDS);
}

}

**  2.创建缓存线程池**

1
2
ExecutorService threadPool = Executors.newCachedThreadPool();

** 3.创建单一线程池**

1
2
ExecutorService threadPool = Executors.newSingleThreadExecutor();

全文 >>

Java多线程——线程范围内共享变量和ThreadLocal

多个线程访问共享对象和数据的方式

  1.如果每个线程执行的代码相同,可以使用同一个Runnable对象,这个Runnable对象中有那个共享数据,例如,买票系统就可以这么做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package java_thread;

class MyThread_2 implements Runnable{
private int ticket = 5;

@Override
public void run() { //覆写Thread类中的run()方法
// TODO 自动生成的方法存根
for (int i=0;i<10;i++){
synchronized (this) { //设置需要同步的操作
if(ticket>0){
try{
Thread.sleep(300);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println("卖票:ticket="+ticket--);
}
}
// this.sale(); //调用同步方法
}
}

// public synchronized void sale(){ //声明同步方法
// if(ticket>0){
// try{
// Thread.sleep(300);
// }catch(InterruptedException e){
// e.printStackTrace();
// }
// System.out.println("卖票:ticket="+ticket--);
// }
// }

}

public class Runnable_demo2 {

public static void main(String[] args) {
// TODO 自动生成的方法存根
MyThread_2 mt = new MyThread_2(); //实例化Runnable子类对象
Thread t1 = new Thread(mt); //实例化Thread类对象
Thread t2 = new Thread(mt); //实例化Thread类对象
Thread t3 = new Thread(mt); //实例化Thread类对象
t1.start(); //启动线程
t2.start(); //启动线程
t3.start(); //启动线程
}

}

全文 >>

clickhouse学习笔记——Go客户端连接clickhouse

1.创建clickhouse环境

安装clickhouse

参考:ubuntu16.04安装clickhouse

或者使用docker

参考:https://hub.docker.com/r/clickhouse/clickhouse-server

1
2
docker run -d -p 18123:8123 -p 19000:9000 --name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server:23.8

使用datagrip连接

创建表和测试数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE TABLE default.my_first_table
(
user_id UInt32,
message String,
timestamp DateTime,
metric Float32
)
ENGINE = MergeTree()
PRIMARY KEY (user_id, timestamp);

INSERT INTO default.my_first_table (user_id, message, timestamp, metric) VALUES
(101, 'Hello, ClickHouse!', now(), -1.0 ),
(102, 'Insert a lot of rows per batch', yesterday(), 1.41421 ),
(102, 'Sort your data based on your commonly-used queries', today(), 2.718 ),
(101, 'Granules are the smallest chunks of data read', now() + 5, 3.14159 )

2.使用client连接clickhouse

golang客户端连接clickhouse,可以使用 clickhouse-go 这个库

参考:一文教你Go语言如何轻松使用ClickHouse

引入依赖

1
2
go get github.com/ClickHouse/clickhouse-go

导入clickhouse的driver和database/sql,也可以使用github.com/jmoiron/sqlx,参考:golang操作clickhouse使用入门

1
2
3
import "database/sql"
import _ "github.com/ClickHouse/clickhouse-go"

否则会报

1
2
sql: unknown driver "clickhouse" (forgotten import?)

创建连接

1
2
3
4
5
6
7
8
9
source := "tcp://localhost:19000?username=default&amp;password=&amp;database=default&amp;block_size=4096"
db, err := sql.Open("clickhouse", source)
if err != nil {
fmt.Println(err)
}
defer func() {
_ = db.Close()
}()

记得添加defer用于关闭连接

创建clickhouse表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
query := `
CREATE TABLE default.my_first_table
(
user_id UInt32,
message String,
timestamp DateTime,
metric Float32
)
ENGINE = MergeTree()
PRIMARY KEY (user_id, timestamp)
`
_, err = db.Exec(query)
if err != nil {
fmt.Println(err)
}

插入数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
var arr [][]any
arr = append(arr, []any{101, "Hello, ClickHouse!", time.Now(), -1.0})
arr = append(arr, []any{102, "Insert a lot of rows per batch", time.Now().Add(-1), 1.41421})
arr = append(arr, []any{102, "Sort your data based on your commonly-used queries", time.Now().Add(1), 2.718})
arr = append(arr, []any{101, "Granules are the smallest chunks of data read", time.Now().Add(5), 3.14159})
tx, err := db.Begin()
if err != nil {
log.Fatal(err)
}
query := `INSERT INTO default.my_first_table (user_id, message, timestamp, metric) VALUES (?, ?, ?, ?)`
stmt, err := tx.Prepare(query)
if err != nil {
log.Fatal(err)
}
for _, data := range arr {
_, err = stmt.Exec(data...)
if err != nil {
fmt.Println(err)
}
}
_ = tx.Commit()

查询数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 读取数据
type Data struct {
UserId int64 `db:"user_id"`
Message string `db:"message"`
Timestamp time.Time `db:"timestamp"`
Metric float64 `db:"metric"`
}

query := "SELECT * FROM default.my_first_table"
rows, err := db.Query(query)
if err != nil {
fmt.Println(err)
}
var result []Data
for rows.Next() {
var data Data
err = rows.Scan(&amp;data.UserId, &amp;data.Message, &amp;data.Timestamp, &amp;data.Metric)
if err != nil {
fmt.Println(err)
}
result = append(result, data)
}
fmt.Println(result)

输出

1
2
[{101 Hello, ClickHouse! 2024-05-25 15:54:00 +0000 UTC -1} {101 Granules are the smallest chunks of data read 2024-05-25 15:54:00 +0000 UTC 3.14159} {102 Insert a lot of rows per batch 2024-05-25 15:54:00 +0000 UTC 1.41421} {102 Sort your data based on your commonly-used queries 2024-05-25 15:54:00 +0000 UTC 2.718}]

全文 >>

Amazon S3限流机制

当使用S3作为Amazon EMR的存储的时候,当写入的流量比较大的时候,有时会遇到性能瓶颈,报错如下

1
2
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate.

在如下的AWS文章中介绍,S3的性能不是按照bucket定义的,而是按照bucket的prefix,对于每个prefix,3500的PUT/COPY/POST/DELETE能力和5000的GET/HEAD能力,如果超过这个限制的话,就会被限流

关于prefix定义,参考文档:aws s3原理和常用命令

参考:从 Amazon EMR 和 AWS Glue 访问 Amazon S3 中数据的性能优化最佳实践

在咨询了AWS的工程师后,说上面的文档不是很准确,更具体的回答是:

1.对于一个刚创建的bucket,默认每个bucket有3500写+5000读的能力

2.当S3 bucket的流量增加后,S3会对应进行bucket的split partition,根据的规则是按照prefix逐位向后分割

意思是对于S3://bucket_name/[a-z,0-9][a-z,0-9]…./object的s3存储结构,prefix的第1个字符有a-z+0-9(26+10=36)种可能性,如果这36个字符出现的可能性是一样的话,则当整个bucket的流量很大的时候,bucket就会扩容到36个partition,从而获得36*(3500/5000)的能力

这也就是为什么当遇到S3性能问题的能力,AWS的工程师经常会建议使用增加随机前缀,或者分区倒排的方案,其他AWS文章也有提到:Amazon EMR实战心得浅谈

增加随机前缀(最建议,但不适用于数仓):对于S3://bucket_name/xxx/2022/10/01/object_name的路径,如果bucket的并发超过了3500/5000的限制后,则S3会进行扩容partition,这时S3://bucket_name/xxx/2022/10/01/和S3://bucket_name/xxx/2022/10/02/分split成2个partition,到了第二天02分区又会split成02和03分区,但是S3扩容是需要一定时间的,所以每天流量峰值的时候有可能会出现限流。

如果增加了随机前缀,路径则会变成S3://bucket_name/[a-z,0-9][a-z,0-9]…./xxx/2022/10/01/object_name,这来一是流量被打散成36份,不会在每天的prefix上形成单独的热点;二来在遇到每年/每月/每日路径更新的时候,也不会进行S3扩容,因为扩容只会发生在第一个字符[a-z,0-9]

分区倒排:对于S3://bucket_name/xxx/2022/10/01/object_name的路径,分区倒排之后,路径变成S3://bucket_name/xxx/01/10/2022/object_name,即变成日/月/年,如果S3://bucket_name/xxx/路径下的并发超过3500/5000,第一个月过后,整个bucket会split成30或者31份(每天扩容一次),到了第二个月的时候,流量就会继续循环打到这30或者31个partition上,从而不会每天进行扩容

上面介绍的S3自动split的原理,如果是手动split的话,就没有以上限制,就是给AWS对接的工程师提support case让他们后台调整,人工指定从第几个字符开始split,类似于预分区

全文 >>

Spring MVC学习笔记——完整的用户登录

1.搭建环境的第一步是导包,把下面这些包都导入工程中

/media/common/工作/Ubuntu软件/SpringMVC_jar包整理/aop
/media/common/工作/Ubuntu软件/SpringMVC_jar包整理/apache-commons-logging
/media/common/工作/Ubuntu软件/SpringMVC_jar包整理/apache-log4j
/media/common/工作/Ubuntu软件/SpringMVC_jar包整理/bean-validator
/media/common/工作/Ubuntu软件/SpringMVC_jar包整理/dbcp
/media/common/工作/Ubuntu软件/SpringMVC_jar包整理/hibernate-3.6.8.
/media/common/工作/Ubuntu软件/SpringMVC_jar包整理/JSTL
/media/common/工作/Ubuntu软件/SpringMVC_jar包整理/mysql
/media/common/工作/Ubuntu软件/SpringMVC_jar包整理/pager
/media/common/工作/Ubuntu软件/SpringMVC_jar包整理/sitemesh
/media/common/工作/Ubuntu软件/SpringMVC_jar包整理/spring

手动导包也可以,不过不是很方便,推荐学习使用maven的pom.xml文件来导入jar包

整个系统的结构

表示层(JSP页面),一般包名是view

**

全文 >>

Spring MVC学习笔记——登录和异常处理

1.在WEN-INF文件夹下面,添加一个login.jsp文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<%@ page language="java" contentType="text/html; charset=UTF-8"
pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>用户登录</title>
</head>
<body>
<form action="user/login" method="post">
用户名:<input type="text" name="username"/><br/>
用户密码:<input type="password" name="password"/><br/>
<input type="submit" value="用户登录"/>
</form>
</body>
</html>

全文 >>

Hive任务如何计算生成的map和reduce任务

在使用hive时候,需要关注hive任务所消耗的资源,否则可能会出现hive任务过于低效,或者把所查询的数据源拉胯的情况

1.查看当前hive所使用的引擎和配置

使用set语句可以查看当前hive的配置

1
2
set;

查看hive当前使用的engine

1
2
set hive.execution.engine;

全文 >>