blob: 382f1778b929afc9c7221d0833ca81945583fb50 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.python;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.util.MapUtils;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
/**
* python task
*/
public class PythonTask extends AbstractTaskExecutor {
/**
* python parameters
*/
private PythonParameters pythonParameters;
/**
* shell command executor
*/
private ShellCommandExecutor shellCommandExecutor;
private TaskRequest taskRequest;
/**
* constructor
*
* @param taskRequest taskRequest
*/
public PythonTask(TaskRequest taskRequest) {
super(taskRequest);
this.taskRequest = taskRequest;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskRequest,
logger);
}
@Override
public void init() {
logger.info("python task params {}", taskRequest.getTaskParams());
pythonParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), PythonParameters.class);
if (!pythonParameters.checkParameters()) {
throw new TaskException("python task params is not valid");
}
}
@Override
public String getPreScript() {
String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
try {
rawPythonScript = convertPythonScriptPlaceholders(rawPythonScript);
} catch (StringIndexOutOfBoundsException e) {
logger.error("setShareVar field format error, raw python script : {}", rawPythonScript);
}
return rawPythonScript;
}
@Override
public void handle() throws Exception {
try {
// generate the content of this python script
String pythonScriptContent = buildPythonScriptContent();
// generate the file path of this python script
String pythonScriptFile = buildPythonCommandFilePath();
// create this file
createPythonCommandFileIfNotExists(pythonScriptContent,pythonScriptFile);
String command = "python " + pythonScriptFile;
TaskResponse taskResponse = shellCommandExecutor.run(command);
setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds());
setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool());
} catch (Exception e) {
logger.error("python task failure", e);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
throw new TaskException("run python task error", e);
}
}
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
shellCommandExecutor.cancelApplication();
}
@Override
public AbstractParameters getParameters() {
return pythonParameters;
}
/**
* convertPythonScriptPlaceholders
*
* @param rawScript rawScript
* @return String
* @throws StringIndexOutOfBoundsException StringIndexOutOfBoundsException
*/
private static String convertPythonScriptPlaceholders(String rawScript) throws StringIndexOutOfBoundsException {
int len = "${setShareVar(${".length();
int scriptStart = 0;
while ((scriptStart = rawScript.indexOf("${setShareVar(${", scriptStart)) != -1) {
int start = -1;
int end = rawScript.indexOf('}', scriptStart + len);
String prop = rawScript.substring(scriptStart + len, end);
start = rawScript.indexOf(',', end);
end = rawScript.indexOf(')', start);
String value = rawScript.substring(start + 1, end);
start = rawScript.indexOf('}', start) + 1;
end = rawScript.length();
String replaceScript = String.format("print(\"${{setValue({},{})}}\".format(\"%s\",%s))", prop, value);
rawScript = rawScript.substring(0, scriptStart) + replaceScript + rawScript.substring(start, end);
scriptStart += replaceScript.length();
}
return rawScript;
}
/**
* create python command file if not exists
*
* @param pythonScript exec python script
* @param pythonScriptFile python script file
* @throws IOException io exception
*/
protected void createPythonCommandFileIfNotExists(String pythonScript, String pythonScriptFile) throws IOException {
logger.info("tenantCode :{}, task dir:{}", taskRequest.getTenantCode(), taskRequest.getExecutePath());
if (!Files.exists(Paths.get(pythonScriptFile))) {
logger.info("generate python script file:{}", pythonScriptFile);
StringBuilder sb = new StringBuilder();
sb.append("#-*- encoding=utf8 -*-\n");
sb.append("\n\n");
sb.append(pythonScript);
logger.info(sb.toString());
// write data to file
FileUtils.writeStringToFile(new File(pythonScriptFile),
sb.toString(),
StandardCharsets.UTF_8);
}
}
/**
* build python command file path
*
* @return python command file path
*/
protected String buildPythonCommandFilePath() {
return String.format("%s/py_%s.py", taskRequest.getExecutePath(), taskRequest.getTaskAppId());
}
/**
* build python script content
*
* @return raw python script
* @throws Exception exception
*/
private String buildPythonScriptContent() throws Exception {
String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(taskRequest, pythonParameters);
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(taskRequest.getParamsMap())) {
paramsMap.putAll(taskRequest.getParamsMap());
}
rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
logger.info("raw python script : {}", pythonParameters.getRawScript());
return rawPythonScript;
}
}