tonglin0325的个人主页

Spark学习笔记——spark listener

Spark 提供了 SparkListener API,可以在任务运行过程中监控 Spark 任务当前的运行状态,参考:《SparkListener监听使用方式及自定义的事件处理动作》

编写 MySparkAppListener

package com.bigdata.spark

import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

/**
 * Listener that logs a line when the Spark application starts and another when it ends.
 *
 * Only the two application-level callbacks are overridden; every other event keeps the
 * behaviour inherited from [[SparkListener]].
 */
class MySparkAppListener extends SparkListener with Logging {

  /**
   * Logs the application id carried by the start event.
   *
   * @param applicationStart the application-start event delivered by Spark
   */
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    // appId is an Option: calling .get would throw NoSuchElementException inside the
    // listener when no id is present, so fall back to a placeholder instead.
    val appId = applicationStart.appId.getOrElse("<unknown>")
    logInfo(s"spark job start => $appId")
  }

  /**
   * Logs the `time` field of the end event.
   *
   * @param applicationEnd the application-end event delivered by Spark
   */
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    logInfo(s"spark job end => ${applicationEnd.time}")
  }
}

添加 spark.extraListeners 参数

// Build (or reuse) a local SparkSession; the fully-qualified listener class name is
// passed through the "spark.extraListeners" configuration key.
val sparkSession = SparkSession
  .builder()
  .appName("spark session example")
  .master("local")
  .config("spark.extraListeners", "com.bigdata.spark.MySparkAppListener")
  .getOrCreate()

运行任务后,就可以在日志中看到监听器输出的对应信息

21/12/27 23:13:46 INFO MySparkAppListener: spark job start => local-1640618026361

21/12/27 23:13:48 INFO MySparkAppListener: spark job end => 1640618028287

除此之外,SparkListener 还提供了其他事件的回调方法

/**
 * No-op base implementation of [[SparkListenerInterface]].
 *
 * Every callback below is overridden with a body that does nothing and returns `Unit`,
 * so a concrete listener only needs to override the events it is interested in.
 */
abstract class SparkListener extends SparkListenerInterface {

  /** Event fired when a stage completes. */
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = ()

  /** Event fired when a stage is submitted. */
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = ()

  /** Event fired when a task starts. */
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = ()

  /** Event for fetching (downloading) a task result. */
  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = ()

  /** Event fired when a task ends. */
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = ()

  /** Event fired when a job starts. */
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = ()

  /** Event fired when a job ends. */
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = ()

  /** Event fired when the environment variables are updated. */
  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = ()

  /** Event fired when a block manager is added. */
  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = ()

  // Counterpart of onBlockManagerAdded, judging by the method name.
  override def onBlockManagerRemoved(
      blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = ()

  /** Event fired when an RDD is unpersisted (its cache is dropped). */
  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = ()

  /** Event fired when the application starts. */
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = ()

  /** Event fired when the application ends. */
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = ()

  // The remaining callbacks are likewise fired at the point their method names describe;
  // they are not annotated one by one.

  override def onExecutorMetricsUpdate(
      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = ()

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = ()

  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = ()

  override def onExecutorBlacklisted(
      executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = ()

  override def onExecutorUnblacklisted(
      executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = ()

  override def onNodeBlacklisted(
      nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = ()

  override def onNodeUnblacklisted(
      nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = ()

  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = ()

  override def onOtherEvent(event: SparkListenerEvent): Unit = ()
}