tonglin0325的个人主页

上传snapshots jar包到nexus仓库

在nexus界面上可以手动上传release和hosted的包,但是无法手动上传snapshots的包

 

 

 需要使用 mvn deploy 来上传snapshots包,比如

1
2
mvn deploy:deploy-file -DgroupId=org.apache.impala -DartifactId=impala-frontend -Dversion=0.1-SNAPSHOT -Dpackaging=jar -Dfile=/home/lintong/下载/impala-frontend-0.1-SNAPSHOT.jar -Durl=http://ip:8081/nexus/repository/maven-snapshots/ -DrepositoryId=nexus-snapshots

其中

-Durl 指定的是仓库的URL地址

 

-DrepositoryId 指的是maven settings.xml中的repository的id

注意中的账号密码的id需要和中的id对的上

全文 >>

MySQL学习笔记——binlog

1.docker部署MySQL

amd64的机器可以使用centos的MySQL5.7的镜像:https://hub.docker.com/r/centos/mysql-57-centos7/

arm64和amd64的机器也可以使用MySQL8.0的镜像:https://hub.docker.com/layers/library/mysql/8.0.29/images/sha256-44f98f4dd825a945d2a6a4b7b2f14127b5d07c5aaa07d9d232c2b58936fb76dc

启动MySQL5.7的容器

1
2
docker run --name mysqltest -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7.44

启动MySQL8.0的容器

1
2
docker run --name mysqltest -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:8.0.29

如果想指定mysql配置和data挂载路径,可以先进入容器中将mysql的配置先拷贝出来

进入容器查看MySQL的配置路径

1
2
3
4
sh-4.4# mysql --help | grep my.cnf
order of preference, my.cnf, $MYSQL_TCP_PORT,
/etc/my.cnf /etc/mysql/my.cnf /usr/etc/my.cnf ~/.my.cnf

参考:Docker安装MySQL 并挂载数据及配置文件,设置远程访问权限

全文 >>

Flink学习笔记——内存调优

flink内存分布

task manager

参考:Flink重点难点:Flink任务综合调优(Checkpoint/反压/内存)

1.堆外内存不足:java.lang.OutOfMemoryError: Direct buffer memory

报错如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. <br />The direct memory can be allocated by user code or some of its dependencies. <br />In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. <br />The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. <br />In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
at java.nio.Bits.reserveMemory(Bits.java:695)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:247)
at sun.nio.ch.IOUtil.write(IOUtil.java:58)
at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
at java.nio.channels.Channels.writeFully(Channels.java:101)
at java.nio.channels.Channels.access$000(Channels.java:61)
at java.nio.channels.Channels$1.write(Channels.java:174)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.security.DigestOutputStream.write(DigestOutputStream.java:145)
at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.write(MultipartUploadOutputStream.java:172)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:63)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:63)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.lambda$write$0(SizeAwareFSDataOutputStream.java:58)
at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:106)
at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.java:124)
at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.write(SizeAwareFSDataOutputStream.java:55)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:63)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:175)
at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:404)
at org.apache.hudi.io.HoodieAppendHandle.close(HoodieAppendHandle.java:439)
at org.apache.hudi.io.FlinkAppendHandle.close(FlinkAppendHandle.java:99)
at org.apache.hudi.execution.ExplicitWriteHandler.closeOpenHandle(ExplicitWriteHandler.java:62)
at org.apache.hudi.execution.ExplicitWriteHandler.finish(ExplicitWriteHandler.java:52)
at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:41)
at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

可能需要调整的是taskmanager的内存参数, taskmanager.memory.task.off-heap.size 或者 taskmanager.memory.framework.off-heap.size,在启动flink session cluster的时候添加如下配置

需要注意的是,需要在启动session cluster的时候配置-D参数,在flink run的时候添加内存参数是无法生效的

1
2
/usr/lib/flink/bin/yarn-session.sh -s 1 -jm 51200 -tm 51200 -qu data -D taskmanager.memory.task.off-heap.size=4G -D taskmanager.memory.framework.off-heap.size=4G --detached

点到task manager的页面查看,配置的4G内存已经生效

全文 >>

使用joda-time处理时间

引入joda-time

1
2
3
4
5
6
7
<!--jodatime-->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.10</version>
</dependency>

1.字符串转joda-time的DateTime

parse日期

1
2
3
DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy-MM-dd");
DateTime dateTime = DateTime.parse(date, fmt);

parse时间戳

1
2
3
DateTimeFormatter format = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
DateTime dt = DateTime.parse(dateStr, format);

转换时区

1
2
3
DateTimeFormatter format = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss yyyy").withLocale(Locale.ENGLISH);
DateTime dt = DateTime.parse(dateStr, format);

2.joda-time的DateTime转字符串

全文 >>

Java泛型

泛型就是指在对象建立时不指定类中属性的具体类型,而由外部在声明及实例化对喜爱时指定类型。

在泛型的指定中无法指定基本数据类型的,必须设置成一个类,这样在设置一个数字时就必须使用包装类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Point<T>{		//此处T可以是任意的标识符号,T是type的简称
private T var; //此变量的类型由外部决定

public T getVar() { //返回值的类型由外部决定
return var;
}

public void setVar(T var) { //设置的类型由外部指定
this.var = var;
}
}

public class Generics_demo {

public static void main(String[] args) {
// TODO 自动生成的方法存根
// Point<Integer> p = new Point<Integer>(); //里面的var类型为Integer类型
// p.setVar(30); //设置数字,自动装箱
// System.out.println(p.getVar()*2); //计算结果,按数字取出
Point<String> p = new Point<String>(); //里面的var类型为Integer类型
p.setVar("张三"); //设置数字,自动装箱
System.out.println(p.getVar().length()); //计算结果,按数字取出
}

}

 

泛型的构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class Point<T>{			//此处T可以是任意的标识符号,T是type的简称
private T var; //此变量的类型由外部决定

public Point(T var) { //构造方法
super();
this.var = var;
}

public T getVar() { //返回值的类型由外部决定
return var;
}

public void setVar(T var) { //设置的类型由外部指定
this.var = var;
}

}

public class Generics_demo {

public static void main(String[] args) {
// TODO 自动生成的方法存根
// Point<Integer> p = new Point<Integer>(); //里面的var类型为Integer类型
// p.setVar(30); //设置数字,自动装箱
// System.out.println(p.getVar()*2); //计算结果,按数字取出

// Point<String> p = new Point<String>(); //里面的var类型为Integer类型
// p.setVar("张三"); //设置数字,自动装箱
// System.out.println(p.getVar().length()); //计算结果,按数字取出

Point<String> p = new Point<String>("张三");
System.out.println("内容:"+p.getVar());
}

}

 

指定多个泛型类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class Point<T>{		//此处T可以是任意的标识符号,T是type的简称
private T var; //此变量的类型由外部决定

public Point(T var) { //构造方法
super();
this.var = var;
}

public T getVar() { //返回值的类型由外部决定
return var;
}

public void setVar(T var) { //设置的类型由外部指定
this.var = var;
}

}

class Notepad<K,V>{ //此处T可以是任意的标识符号,T是type的简称
private K key; //此变量的类型由外部决定
private V value; //此变量的类型由外部决定

public Notepad(K key, V value) { //构造方法
super();
this.key = key;
this.value = value;
}
public K getKey() {
return key;
}
public void setKey(K key) {
this.key = key;
}
public V getValue() {
return value;
}
public void setValue(V value) {
this.value = value;
}

}

public class Generics_demo {

public static void main(String[] args) {
// TODO 自动生成的方法存根
// Point<Integer> p = newdd Point<Integer>(); //里面的var类型为Integer类型
// p.setVar(30); //设置数字,自动装箱
// System.out.println(p.getVar()*2); //计算结果,按数字取出

// Point<String> p = new Point<String>(); //里面的var类型为Integer类型
// p.setVar("张三"); //设置数字,自动装箱
// System.out.println(p.getVar().length()); //计算结果,按数字取出

// Point<String> p = new Point<String>("张三");
// System.out.println("内容:"+p.getVar());

Notepad<String,Integer> t = new Notepad<String,Integer>("张三",18);
System.out.println(t.getKey());
System.out.println(t.getValue());

}
}

 

通配符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class Point<T>{								//此处T可以是任意的标识符号,T是type的简称
private T var; //此变量的类型由外部决定

public Point(T var) { //构造方法
super();
this.var = var;
}

public String toString(){ //覆写Object类中的toString()方法
return this.var.toString();
}

public T getVar() { //返回值的类型由外部决定
return var;
}

public void setVar(T var) { //设置的类型由外部指定
this.var = var;
}


}

class Notepad<K,V>{ //此处T可以是任意的标识符号,T是type的简称
private K key; //此变量的类型由外部决定
private V value; //此变量的类型由外部决定

public Notepad(K key, V value) { //构造方法
super();
this.key = key;
this.value = value;
}
public K getKey() {
return key;
}
public void setKey(K key) {
this.key = key;
}
public V getValue() {
return value;
}
public void setValue(V value) {
this.value = value;
}

}

public class Generics_demo {

public static void main(String[] args) {
// TODO 自动生成的方法存根

Point<String> p = new Point<String>("张三");
fun(p);
}

public static void fun(Point<?> point){ //使用泛型接收Point的对象
System.out.println("内容:" + point);
}

}

 

全文 >>

Java同步synchronized与死锁

多个线程要操作同一资源时就有可能出现资源的同步问题

同步就是指多个操作在同一个时间段内只能有一个线程进行,其他线程要等待此线程完成之后才可以继续执行。

解决资源共享的同步操作,可以使用同步代码块同步方法两种方式完成。

 

<1>同步代码块

所谓代码块就是指使用“{}”括起来的一段代码,根据其位置和声明的不同,可以分为普通代码块、构造块、静态块3种,如果在代码块上加上synchronized关键字,则此代码块就称为同步代码块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package java_thread;

class MyThread_2 implements Runnable{
private int ticket = 5;

@Override
public void run() { //覆写Thread类中的run()方法
// TODO 自动生成的方法存根
for (int i=0;i<10;i++){
synchronized (this) { //设置需要同步的操作
if(ticket>0){
try{
Thread.sleep(300);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println("卖票:ticket="+ticket--);
}
}
// this.sale(); //调用同步方法
}
}

}

public class Runnable_demo2 {

public static void main(String[] args) {
// TODO 自动生成的方法存根
MyThread_2 mt = new MyThread_2(); //实例化Runnable子类对象
Thread t1 = new Thread(mt); //实例化Thread类对象
Thread t2 = new Thread(mt); //实例化Thread类对象
Thread t3 = new Thread(mt); //实例化Thread类对象
t1.start(); //启动线程
t2.start(); //启动线程
t3.start(); //启动线程
}

}

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package java_thread;

class Output{
public void output(String name){
int len = name.length();
synchronized(this){ //不能使用name,因为&ldquo;输出1&rdquo;和"输出2"两个字符串不是同一个对象
for(int i=0;i<len;i++){
System.out.print(name.charAt(i));
}
System.out.println();
}
}
}


public class Huchi {

private void init(){
final Output outputer = new Output();
//线程1
new Thread(new Runnable(){
@Override
public void run() { //覆写Thread类中的run()方法
while(true){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
//new Output().output("输出1"); //也不能new对象,new对象的话,this就不代表同一个对象了
outputer.output("输出1");
}
}
}).start();

//线程2
new Thread(new Runnable(){
@Override
public void run() { //覆写Thread类中的run()方法
while(true){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
//new Output().output("输出1"); //也不能new对象,new对象的话,this就不代表同一个对象了
outputer.output("输出2");
}
}
}).start();

}

public static void main(String[] args) {
// TODO 自动生成的方法存根
new Huchi().init();
}

}

 

全文 >>

CDH学习笔记——角色组

1.对于机型不同的机器,可以通过角色组来进行统一归类管理

比如对于HDFS组件,有的机型的磁盘为12块,有的机型的磁盘为16块,那么可以通过角色组将配置一致的机器分到一起

在HDFS组件下,选择实例

 

再点击角色组,可以选择创建角色组

其中DataNode Default Group有100台,每台有12块磁盘

其中DataNode Group 1有50台,每台有16块磁盘

此时在 dfs.datanode.data.dir 参数可以分别对2个角色组进行配置,default组有12块盘,group 1有16块盘

全文 >>