tonglin0325的个人主页

Flume的监控参数

参考 flume的http监控参数说明

 

普通的flume启动命令

1
2
bin/flume-ng agent -c conf -f conf/flume-conf.properties -n agent -Dflume.root.logger=INFO,console

日志信息在终端输出,只有去掉这个参数,日志才能在log4j和logback中输出

1
2
-Dflume.root.logger=INFO,console

如果要加上http监控的话

1
2
bin/flume-ng agent -c conf -f conf/flume-conf.properties -n agent -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

 即加上参数,flume.monitoring.type=http 指定了Reporting的方式为http,flume.monitoring.port 指定了http服务的端口号

1
2
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

 访问

1
2
http://localhost:34545/metrics

 

参数说明:

(1)、SOURCE#

SOURCE作为flume的数据源组件,所有收集日志的第一个到达的地方,它的监控信息非常重要。通过监控我们能够得到的监控数据有这些:

KafkaEventGetTimer

AppendBatchAcceptedCount(追加到channel中的批数量) 速率

EventAcceptedCount(成功放入channel的event数量) 速率

AppendReceivedCount(source追加目前收到的数量) 速率

StartTime(组件开始时间)

AppendBatchReceivedCount(source端刚刚追加的批数量) 速率

KafkaCommitTimer

EventReceivedCount(source端成功收到的event数量) 速率

Type(组件类型)

AppendAcceptedCount(放入channel的event数量) 速率

OpenConnectionCount(打开的连接数)

KafkaEmptyCount

StopTime(组件停止时间)

当然这些只是flume监控源码中已经自带的监控元素,如果你需要其他的监控信息,例如ip、端口号等,有两种方法,第一个,修改监控源码,添加你需要的监控元素,这种方法只是在原有代码基础上,添加一些满足自己需求的监控元素,比较简单,但灵活性不足;第二个就是自定义监控组件,这种方法是在原有监控框架中,自己实现自己的监控组件,这样可以达到完全满足自己需求,且灵活性很高。至于这两种方法如何操作,在后面Flume监控如何实现有讨论到。

同理CHANNEL、SINK这两个组件的监控也可以使用这两种方法来添加自己想要的监控元素。

(2)、CHANNEL#

CHANNEL是flume的一个通道组件,对数据有一个缓存的作用。能够得到的数据:

ChannelCapacity(通道容量)

ChannelFillPercentage(通道使用比例)

Type(组件类型)

ChannelSize(目前在channel中的event数量)

EventTakeSuccessCount(从channel中成功取走的event数量) 速率

EventTakeAttemptCount(尝试从channel中取走event的次数) 速率

StartTime(组件开始时间)

EventPutAttemptCount(尝试放入将event放入channel的次数) 速率

EventPutSuccessCount(成功放入channel的event数量) 速率

StopTime(组件停止时间)

(3)、SINK#

SINK是数据即将离开flume的最后一个组件,它从channel中取走数据,然后发送到缓存系统或者持久化数据库。能得到数据:

ConnectionCreatedCount(创建连接数) 速率

BatchCompleteCount(完成的批数量)  速率

BatchEmptyCount(批量取空的数量,空的批量的数量,如果数量很大表示souce写数据比sink清理数据慢速度慢很多) 速率

EventDrainSuccessCount(成功发送event的数量) 速率

StartTime(组件开始时间)

BatchUnderflowCount(正处于批量处理的batch数)等。  速率

ConnectionFailedCount(连接失败数) 速率

ConnectionClosedCount(关闭连接数量) 速率

Type(组件类型)

RollbackCount

EventDrainAttemptCount(尝试提交的event数量) 速率

KafkaEventSendTimer

StopTime(组件停止时间)

 

在实际生产环境中,由于数据量比较大(Kafka中导入200M左右的数据),Flume有时候会遇到下面oom问题

问题1

1
2
Exception in thread "PollableSourceRunner-KafkaSource-r1" java.lang.OutOfMemoryError: GC overhead limit exceeded

或者

1
2
Exception in thread "PollableSourceRunner-KafkaSource-r1" java.lang.OutOfMemoryError: Java heap space

 这是由于flume启动时的默认最大的堆内存大小是20M

解决方法:在flume的基础配置文件conf下的flume-env.sh中添加

1
2
export JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"

问题2

1
2
3
13:54:27.213 ERROR org.apache.flume.source.kafka.KafkaSource:317 - KafkaSource EXCEPTION, {}
org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight

 flume的properties文件中添加

1
2
3
agent.channels.c1.capacity = 1000000 #改大一点
agent.channels.c1.keep-alive = 60