[Bug] [task-api] YarnApplicationManager Command Cannot Kill the Yarn Process #16268
Search before asking
What happened

DolphinScheduler Version 3.2.1

When stopping a Process with a Flink Task in CLUSTER mode, DolphinScheduler kills the Flink job's YARN application first. YarnApplicationManager.execYarnKillCommand is invoked, and the YARN kill command fails with the error "cannot find command yarn".

The root cause is that the shell file is executed by sh, not bash:
https://github.com/apache/dolphinscheduler/blob/dev/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java#L69-L90

sh does not load /etc/profile automatically, so PATH is not populated and sh cannot find the yarn command. We need to add "source /etc/profile" to load the PATH before the yarn command is executed. Change the code like the following:

```java
private void execYarnKillCommand(String tenantCode, String commandFile,
                                 String cmd) throws Exception {
    StringBuilder sb = new StringBuilder();
    sb.append("#!/bin/sh\n");
    sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
    sb.append("cd $BASEDIR\n");
    sb.append("source /etc/profile\n");
    sb.append("\n\n");
    sb.append(cmd);

    File f = new File(commandFile);
    if (!f.exists()) {
        org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(),
                StandardCharsets.UTF_8);
    }

    String runCmd = String.format("%s %s", Constants.SH, commandFile);
    runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
    log.info("kill cmd:{}", runCmd);
    org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
}
```

After making this change, YarnApplicationManager.execYarnKillCommand can kill the YARN process successfully when stopping the Flink Task. However, there are still errors in the logs. I started another Flink Task, created the kill command locally, and ran the command; it succeeded, with its output on the output stream.
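One detail relevant to reading the kill command's logs: a process can exit 0 (success) while still writing INFO-style lines to stderr, which is how many JVM log configurations (Hadoop's default console appender among them) commonly behave. A minimal, self-contained sketch, not DolphinScheduler code:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;

public class StderrVsExitCode {

    // Runs a child command that writes an INFO-style line to stderr,
    // a normal line to stdout, and exits 0, then summarizes what was seen.
    static String run() throws Exception {
        ProcessBuilder pb = new ProcessBuilder("sh", "-c",
                "echo 'INFO: kill submitted' 1>&2; echo done");
        Process p = pb.start();
        String out = new BufferedReader(new InputStreamReader(p.getInputStream())).readLine();
        String err = new BufferedReader(new InputStreamReader(p.getErrorStream())).readLine();
        int code = p.waitFor();
        return "exit=" + code + " stdout=" + out + " stderr=" + err;
    }

    public static void main(String[] args) throws Exception {
        // Prints: exit=0 stdout=done stderr=INFO: kill submitted
        System.out.println(run());
    }
}
```

The exit code, not the choice of stream, is what signals success here, which is why treating any stderr output as a failure is surprising.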
I am not sure why AbstractShell does not treat this as a successful execution and instead puts the INFO output into the error stream.

What you expected to happen

1. YarnApplicationManager.execYarnKillCommand kills the YARN application successfully, without any error.
2. AbstractYarnTask keeps tracking the YARN application status; while the YARN application is still running, the task stays in the executing state.

How to reproduce

Currently, the Flink Task does not implement tracking of the YARN application status. If you run tasks in CLUSTER mode, the task will succeed and finish right after the job is submitted to YARN. If you want to actually stop the Flink job, you need to go to the YARN application UI to stop it. However, I want to track the YARN application status and end the task from DolphinScheduler, because we do not want to expose our YARN application WebUI to our users.

I added the following code in FlinkTask to monitor the YARN application status:

```java
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
    super.handle(taskCallBack);
    if (FlinkDeployMode.CLUSTER.equals(flinkParameters.getDeployMode()) ||
            FlinkDeployMode.APPLICATION.equals(flinkParameters.getDeployMode())) {
        trackApplicationStatus();
    }
}
```
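The trackApplicationStatus method below rebuilds the ApplicationId from the appIds string by splitting on "_", which assumes the standard application_&lt;clusterTimestamp&gt;_&lt;sequence&gt; id format. A minimal sketch of just that parsing step, with a hypothetical id:

```java
public class AppIdParseSketch {

    // Pulls the cluster timestamp out of application_<clusterTimestamp>_<sequence>.
    static long clusterTimestamp(String appIds) {
        return Long.parseLong(appIds.split("_")[1]);
    }

    // Pulls the sequence number out of application_<clusterTimestamp>_<sequence>.
    static int sequence(String appIds) {
        return Integer.parseInt(appIds.split("_")[2]);
    }

    public static void main(String[] args) {
        String appIds = "application_1700000000000_0042"; // hypothetical id
        // These two values are what ApplicationId.newInstance expects.
        System.out.println(clusterTimestamp(appIds) + " " + sequence(appIds));
    }
}
```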
```java
@Override
public void trackApplicationStatus() throws TaskException {
    log.info("Flink Task Yarn Application Id is " + appIds);
    YarnClient yarnClient = YarnClient.createYarnClient();
    try {
        initialYarnClient(yarnClient);
        String[] splitAppIds = appIds.split("_");
        ApplicationId applicationId = ApplicationId.newInstance(Long.parseLong(splitAppIds[1]),
                Integer.parseInt(splitAppIds[2]));
        boolean yarnRunningFlag = true;
        while (yarnRunningFlag) {
            ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
            YarnApplicationState appState = appReport.getYarnApplicationState();
            log.info("Yarn Application State is " + appState);
            if (YarnApplicationState.FAILED.equals(appState)) {
                yarnRunningFlag = false;
                setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
            } else if (YarnApplicationState.FINISHED.equals(appState) ||
                    YarnApplicationState.KILLED.equals(appState)) {
                yarnRunningFlag = false;
            }
            Thread.sleep(FlinkConstants.FLINK_YARN_TRACKING_SLEEP_MILLIS);
        }
    } catch (YarnException | IOException | NullPointerException e) {
        log.error("Failed to track application status", e);
        throw new RuntimeException("Failed to track application status");
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        log.info("The current yarn task has been interrupted", ex);
        setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
        throw new TaskException("The current yarn task has been interrupted", ex);
    } finally {
        try {
            // Stop YarnClient
            yarnClient.stop();
            // Close YarnClient
            yarnClient.close();
        } catch (IOException e) {
            log.error("Close Yarn Client Failed!", e);
        }
    }
}

private void initialYarnClient(YarnClient yarnClient) throws MalformedURLException {
    YarnConfiguration conf = new YarnConfiguration();
    conf.addResource(new File(System.getenv("HADOOP_CONF_DIR").concat("/hdfs-site.xml")).toURI().toURL());
    conf.addResource(new File(System.getenv("HADOOP_CONF_DIR").concat("/core-site.xml")).toURI().toURL());
    conf.addResource(new File(System.getenv("HADOOP_CONF_DIR").concat("/yarn-site.xml")).toURI().toURL());
    yarnClient.init(conf);
    yarnClient.start();
}
```

After adding this code, a Process with a Flink Task stays in the EXECUTE state, and when you stop the Process, DolphinScheduler will try to kill the Flink YARN application with the kill command while stopping the task.

Anything else

No response

Version

3.2.x

Are you willing to submit PR?
Code of Conduct

Comments:

- execYarnKillCommand is in ProcessUtils in the previous branch; the execYarnKillCommand in ProcessUtils is correct.
- You need to check …
- By the way, do you have any idea about how DolphinScheduler tracks the YARN application status currently?
- If you are sure that the …
- Can you find out if the …
- Right now, DS does not track the YARN application status. In most cases we don't need to track it, since the task runs in sync mode; only after a cluster failover do we need to track the YARN task status. We could call the YARN REST API to do this, but that is not implemented in DS.
- Is there any plan to support a sync/async mode for YarnTask? I implemented code to track the YARN application status via the YARN REST API in my own code base.
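As the comments note, the application state can also be read over the ResourceManager REST API (GET /ws/v1/cluster/apps/{appid}, whose JSON response includes a state field). A minimal sketch of the URL construction and state extraction; the RM address is hypothetical, and the actual HTTP call (e.g. via java.net.http.HttpClient) is omitted:

```java
public class YarnRestStatusSketch {

    // Builds the ResourceManager REST endpoint for a single application.
    static String appUrl(String rmAddress, String appId) {
        return "http://" + rmAddress + "/ws/v1/cluster/apps/" + appId;
    }

    // Extracts the "state" field from the RM's JSON response.
    // A tiny string scan for illustration; a real implementation would use a JSON parser.
    static String extractState(String json) {
        String key = "\"state\":\"";
        int i = json.indexOf(key);
        if (i < 0) {
            return null;
        }
        int start = i + key.length();
        return json.substring(start, json.indexOf('"', start));
    }

    public static void main(String[] args) {
        // rm-host:8088 is a hypothetical ResourceManager address.
        System.out.println(appUrl("rm-host:8088", "application_1700000000000_0042"));
        String sample = "{\"app\":{\"id\":\"application_1700000000000_0042\",\"state\":\"RUNNING\"}}";
        System.out.println(extractState(sample));
    }
}
```

Polling this endpoint instead of holding a YarnClient open would also survive a worker restart, which is the cluster-failover case mentioned above.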