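tonglin0325's personal homepage

Flink Study Notes: Memory Tuning

Flink memory layout

task manager

Reference: Flink Key Points and Difficulties: Comprehensive Flink Job Tuning (Checkpoint / Backpressure / Memory)

1. Insufficient off-heap memory: java.lang.OutOfMemoryError: Direct buffer memory

The error is as follows: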

Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
at java.nio.Bits.reserveMemory(Bits.java:695)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:247)
at sun.nio.ch.IOUtil.write(IOUtil.java:58)
at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
at java.nio.channels.Channels.writeFully(Channels.java:101)
at java.nio.channels.Channels.access$000(Channels.java:61)
at java.nio.channels.Channels$1.write(Channels.java:174)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.security.DigestOutputStream.write(DigestOutputStream.java:145)
at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.write(MultipartUploadOutputStream.java:172)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:63)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:63)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.lambda$write$0(SizeAwareFSDataOutputStream.java:58)
at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:106)
at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.java:124)
at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.write(SizeAwareFSDataOutputStream.java:55)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:63)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:175)
at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:404)
at org.apache.hudi.io.HoodieAppendHandle.close(HoodieAppendHandle.java:439)
at org.apache.hudi.io.FlinkAppendHandle.close(FlinkAppendHandle.java:99)
at org.apache.hudi.execution.ExplicitWriteHandler.closeOpenHandle(ExplicitWriteHandler.java:62)
at org.apache.hudi.execution.ExplicitWriteHandler.finish(ExplicitWriteHandler.java:52)
at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:41)
at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

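The TaskManager memory parameters taskmanager.memory.task.off-heap.size or taskmanager.memory.framework.off-heap.size probably need to be increased. Add the following configuration when starting the Flink session cluster.

Note that these -D options must be passed when the session cluster is started; memory parameters added to flink run do not take effect.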

/usr/lib/flink/bin/yarn-session.sh -s 1 -jm 51200 -tm 51200 -qu data -D taskmanager.memory.task.off-heap.size=4G -D taskmanager.memory.framework.off-heap.size=4G --detached

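Open the TaskManager page in the Flink web UI to confirm that the configured 4G of off-heap memory has taken effect.

The adjustment is needed because taskmanager.memory.framework.off-heap.size defaults to only 128 MB and taskmanager.memory.task.off-heap.size defaults to 0.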
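
The stack trace above fails inside ByteBuffer.allocateDirect, which is the kind of allocation that counts against the JVM direct memory limit. As far as I understand, Flink derives that limit (-XX:MaxDirectMemorySize) from the framework off-heap, task off-heap, and network memory settings. The following is a minimal sketch of a direct allocation next to a heap allocation; the class name DirectBufferSketch and the buffer sizes are made up for illustration and are not taken from Flink or Hudi.

```java
import java.nio.ByteBuffer;

final class DirectBufferSketch {

    // Allocates memory outside the Java heap. The JVM rejects the request with
    // "OutOfMemoryError: Direct buffer memory" once the direct memory limit is used up.
    static ByteBuffer allocateOffHeap(int bytes) {
        return ByteBuffer.allocateDirect(bytes);
    }

    public static void main(String[] args) {
        ByteBuffer direct = allocateOffHeap(1024 * 1024);    // 1 MiB off-heap
        ByteBuffer heap = ByteBuffer.allocate(1024 * 1024);  // 1 MiB on-heap
        System.out.println("direct buffer is off-heap: " + direct.isDirect());
        System.out.println("heap buffer is off-heap:   " + heap.isDirect());
    }
}
```

Raising the heap size does not help with this error, because direct buffers are accounted for separately from the heap.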

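Reference: Flink runtime error java.lang.OutOfMemoryError: Direct buffer memory

Other tuning: Flink Performance Tuning

When writing to Hudi with Flink CDC, consider using the BUCKET index type instead of the default FLINK_STATE index type. The FLINK_STATE index is kept in memory and consumes a lot of it.

Reference: Trying out the new HUDI-0.11.0 BUCKET index feature on Flink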


Handling time with joda-time

Add the joda-time dependency

<!--jodatime-->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.10</version>
</dependency>

1. Converting a string to a joda-time DateTime

Parse a date

DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy-MM-dd");
DateTime dateTime = DateTime.parse(date, fmt);

Parse a timestamp

DateTimeFormatter format = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
DateTime dt = DateTime.parse(dateStr, format);

Parse a string with English weekday and month names (set the locale)

DateTimeFormatter format = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss yyyy").withLocale(Locale.ENGLISH);
DateTime dt = DateTime.parse(dateStr, format);

2. Converting a joda-time DateTime to a string

String date = dt.toString("yyyy-MM-dd");
String datetime = dt.toString("yyyy-MM-dd HH:mm:ss");
String hour = dt.toString("yyyy-MM-dd HH:00:00"); // truncated to the hour

3. Getting the current time

DateTime now = new DateTime();

4. Comparing the difference between two DateTime values

boolean olderThanAWeek = Days.daysBetween(dateTime, now).getDays() > 7;

5. Setting a time zone in joda-time

DateTime now = new DateTime(DateTimeZone.UTC);
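
For comparison, the same operations can be sketched with the JDK's built-in java.time package. This deliberately swaps joda-time for java.time so that the example runs without any extra dependency; it is not the joda-time API. The class name JavaTimeSketch and its method names are hypothetical.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Locale;

final class JavaTimeSketch {
    static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    static final DateTimeFormatter DATE_TIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    // English weekday and month names, independent of the default locale
    static final DateTimeFormatter ENGLISH =
            DateTimeFormatter.ofPattern("EEE MMM dd HH:mm:ss yyyy", Locale.ENGLISH);

    // 1. String -> date / date-time
    static LocalDate parseDate(String s) {
        return LocalDate.parse(s, DATE);
    }

    static LocalDateTime parseDateTime(String s) {
        return LocalDateTime.parse(s, DATE_TIME);
    }

    // 2. Date-time -> string, truncated to the hour
    static String toHour(LocalDateTime dt) {
        return dt.truncatedTo(ChronoUnit.HOURS).format(DATE_TIME);
    }

    // 4. Whole days between two dates
    static long daysBetween(LocalDate from, LocalDate to) {
        return ChronoUnit.DAYS.between(from, to);
    }

    // 3 and 5. Current time in UTC
    static ZonedDateTime nowUtc() {
        return ZonedDateTime.now(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        LocalDateTime dt = parseDateTime("2024-01-01 10:20:30");
        System.out.println(toHour(dt));
        System.out.println(daysBetween(parseDate("2024-01-01"), nowUtc().toLocalDate()) > 7);
    }
}
```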

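Java Generics

With generics, the concrete type of a class's fields is not fixed when the class is written; it is specified from outside when the object is declared and instantiated.

A primitive type cannot be used as a generic type argument; the argument must be a class, so storing a number requires a wrapper class.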

class Point<T> {                 // T can be any identifier; T is short for "type"
    private T var;               // the type of this field is decided by the caller

    public T getVar() {          // the return type is decided by the caller
        return var;
    }

    public void setVar(T var) {  // the parameter type is decided by the caller
        this.var = var;
    }
}

public class Generics_demo {

    public static void main(String[] args) {
        // Point<Integer> p = new Point<Integer>();   // var is of type Integer
        // p.setVar(30);                              // set a number (autoboxed)
        // System.out.println(p.getVar() * 2);        // use the value as a number
        Point<String> p = new Point<String>();        // var is of type String
        p.setVar("张三");                              // set a string
        System.out.println(p.getVar().length());      // print the length of the string
    }

}
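
The wrapper-class rule can be sketched as follows. Box and BoxDemo are hypothetical names used only for this illustration; the point is that Box<int> does not compile, while Box<Integer> works and the compiler inserts the boxing and unboxing conversions.

```java
final class Box<T> {
    private T value;

    T get() {
        return value;
    }

    void set(T value) {
        this.value = value;
    }
}

final class BoxDemo {

    static int doubled(int n) {
        Box<Integer> box = new Box<>(); // Box<int> would be a compile error
        box.set(n);                     // autoboxing: int -> Integer
        return box.get() * 2;           // auto-unboxing: Integer -> int
    }

    public static void main(String[] args) {
        System.out.println(doubled(30)); // prints 60
    }
}
```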


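Java Synchronization (synchronized) and Deadlock

When multiple threads operate on the same resource, synchronization problems can occur.

Synchronization means that only one thread may perform a given operation at a time; the other threads must wait until that thread has finished before they can continue.

Synchronized access to a shared resource can be implemented in two ways: synchronized blocks and synchronized methods.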
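
A minimal sketch of both forms, assuming a shared counter that two threads increment concurrently. The class SyncCounter and its method names are hypothetical.

```java
final class SyncCounter {
    private int methodCount = 0;
    private int blockCount = 0;
    private final Object lock = new Object();

    // Synchronized method: locks on "this" for the whole method body.
    synchronized void incrementWithMethod() {
        methodCount++;
    }

    // Synchronized block: locks on a dedicated object for the critical section only.
    void incrementWithBlock() {
        synchronized (lock) {
            blockCount++;
        }
    }

    synchronized int methodCount() {
        return methodCount;
    }

    int blockCount() {
        synchronized (lock) {
            return blockCount;
        }
    }

    // Runs two threads that each perform perThread increments of both counters.
    static SyncCounter runTwoThreads(int perThread) {
        SyncCounter counter = new SyncCounter();
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) {
                counter.incrementWithMethod();
                counter.incrementWithBlock();
            }
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter;
    }

    public static void main(String[] args) {
        SyncCounter counter = runTwoThreads(100_000);
        System.out.println(counter.methodCount() + " " + counter.blockCount()); // prints 200000 200000
    }
}
```

Without the synchronized keyword, the two increments could interleave and some updates would be lost, so the totals would usually come out lower than expected.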


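CDH Study Notes: Role Groups

1. Machines with different hardware can be grouped and managed uniformly through role groups

For example, for the HDFS service, some machine types have 12 disks and others have 16. Role groups let you put machines with the same configuration together.

Under the HDFS service, select Instances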


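SpringBoot Study Notes: Validation

JSR-303 provides a set of annotations that can be placed on fields to constrain their values.

Reference: Spring MVC Study Notes: Introduction to JSR303 and Best Practices

Put validation on the DTO layer; do not mix it with the model layer that interacts with the database.

For the differences between model, VO, and so on, see: Spring MVC Study Notes: POJO and DispatcherServlet

For how to copy values, see: Using BeanUtils elegantly on List collections

BeanUtils can be used to convert between DTO and DO; see: The Way of Design: designing the controller layer

ModelMapper can be used as well; see: Spring Boot DTO example: converting an entity to a DTO

Since Spring Boot 2.3, spring-boot-starter-web no longer pulls in the validation starter, so a validation dependency has to be added explicitly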

<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>6.0.1.Final</version>
</dependency>

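Reference: Elegant data validation with SpringBoot

Define the DTO or VO layer, add the @NotEmpty and @Size annotations, and set up group validation so that validation runs on POST or PUT requests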

import com.example.demo.core.valid.Post;
import com.example.demo.core.valid.Put;
import com.example.demo.model.User;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.beans.BeanUtils;

import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.Size;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserDTO {

@NotEmpty(groups = Post.class, message = "username must not be empty when registering")
@Size(groups = {Post.class, Put.class}, min = 3, max = 120)
private String username;

private String password;

public static User convert(UserDTO dto) {
User user = new User();
BeanUtils.copyProperties(dto, user);
return user;
}

public static UserDTO convertDTO(User user) {
UserDTO dto = new UserDTO();
BeanUtils.copyProperties(user, dto);
return dto;
}

}

Define the Post group interface

package com.example.demo.core.valid;

public interface Post {
}

Define the Put group interface

package com.example.demo.core.valid;

public interface Put {
}
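
The group interfaces carry no behavior; they only act as markers that decide which constraints are active for a given request. To make that idea concrete, here is a hand-rolled sketch using plain reflection. It is an illustration only and is not how Hibernate Validator is implemented; the Required annotation, the UserForm class, and the validate method are all hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class GroupValidationSketch {
    interface Post {}   // marker for "create" requests
    interface Put {}    // marker for "update" requests

    // Hypothetical stand-in for a constraint such as @NotEmpty
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Required {
        Class<?>[] groups();
        String message();
    }

    static final class UserForm {
        @Required(groups = Post.class, message = "username must not be empty when registering")
        String username;
        String password;
    }

    // Checks only the constraints whose groups contain the active group.
    static List<String> validate(Object target, Class<?> activeGroup) {
        List<String> errors = new ArrayList<>();
        for (Field field : target.getClass().getDeclaredFields()) {
            Required rule = field.getAnnotation(Required.class);
            if (rule == null || !Arrays.asList(rule.groups()).contains(activeGroup)) {
                continue; // no constraint, or constraint not active for this group
            }
            try {
                field.setAccessible(true);
                Object value = field.get(target);
                if (value == null || value.toString().isEmpty()) {
                    errors.add(rule.message());
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        UserForm form = new UserForm(); // username left empty
        System.out.println(validate(form, Post.class)); // prints [username must not be empty when registering]
        System.out.println(validate(form, Put.class));  // prints []
    }
}
```

The same empty form fails under the Post group and passes under the Put group, which is the effect the groups attribute is meant to achieve.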

Set up global exception handling

@ResponseStatus(HttpStatus.BAD_REQUEST)
@ExceptionHandler(MethodArgumentNotValidException.class)
public ControllerResponseT methodArgumentNotValidException(MethodArgumentNotValidException e) {
String message = ResultCode.METHOD_ARGUMENT_NOT_VALID.getMessage();
log.error("Parameter validation failed => {}", e.getMessage());
BindingResult bindingResult = e.getBindingResult();
List<ObjectError> allErrors = bindingResult.getAllErrors();
return new ControllerResponseT<>(ResultCode.METHOD_ARGUMENT_NOT_VALID.getCode(), message, allErrors);
}

@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
@ExceptionHandler({SQLException.class, DataAccessException.class})
public ControllerResponseT databaseException(final Throwable e) {
String message = ResultCode.DATABASE_ERROR.getMessage();
log.error("Database error => {}", e.getMessage());
return ControllerResponseT.ofFail(ResultCode.DATABASE_ERROR.getCode(), message, e.getMessage());
}

Controller layer

@ApiImplicitParams({
@ApiImplicitParam(paramType = "body", dataType = "UserDTO", name = "userDTO", value = "user", required = true)
})
@RequestMapping(path = "/user", method = RequestMethod.POST)
public ControllerResponseT create(@Validated({Post.class}) @RequestBody UserDTO userDTO) {
int result = userService.save(UserDTO.convert(userDTO));
return ControllerResponseT.ofSuccess("success");
}

If parameter validation fails, the endpoint returns the following result
