tonglin0325的个人主页

yarn学习笔记——yarn api

参考:Yarn 监控 - 监控任务运行状态 (包括Spark,MR 所有在Yarn中运行的任务)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
    // Look up the applicationId of a job by its name
/**
 * Looks up the YARN applicationId of an unfinished job by its name.
 *
 * <p>Logs in via Kerberos keytab, lists all applications in RUNNING,
 * ACCEPTED or SUBMITTED state, and returns the applicationId of the
 * first one whose name equals {@code jobName}.
 *
 * @param jobName exact application name to match
 * @return the applicationId as a String, or {@code null} if no
 *         unfinished application with that name exists
 * @throws IOException if Kerberos login, the YARN listing call, or
 *         closing the client fails
 */
public static String getAppId(String jobName) throws IOException {

    Configuration conf = new Configuration();
    // Kerberos authentication against the secured cluster.
    System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
    conf.set("hadoop.security.authentication", "Kerberos");
    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab("hdfs@XXXX", "/home/xxxx/hdfs.keytab");

    YarnClient client = YarnClient.createYarnClient();
    client.init(conf);
    client.start();
    try {
        // Only consider applications that have not finished yet.
        EnumSet<YarnApplicationState> appStates = EnumSet.of(
                YarnApplicationState.RUNNING,
                YarnApplicationState.ACCEPTED,
                YarnApplicationState.SUBMITTED);

        List<ApplicationReport> appsReport;
        try {
            // Returns every application whose state is in appStates.
            appsReport = client.getApplications(appStates);
        } catch (YarnException e) {
            // The original code printed the trace and then dereferenced a
            // null list; surface the failure to the caller instead.
            throw new IOException("Failed to list YARN applications", e);
        }

        for (ApplicationReport appReport : appsReport) {
            System.out.println(appReport);
            // Match on the application name; optionally also filter by type:
            // && "Apache Flink".equals(appReport.getApplicationType())
            if (appReport.getName().equals(jobName)) {
                return appReport.getApplicationId().toString();
            }
        }
        return null;
    } finally {
        // Single close path instead of the duplicated close/catch blocks;
        // the method already declares IOException, so let it propagate.
        client.close();
    }
}

/**
 * Fetches the current YarnApplicationState for a given applicationId.
 *
 * <p>Logs in via Kerberos keytab, asks the ResourceManager for the
 * application report and returns its state.
 *
 * @param appId applicationId string, e.g. "application_1620359479641_0279"
 * @return the application's current state
 * @throws IOException if Kerberos login, the report lookup, or closing
 *         the client fails
 */
public static YarnApplicationState getState(String appId) throws IOException {

    Configuration conf = new Configuration();
    // Kerberos authentication against the secured cluster.
    System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
    conf.set("hadoop.security.authentication", "Kerberos");
    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab("hdfs@XXXXX", "/home/xxxx/hdfs.keytab");

    YarnClient client = YarnClient.createYarnClient();
    client.init(conf);
    client.start();
    try {
        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
        // Propagate failures instead of printStackTrace() + silent null,
        // so callers can distinguish "lookup failed" from a real state.
        ApplicationReport applicationReport = client.getApplicationReport(applicationId);
        return applicationReport.getYarnApplicationState();
    } catch (YarnException e) {
        throw new IOException("Failed to fetch report for " + appId, e);
    } finally {
        // Single close path instead of the duplicated close/catch blocks.
        client.close();
    }
}

/**
 * Demo entry point: resolves the applicationId of a named job and
 * prints it (null if no matching unfinished application exists).
 */
public static void main(String[] args) throws IOException, InterruptedException {
    // Renamed from "state": getAppId returns an applicationId, not a state.
    String appId = getAppId("job_xxxxx");
    System.out.println(appId);
    // Example follow-up check (the original compared a String to the enum,
    // which would not compile if uncommented):
    // System.out.println(getState(appId) == YarnApplicationState.RUNNING);
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
applicationId { id: 279 cluster_timestamp: 1620359479641 } 
user: "dl"
queue: "root.xxxx"
name: "xxx-flink"
host: "xxxxx"
rpc_port: 18000
client_to_am_token { identifier: "xxxx@XXXX" password: ";xxxxx" kind: "YARN_CLIENT_TOKEN" service: "" }
yarn_application_state: RUNNING
trackingUrl: "http://xxxxx:8088/proxy/application_xxxxx/"
diagnostics: ""
startTime: 1620391776339
finishTime: 0
final_application_status: APP_UNDEFINED
app_resource_Usage { num_used_containers: 4 num_reserved_containers: 0 used_resources { memory: 8192 virtual_cores: 7 } reserved_resources { memory: 0 virtual_cores: 0 } needed_resources { memory: 8192 virtual_cores: 7 } memory_seconds: 12703546778 vcore_seconds: 10855065 }
originalTrackingUrl: "http://xxxx:18000" currentApplicationAttemptId { application_id { id: 279 cluster_timestamp: 1620359479641 } attemptId: 1 } progress: 1.0
applicationType: "XXXX Flink"
log_aggregation_status: LOG_NOT_START