
Flink Study Notes: Memory Tuning

Flink memory layout#

[Figure: TaskManager memory model]

Reference: Flink重点难点:Flink任务综合调优(Checkpoint/反压/内存)
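Roughly, the TaskManager's total process memory is split into JVM heap (framework + task heap), managed memory, direct memory (framework and task off-heap plus network buffers), JVM metaspace and JVM overhead. The sketch below lists the flink-conf.yaml knobs this post touches on; the sizes are illustrative placeholders, not recommendations:

taskmanager.memory.process.size: 8g               # total memory of the TaskManager process (heap + off-heap + metaspace + overhead)
taskmanager.memory.task.off-heap.size: 512m       # direct memory for user code and its dependencies (default 0)
taskmanager.memory.framework.off-heap.size: 256m  # direct memory for the Flink framework (default 128m)
taskmanager.memory.managed.fraction: 0.4          # share of Flink memory given to managed memory (RocksDB, batch operators)
taskmanager.memory.network.fraction: 0.1          # share of Flink memory given to network buffers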

1. Insufficient off-heap memory: java.lang.OutOfMemoryError: Direct buffer memory#

The error is as follows:

Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
at java.nio.Bits.reserveMemory(Bits.java:695)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:247)
at sun.nio.ch.IOUtil.write(IOUtil.java:58)
at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
at java.nio.channels.Channels.writeFully(Channels.java:101)
at java.nio.channels.Channels.access$000(Channels.java:61)
at java.nio.channels.Channels$1.write(Channels.java:174)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.security.DigestOutputStream.write(DigestOutputStream.java:145)
at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.write(MultipartUploadOutputStream.java:172)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:63)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:63)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.lambda$write$0(SizeAwareFSDataOutputStream.java:58)
at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:106)
at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.java:124)
at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.write(SizeAwareFSDataOutputStream.java:55)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:63)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:175)
at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:404)
at org.apache.hudi.io.HoodieAppendHandle.close(HoodieAppendHandle.java:439)
at org.apache.hudi.io.FlinkAppendHandle.close(FlinkAppendHandle.java:99)
at org.apache.hudi.execution.ExplicitWriteHandler.closeOpenHandle(ExplicitWriteHandler.java:62)
at org.apache.hudi.execution.ExplicitWriteHandler.finish(ExplicitWriteHandler.java:52)
at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:41)
at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

The TaskManager memory parameters that likely need to be adjusted are taskmanager.memory.task.off-heap.size or taskmanager.memory.framework.off-heap.size. Add the following configuration when starting the Flink session cluster.

Note that the -D parameters must be passed when starting the session cluster; adding these memory parameters at flink run time does not take effect.

/usr/lib/flink/bin/yarn-session.sh -s 1 -jm 51200 -tm 51200 -qu data -D taskmanager.memory.task.off-heap.size=4G -D taskmanager.memory.framework.off-heap.size=4G --detached

Open the Task Manager page in the Flink web UI to confirm that the configured 4G of off-heap memory has taken effect.

This is because Flink's framework off-heap size (taskmanager.memory.framework.off-heap.size) defaults to only 128 MB, and the task off-heap size defaults to 0, so they need to be increased, as follows.
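Equivalently, if you manage the cluster through flink-conf.yaml rather than -D flags, the same two options can be set there before the session cluster is started; 4g below simply mirrors the values used in the yarn-session.sh command above and should be tuned to your own workload:

taskmanager.memory.task.off-heap.size: 4g         # direct memory for user code, e.g. the Hudi/S3 writer in the stack trace above
taskmanager.memory.framework.off-heap.size: 4g    # direct memory for the Flink framework itself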

Reference: Flink 运行错误 java.lang.OutOfMemoryError: Direct buffer memory

Other tuning reference: Flink性能调优

When writing to Hudi with Flink CDC, it is recommended to use the BUCKET index type instead of the default FLINK_STATE index type, because the FLINK_STATE index is held in memory and consumes a great deal of memory.

Reference: HUDI-0.11.0 BUCKET index on Flink 新特性试用
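A rough Flink SQL sketch of what switching the Hudi sink to the BUCKET index looks like; the table name, columns, path and bucket count are made-up placeholders, and the option names ('index.type', 'hoodie.bucket.index.num.buckets') should be checked against the Hudi release you actually run (they are the 0.11.x Flink options):

-- hypothetical MERGE_ON_READ sink written by Flink CDC; only the index options matter here
CREATE TABLE hudi_sink (
  id BIGINT,
  name STRING,
  ts TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 's3://my-bucket/warehouse/hudi_sink',      -- placeholder path
  'table.type' = 'MERGE_ON_READ',
  'index.type' = 'BUCKET',                            -- replaces the default FLINK_STATE (in-memory) index
  'hoodie.bucket.index.num.buckets' = '64'            -- buckets per partition, size to the data volume
);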

 

2. Insufficient disk space: No space left on device#

The error is as follows:

Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1_(10/25) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more
Caused by: java.io.IOException: No space left on device
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
at java.nio.channels.Channels.writeFully(Channels.java:101)
at java.nio.channels.Channels.access$000(Channels.java:61)
at java.nio.channels.Channels$1.write(Channels.java:174)
at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141)
at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

Solution: increase the disk space available on the machines appropriately.
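The failing write in the trace comes from RocksDBStateDownloader restoring state onto local disk, so besides adding capacity it can help to point Flink's local directories at the larger volumes. A sketch with placeholder paths:

io.tmp.dirs: /mnt/disk1/flink-tmp,/mnt/disk2/flink-tmp                  # local working / spill directories
state.backend.rocksdb.localdir: /mnt/disk1/rocksdb,/mnt/disk2/rocksdb   # local directories for RocksDB working state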

 

3. Insufficient heap memory: java.lang.OutOfMemoryError: Java heap space#

1. Use RocksDB as the state backend for the Flink job; Flink's default state backend keeps state in memory on the JVM heap and consumes a lot of memory (see the configuration sketch after this list).

Reference: https://hudi.apache.org/cn/docs/0.9.0/flink-quick-start-guide/#%E5%86%85%E5%AD%98%E4%BC%98%E5%8C%96

2. Other GC problems caused by memory OOM.

Reference: FlinkCDC-Hudi:Mysql数据实时入湖全攻略六:极限压测下炸出来的FlinkCDC-Hudi坑,真多
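A minimal flink-conf.yaml sketch for the RocksDB switch mentioned in point 1, with incremental checkpoints to keep checkpoint uploads small; the checkpoint path is a placeholder, and the heap-dump line is an optional aid for diagnosing the GC problems in point 2:

state.backend: rocksdb                                        # keep state on local disk instead of the JVM heap
state.backend.incremental: true                               # upload only changed SST files at each checkpoint
state.checkpoints.dir: s3://my-bucket/flink/checkpoints       # placeholder durable checkpoint location
env.java.opts.taskmanager: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp   # dump the heap on OOM for later analysis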