[Bug] [task-api] YarnApplicationManager Command Cannot Kill the Yarn Process #16268

Open
3 tasks done
lanxing2 opened this issue Jul 3, 2024 · 9 comments
Labels
bug Something isn't working Waiting for user feedback Waiting for feedback from issue/PR author

Comments

@lanxing2
Contributor

lanxing2 commented Jul 3, 2024

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

DolphinScheduler version: 3.2.1
When stopping a process with a Flink task in CLUSTER mode, DolphinScheduler kills the Flink job's YARN application first.
YarnApplicationManager.execYarnKillCommand is invoked, and the yarn kill command fails with the error "yarn: command not found":

[ERROR] 2024-07-03 10:16:53.690 +0800 - Kill yarn application [[application_1714114694986_0041]] failed
org.apache.dolphinscheduler.common.shell.AbstractShell$ExitCodeException: /tmp/dolphinscheduler/exec/process/default/13289909089664/13776450314784_11/190/200/application_1714114694986_0041.kill: line 6: yarn: command not found

	at org.apache.dolphinscheduler.common.shell.AbstractShell.runCommand(AbstractShell.java:205)
	at org.apache.dolphinscheduler.common.shell.AbstractShell.run(AbstractShell.java:118)
	at org.apache.dolphinscheduler.common.shell.ShellExecutor.execute(ShellExecutor.java:125)
	at org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:103)
	at org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:86)
	at org.apache.dolphinscheduler.common.utils.OSUtils.exeShell(OSUtils.java:345)
	at org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(OSUtils.java:334)
	at org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.execYarnKillCommand(YarnApplicationManager.java:89)
	at org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.killApplication(YarnApplicationManager.java:48)
	at org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(ProcessUtils.java:192)
	at org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.doKill(TaskInstanceKillOperationFunction.java:100)
	at org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.operate(TaskInstanceKillOperationFunction.java:69)
	at org.apache.dolphinscheduler.server.worker.rpc.TaskInstanceOperatorImpl.killTask(TaskInstanceOperatorImpl.java:49)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.dolphinscheduler.extract.base.server.ServerMethodInvokerImpl.invoke(ServerMethodInvokerImpl.java:41)
	at org.apache.dolphinscheduler.extract.base.server.JdkDynamicServerHandler.lambda$processReceived$0(JdkDynamicServerHandler.java:108)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
[ERROR] 2024-07-03 10:16:53.691 +0800 - Cancel application failed: /tmp/dolphinscheduler/exec/process/default/13289909089664/13776450314784_11/190/200/application_1714114694986_0041.kill: line 6: yarn: command not found

The root cause is that the shell file is executed by sh, not bash:
https://github.com/apache/dolphinscheduler/blob/dev/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java#L69-L90
sh does not load /etc/profile automatically, so PATH is not set up and sh cannot find the yarn command.
We need to add "source /etc/profile" to load PATH before executing the yarn command.
The code can be changed as follows:

    private void execYarnKillCommand(String tenantCode, String commandFile,
                                     String cmd) throws Exception {
        StringBuilder sb = new StringBuilder();
        sb.append("#!/bin/sh\n");
        sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
        sb.append("cd $BASEDIR\n");
        // Load PATH (and other env vars) so the yarn command can be found.
        // Note: under a strictly POSIX sh (e.g. dash), the portable form is
        // ". /etc/profile" rather than "source /etc/profile".
        sb.append("source /etc/profile\n");
        sb.append("\n\n");
        sb.append(cmd);

        File f = new File(commandFile);

        if (!f.exists()) {
            org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(),
                    StandardCharsets.UTF_8);
        }

        String runCmd = String.format("%s %s", Constants.SH, commandFile);
        runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
        log.info("kill cmd:{}", runCmd);
        org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
    }

After making this change, YarnApplicationManager.execYarnKillCommand can kill the YARN application successfully when stopping the Flink task.
However, there are still errors in the logs:

[ERROR] 2024-07-03 15:10:04.875 +0800 - Kill yarn application [[application_1714114694986_0057]] failed
org.apache.dolphinscheduler.common.shell.AbstractShell$ExitCodeException: 2024-07-03 15:10:04,274 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at node1/172.0.107.57:8032
2024-07-03 15:10:04,863 INFO impl.YarnClientImpl: Killed application application_1714114694986_0057

	at org.apache.dolphinscheduler.common.shell.AbstractShell.runCommand(AbstractShell.java:205)
	at org.apache.dolphinscheduler.common.shell.AbstractShell.run(AbstractShell.java:118)
	at org.apache.dolphinscheduler.common.shell.ShellExecutor.execute(ShellExecutor.java:125)
	at org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:103)
	at org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:86)
	at org.apache.dolphinscheduler.common.utils.OSUtils.exeShell(OSUtils.java:345)
	at org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(OSUtils.java:334)
	at org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.execYarnKillCommand(YarnApplicationManager.java:89)
	at org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.killApplication(YarnApplicationManager.java:48)
	at org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(ProcessUtils.java:192)
	at org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.doKill(TaskInstanceKillOperationFunction.java:100)
	at org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.operate(TaskInstanceKillOperationFunction.java:69)
	at org.apache.dolphinscheduler.server.worker.rpc.TaskInstanceOperatorImpl.killTask(TaskInstanceOperatorImpl.java:49)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.dolphinscheduler.extract.base.server.ServerMethodInvokerImpl.invoke(ServerMethodInvokerImpl.java:41)
	at org.apache.dolphinscheduler.extract.base.server.JdkDynamicServerHandler.lambda$processReceived$0(JdkDynamicServerHandler.java:108)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
[ERROR] 2024-07-03 15:10:04.876 +0800 - Cancel application failed: 2024-07-03 15:10:04,274 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at node1/172.0.107.57:8032

I tried starting another Flink task, created the kill command file locally, and ran it manually.
The command succeeded, with this output stream:

2024-07-03 15:37:18,381 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at node1/172.0.107.57:8032
Killing application application_1714114694986_0059
2024-07-03 15:37:18,883 INFO impl.YarnClientImpl: Killed application application_1714114694986_0059

I am not sure why AbstractShell does not treat this as a successful execution, and why the INFO messages end up in the error stream.
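As a minimal standalone sketch of the distinction (assuming a POSIX sh is available on the worker; this is illustrative, not DolphinScheduler code): a child process can write INFO-style logs to stderr and still exit 0, so stderr content alone should not be treated as a failure — only the exit code should.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;

public class StderrVsExitCode {
    public static void main(String[] args) throws Exception {
        // Simulate a CLI (like the Hadoop/YARN client) that logs to stderr
        // but exits successfully.
        Process p = new ProcessBuilder("sh", "-c", "echo 'INFO connecting' >&2; exit 0")
                .start();
        StringBuilder stderr = new StringBuilder();
        try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getErrorStream()))) {
            String line;
            while ((line = r.readLine()) != null) {
                stderr.append(line);
            }
        }
        int code = p.waitFor();
        // Exit code 0 means success, regardless of what landed on stderr.
        System.out.println("exitCode=" + code);
        System.out.println("stderr=" + stderr);
    }
}
```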

What you expected to happen

1. YarnApplicationManager.execYarnKillCommand kills the YARN application successfully without any error.
2. AbstractYarnTask keeps tracking the YARN application status; while the YARN application is still running, the task stays in the executing state.

How to reproduce

Currently, the Flink task does not implement tracking of the YARN application status.
If you run tasks in CLUSTER mode, the task succeeds and finishes as soon as the job is submitted to YARN.
If you want to actually stop the Flink job, you have to go to the YARN application UI and stop it there.
However, I want to track the YARN application status and end the task from DolphinScheduler, because we do not want to expose our YARN web UI to our users.
I added the following code in FlinkTask to monitor the YARN application status:

    @Override
    public void handle(TaskCallBack taskCallBack) throws TaskException {
        super.handle(taskCallBack);
        if (FlinkDeployMode.CLUSTER.equals(flinkParameters.getDeployMode()) ||
                FlinkDeployMode.APPLICATION.equals(flinkParameters.getDeployMode())) {
            trackApplicationStatus();
        }
    }

    @Override
    public void trackApplicationStatus() throws TaskException {
        log.info("Flink Task Yarn Application Id is {}", appIds);
        YarnClient yarnClient = YarnClient.createYarnClient();
        try {
            initialYarnClient(yarnClient);
            // appIds has the form application_<clusterTimestamp>_<sequenceNumber>
            String[] splitAppIds = appIds.split("_");
            ApplicationId applicationId = ApplicationId.newInstance(Long.parseLong(splitAppIds[1]),
                    Integer.parseInt(splitAppIds[2]));
            boolean yarnRunningFlag = true;
            while (yarnRunningFlag) {
                ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
                YarnApplicationState appState = appReport.getYarnApplicationState();
                log.info("Yarn Application State is {}", appState);
                if (YarnApplicationState.FAILED.equals(appState)) {
                    yarnRunningFlag = false;
                    setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
                } else if (YarnApplicationState.FINISHED.equals(appState) ||
                        YarnApplicationState.KILLED.equals(appState)) {
                    yarnRunningFlag = false;
                } else {
                    // Only sleep while the application is still running.
                    Thread.sleep(FlinkConstants.FLINK_YARN_TRACKING_SLEEP_MILLIS);
                }
            }
        } catch (YarnException | IOException | NullPointerException e) {
            // NullPointerException guards against a missing/unset appIds
            log.error("Failed to track application status", e);
            throw new TaskException("Failed to track application status", e);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            log.info("The current yarn task has been interrupted", ex);
            setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
            throw new TaskException("The current yarn task has been interrupted", ex);
        } finally {
            try {
                // stop() shuts the client down; close() releases its resources
                yarnClient.stop();
                yarnClient.close();
            } catch (IOException e) {
                log.error("Close Yarn Client Failed!", e);
            }
        }
    }

    private void initialYarnClient(YarnClient yarnClient) throws MalformedURLException {
        YarnConfiguration conf = new YarnConfiguration();
        conf.addResource(new File(System.getenv("HADOOP_CONF_DIR").concat("/hdfs-site.xml")).toURI().toURL());
        conf.addResource(new File(System.getenv("HADOOP_CONF_DIR").concat("/core-site.xml")).toURI().toURL());
        conf.addResource(new File(System.getenv("HADOOP_CONF_DIR").concat("/yarn-site.xml")).toURI().toURL());
        yarnClient.init(conf);
        yarnClient.start();
    }
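The appIds parsing step above can be illustrated in isolation (plain Java, without the Hadoop classes; the helper name is made up for this sketch). ApplicationId.newInstance takes exactly these two pieces: the cluster timestamp and the sequence number.

```java
public class AppIdParse {
    // Parse a YARN application id of the form
    // application_<clusterTimestamp>_<sequenceNumber>, as done in
    // trackApplicationStatus() before calling ApplicationId.newInstance.
    static long[] parse(String appId) {
        String[] parts = appId.split("_");
        long clusterTimestamp = Long.parseLong(parts[1]);
        int sequenceNumber = Integer.parseInt(parts[2]);  // "0059" parses to 59
        return new long[]{clusterTimestamp, sequenceNumber};
    }

    public static void main(String[] args) {
        long[] id = parse("application_1714114694986_0059");
        System.out.println(id[0] + " " + id[1]);  // 1714114694986 59
    }
}
```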

After adding this code, the process with the Flink task stays in the EXECUTING state, and when you stop the process, DolphinScheduler tries to kill the Flink YARN application via the kill command while stopping the task.

Anything else

No response

Version

3.2.x

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@lanxing2 lanxing2 added bug Something isn't working Waiting for reply Waiting for reply labels Jul 3, 2024
@lanxing2
Contributor Author

lanxing2 commented Jul 3, 2024

@SbloodyS SbloodyS added Waiting for user feedback Waiting for feedback from issue/PR author and removed Waiting for reply Waiting for reply labels Jul 4, 2024
@SbloodyS
Member

SbloodyS commented Jul 4, 2024

You need to check that the yarn command is properly installed on the worker server.

@lanxing2
Contributor Author

lanxing2 commented Jul 4, 2024

You need to check that the yarn command is properly installed on the worker server.

The yarn command is installed correctly on the worker server.

By the way, do you have any idea how DolphinScheduler currently tracks the YARN application status?

@SbloodyS
Member

SbloodyS commented Jul 4, 2024

If you are sure that the yarn command is installed and works properly, then you need to check whether the user who runs this task has permission to execute sudo, and whether the sudo configuration allows executing yarn.

@ruanwenjun
Member

Can you find out if the kill cmd is correct?

@ruanwenjun
Member

You need to check that the yarn command is properly installed on the worker server.

The yarn command is installed correctly on the worker server.

By the way, do you have any idea how DolphinScheduler currently tracks the YARN application status?

Right now, DS will not track the YARN application status. In most cases we don't need to, since tasks use sync mode; only after a cluster failover do we need to track the YARN task status. We could call the YARN REST API to do this, but it is not implemented in DS.
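As a rough sketch of the REST approach mentioned above: the ResourceManager exposes GET http://&lt;rm&gt;:8088/ws/v1/cluster/apps/&lt;appId&gt;, which returns JSON containing the application state. The sample response and the naive string extraction below are illustrative only; a real implementation would fetch over HTTP and use a proper JSON parser.

```java
public class YarnRestState {
    // Naive extraction of the "state" field from a ResourceManager REST
    // response body. Hypothetical helper for illustration.
    static String extractState(String json) {
        int i = json.indexOf("\"state\":\"");
        if (i < 0) {
            return null;
        }
        int start = i + "\"state\":\"".length();
        return json.substring(start, json.indexOf('"', start));
    }

    public static void main(String[] args) {
        // Shape of the response body from /ws/v1/cluster/apps/<appId>
        String sample = "{\"app\":{\"id\":\"application_1714114694986_0059\","
                + "\"state\":\"RUNNING\",\"finalStatus\":\"UNDEFINED\"}}";
        System.out.println(extractState(sample));  // RUNNING
    }
}
```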

@lanxing2
Contributor Author

Is there any plan to support sync/async mode for YarnTask? I have implemented tracking of the YARN application status via the YARN REST API in my own code base.

@lanxing2
Contributor Author

Can you find out if the kill cmd is correct?

The kill cmd can kill the process on my host.
