Reference: YARN monitoring - checking the running state of jobs (covers Spark, MR, and every other application running on YARN)
```java
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;

// Enclosing class name assumed; the original snippet omits it.
public class YarnMonitor {

    // Look up a job's applicationId by its job name
    public static String getAppId(String jobName) throws IOException {
        Configuration conf = new Configuration();
        // Kerberos login (principal and keytab path are placeholders)
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        conf.set("hadoop.security.authentication", "Kerberos");
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab("hdfs@XXXX", "/home/xxxx/hdfs.keytab");

        YarnClient client = YarnClient.createYarnClient();
        client.init(conf);
        client.start();

        // Only query applications that are still alive on the cluster
        EnumSet<YarnApplicationState> appStates = EnumSet.noneOf(YarnApplicationState.class);
        appStates.add(YarnApplicationState.RUNNING);
        appStates.add(YarnApplicationState.ACCEPTED);
        appStates.add(YarnApplicationState.SUBMITTED);

        List<ApplicationReport> appsReport = null;
        try {
            // Returns every application whose state is in the given EnumSet
            appsReport = client.getApplications(appStates);
        } catch (YarnException | IOException e) {
            e.printStackTrace();
        }
        assert appsReport != null;

        for (ApplicationReport appReport : appsReport) {
            System.out.println(appReport);
            // Match by application name (optionally also restrict by application type)
            String jn = appReport.getName();
            String applicationType = appReport.getApplicationType();
            if (jn.equals(jobName)) { // && "Apache Flink".equals(applicationType)
                try {
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return appReport.getApplicationId().toString();
            }
        }
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    // Query an application's state by its applicationId
    public static YarnApplicationState getState(String appId) throws IOException {
        Configuration conf = new Configuration();
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        conf.set("hadoop.security.authentication", "Kerberos");
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab("hdfs@XXXXX", "/home/xxxx/hdfs.keytab");

        YarnClient client = YarnClient.createYarnClient();
        client.init(conf);
        client.start();

        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
        YarnApplicationState yarnApplicationState = null;
        try {
            ApplicationReport applicationReport = client.getApplicationReport(applicationId);
            yarnApplicationState = applicationReport.getYarnApplicationState();
        } catch (YarnException | IOException e) {
            e.printStackTrace();
        }
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return yarnApplicationState;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Note: getAppId matches on the application *name*, not a job id
        String appId = getAppId("job_xxxxx");
        System.out.println(appId);
        // System.out.println(getState(appId) == YarnApplicationState.RUNNING);
    }
}
```
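These two helpers are enough for a simple watchdog. The sketch below is my addition, not part of the original post: drop it into the same class, and note that the 30-second interval and the alert hook are placeholder choices. It polls the state until the application leaves YARN's live states, which is where restart or alert logic would go:

```java
// Minimal monitoring-loop sketch: poll YARN until the job stops running.
// Assumes getAppId/getState from above; the job name and alerting are placeholders.
public static void monitor(String jobName) throws IOException, InterruptedException {
    String appId = getAppId(jobName);
    if (appId == null) {
        // The job may have failed before or at submit time
        System.err.println("Job not found on YARN: " + jobName);
        return;
    }
    while (true) {
        YarnApplicationState state = getState(appId);
        System.out.println(jobName + " (" + appId + ") -> " + state);
        if (state == YarnApplicationState.FAILED
                || state == YarnApplicationState.KILLED
                || state == YarnApplicationState.FINISHED) {
            // Hook your alerting (mail, IM, restart script, ...) here
            break;
        }
        Thread.sleep(30_000L); // poll every 30 seconds
    }
}
```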
Output:
```
applicationId { id: 279 cluster_timestamp: 1620359479641 }
user: "dl"
queue: "root.xxxx"
name: "xxx-flink"
host: "xxxxx"
rpc_port: 18000
client_to_am_token { identifier: "xxxx@XXXX" password: ";xxxxx" kind: "YARN_CLIENT_TOKEN" service: "" }
yarn_application_state: RUNNING
trackingUrl: "http://xxxxx:8088/proxy/application_xxxxx/"
diagnostics: ""
startTime: 1620391776339
finishTime: 0
final_application_status: APP_UNDEFINED
app_resource_Usage { num_used_containers: 4 num_reserved_containers: 0 used_resources { memory: 8192 virtual_cores: 7 } reserved_resources { memory: 0 virtual_cores: 0 } needed_resources { memory: 8192 virtual_cores: 7 } memory_seconds: 12703546778 vcore_seconds: 10855065 }
originalTrackingUrl: "http://xxxx:18000"
currentApplicationAttemptId { application_id { id: 279 cluster_timestamp: 1620359479641 } attemptId: 1 }
progress: 1.0
applicationType: "XXXX Flink"
log_aggregation_status: LOG_NOT_START
```
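For completeness: the same state is also exposed over the ResourceManager REST API (Cluster Application State, GET /ws/v1/cluster/apps/{appid}/state), which avoids pulling in YarnClient at all. A minimal sketch, assuming an RM reachable at xxxxx:8088 without security (a Kerberized cluster would additionally need SPNEGO authentication); the application id is assembled from the cluster_timestamp and id fields in the report above:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Sketch: read an application's state from the ResourceManager REST API.
// RM host/port are placeholders; secured clusters require SPNEGO on top of this.
public class YarnRestState {
    public static void main(String[] args) throws Exception {
        // Built from the report above: application_<cluster_timestamp>_<id>
        String appId = "application_1620359479641_0279";
        URL url = new URL("http://xxxxx:8088/ws/v1/cluster/apps/" + appId + "/state");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestProperty("Accept", "application/json");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            // Response body looks like {"state":"RUNNING"}
            System.out.println(in.readLine());
        } finally {
            conn.disconnect();
        }
    }
}
```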