| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.oozie.action.ssh; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStreamWriter; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| import org.apache.hadoop.util.StringUtils; |
| |
| import org.apache.oozie.ErrorCode; |
| import org.apache.oozie.client.WorkflowAction; |
| import org.apache.oozie.client.OozieClient; |
| import org.apache.oozie.client.WorkflowAction.Status; |
| import org.apache.oozie.action.ActionExecutor; |
| import org.apache.oozie.action.ActionExecutorException; |
| import org.apache.oozie.service.CallbackService; |
| import org.apache.oozie.service.ConfigurationService; |
| import org.apache.oozie.servlet.CallbackServlet; |
| import org.apache.oozie.service.Services; |
| import org.apache.oozie.util.BufferDrainer; |
| import org.apache.oozie.util.IOUtils; |
| import org.apache.oozie.util.PropertiesUtils; |
| import org.apache.oozie.util.XLog; |
| import org.apache.oozie.util.XmlUtils; |
| import org.jdom.Element; |
| import org.jdom.JDOMException; |
| import org.jdom.Namespace; |
| |
| /** |
| * Ssh action executor. <ul> <li>Execute the shell commands on the remote host</li> <li>Copies the base and wrapper |
| * scripts on to the remote location</li> <li>Base script is used to run the command on the remote host</li> <li>Wrapper |
| * script is used to check the status of the submitted command</li> <li>handles the submission failures</li> </ul> |
| */ |
| public class SshActionExecutor extends ActionExecutor { |
/** Action type name passed to the parent executor by the constructor. */
public static final String ACTION_TYPE = "ssh";

/**
 * Configuration parameter which specifies whether the specified ssh user is allowed, or has to be the job user.
 */
public static final String CONF_SSH_ALLOW_USER_AT_HOST = CONF_PREFIX + "ssh.allow.user.at.host";

/** Options shared by every ssh/scp command line built from the two command bases below. */
protected static final String SSH_COMMAND_OPTIONS =
        "-o PasswordAuthentication=no -o KbdInteractiveDevices=no -o StrictHostKeyChecking=no -o ConnectTimeout=20 ";

/** Prefix of ssh command lines; the target host is appended directly after it. */
protected static final String SSH_COMMAND_BASE = "ssh " + SSH_COMMAND_OPTIONS;
/** Prefix of scp command lines. */
protected static final String SCP_COMMAND_BASE = "scp " + SSH_COMMAND_OPTIONS;

// Error codes of the ssh action.
public static final String ERR_SETUP_FAILED = "SETUP_FAILED";
public static final String ERR_EXECUTION_FAILED = "EXECUTION_FAILED";
public static final String ERR_UNKNOWN_ERROR = "UNKNOWN_ERROR";
public static final String ERR_COULD_NOT_CONNECT = "COULD_NOT_CONNECT";
public static final String ERR_HOST_RESOLUTION = "COULD_NOT_RESOLVE_HOST";
public static final String ERR_FNF = "FNF";
public static final String ERR_AUTH_FAILED = "AUTH_FAILED";
public static final String ERR_NO_EXEC_PERM = "NO_EXEC_PERM";
public static final String ERR_USER_MISMATCH = "ERR_USER_MISMATCH";
public static final String ERR_EXCEDE_LEN = "ERR_OUTPUT_EXCEED_MAX_LEN";

/** Configuration key read in {@code end()} to decide whether the remote action directory is removed. */
public static final String DELETE_TMP_DIR = "oozie.action.ssh.delete.remote.tmp.dir";

/** Configuration key whose value is passed to ssh-base.sh as the HTTP command. */
public static final String HTTP_COMMAND = "oozie.action.ssh.http.command";

/** Configuration key whose value is passed to ssh-base.sh as the callback post options when output is captured. */
public static final String HTTP_COMMAND_OPTIONS = "oozie.action.ssh.http.command.post.options";

/** Configuration key for the maximum number of status-check retries after an ssh connect error. */
public static final String CHECK_MAX_RETRIES = "oozie.action.ssh.check.retries.max";

/** Configuration key for the first wait (milliseconds) between status-check retries. */
public static final String CHECK_INITIAL_RETRY_WAIT_TIME = "oozie.action.ssh.check.initial.retry.wait.time";

/** Placeholder handed to the callback service when the callback URL is created. */
private static final String EXT_STATUS_VAR = "#status";

/** Exit code that the status check treats as "could not connect to the host". */
private static final int SSH_CONNECT_ERROR_CODE = 255;

/** Maximum captured output length; assigned in {@code initActionType()}. */
private static int maxLen;
/** Value of {@link #CONF_SSH_ALLOW_USER_AT_HOST}; assigned in {@code initActionType()}. */
private static boolean allowSshUserAtHost;

private final XLog LOG = XLog.getLog(getClass());

/**
 * Creates the executor for the {@link #ACTION_TYPE} action type.
 */
protected SshActionExecutor() {
    super(ACTION_TYPE);
}
| |
| /** |
| * Initialize Action. |
| */ |
| @Override |
| public void initActionType() { |
| super.initActionType(); |
| maxLen = getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 2 * 1024); |
| allowSshUserAtHost = ConfigurationService.getBoolean(CONF_SSH_ALLOW_USER_AT_HOST); |
| registerError(InterruptedException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH001"); |
| registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH002"); |
| initSshScripts(); |
| } |
| |
| /** |
| * Check ssh action status. |
| * |
| * @param context action execution context. |
| * @param action action object. |
| * @throws org.apache.oozie.action.ActionExecutorException in case if action cannot be executed |
| */ |
| @SuppressFBWarnings(value = {"COMMAND_INJECTION", "PATH_TRAVERSAL_OUT"}, |
| justification = "Tracker URI is specified in the WF action, and action dir path is from context") |
| @Override |
| public void check(Context context, WorkflowAction action) throws ActionExecutorException { |
| LOG.trace("check() start for action={0}", action.getId()); |
| Status status = getActionStatus(context, action); |
| boolean captureOutput; |
| try { |
| Element eConf = XmlUtils.parseXml(action.getConf()); |
| Namespace ns = eConf.getNamespace(); |
| captureOutput = eConf.getChild("capture-output", ns) != null; |
| } |
| catch (JDOMException ex) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_XML_PARSE_FAILED", |
| "unknown error", ex); |
| } |
| LOG.debug("Capture Output: {0}", captureOutput); |
| if (status == Status.OK) { |
| if (captureOutput) { |
| String outFile = getRemoteFileName(context, action, "stdout", false, true); |
| String dataCommand = SSH_COMMAND_BASE + action.getTrackerUri() + " cat " + outFile; |
| LOG.debug("Ssh command [{0}]", dataCommand); |
| try { |
| final StringBuffer outBuffer = getActionOutputMessage(dataCommand); |
| context.setExecutionData(status.toString(), PropertiesUtils.stringToProperties(outBuffer.toString())); |
| LOG.trace("Execution data set. status={0}, properties={1}", status, |
| PropertiesUtils.stringToProperties(outBuffer.toString())); |
| } |
| catch (Exception ex) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_UNKNOWN_ERROR", |
| "unknown error", ex); |
| } |
| } |
| else { |
| LOG.trace("Execution data set to null. status={0}", status); |
| context.setExecutionData(status.toString(), null); |
| } |
| } |
| else { |
| if (status == Status.ERROR) { |
| LOG.warn("Execution data set to null in ERROR"); |
| context.setExecutionData(status.toString(), null); |
| String actionErrorMsg = getActionErrorMessage(context, action); |
| LOG.warn("{0}: Script failed on remote host with [{1}]", ErrorCode.E1111, actionErrorMsg); |
| context.setErrorInfo(ErrorCode.E1111.toString(), actionErrorMsg); |
| } |
| else { |
| LOG.warn("Execution data not set"); |
| context.setExternalStatus(status.toString()); |
| } |
| } |
| LOG.trace("check() end for action={0}", action); |
| } |
| |
| private StringBuffer getActionOutputMessage(String dataCommand) throws IOException, ActionExecutorException { |
| final Process process = Runtime.getRuntime().exec(dataCommand.split("\\s")); |
| boolean overflow = false; |
| final BufferDrainer bufferDrainer = new BufferDrainer(process, maxLen); |
| bufferDrainer.drainBuffers(); |
| final StringBuffer outBuffer = bufferDrainer.getInputBuffer(); |
| final StringBuffer errBuffer = bufferDrainer.getErrorBuffer(); |
| LOG.debug("outBuffer={0}", outBuffer); |
| LOG.debug("errBuffer={0}", errBuffer); |
| if (outBuffer.length() > maxLen) { |
| overflow = true; |
| } |
| if (overflow) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, |
| "ERR_OUTPUT_EXCEED_MAX_LEN", "unknown error"); |
| } |
| return outBuffer; |
| } |
| |
| private String getActionErrorMessage(Context context, WorkflowAction action) throws ActionExecutorException { |
| String outFile = getRemoteFileName(context, action, "error", false, true); |
| String errorMsgCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " cat " + outFile; |
| LOG.debug("Get error message command: [{0}]", errorMsgCmd); |
| String errorMessage; |
| try { |
| final StringBuffer outBuffer = getActionOutputMessage(errorMsgCmd); |
| errorMessage = outBuffer.toString().replaceAll("\n", ""); |
| } catch (Exception ex) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_UNKNOWN_ERROR", |
| "unknown error", ex); |
| } |
| return errorMessage; |
| } |
| |
| /** |
| * Kill ssh action. |
| * |
| * @param context action execution context. |
| * @param action object. |
| * @throws org.apache.oozie.action.ActionExecutorException in case if action cannot be executed |
| */ |
| @Override |
| public void kill(Context context, WorkflowAction action) throws ActionExecutorException { |
| LOG.info("Killing action"); |
| String command = "ssh " + action.getTrackerUri() + " kill -KILL " + action.getExternalId(); |
| int returnValue = getReturnValue(command); |
| if (returnValue != 0) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_TO_KILL", XLog.format( |
| "Unable to kill process {0} on {1}", action.getExternalId(), action.getTrackerUri())); |
| } |
| context.setEndData(WorkflowAction.Status.KILLED, "ERROR"); |
| } |
| |
| /** |
| * Start the ssh action execution. |
| * |
| * @param context action execution context. |
| * @param action action object. |
| * @throws org.apache.oozie.action.ActionExecutorException in case if action cannot be executed |
| */ |
| @SuppressWarnings("unchecked") |
| @Override |
| public void start(final Context context, final WorkflowAction action) throws ActionExecutorException { |
| LOG.info("Starting action"); |
| String confStr = action.getConf(); |
| Element conf; |
| try { |
| conf = XmlUtils.parseXml(confStr); |
| } |
| catch (Exception ex) { |
| throw convertException(ex); |
| } |
| Namespace nameSpace = conf.getNamespace(); |
| Element hostElement = conf.getChild("host", nameSpace); |
| String hostString = hostElement.getValue().trim(); |
| hostString = prepareUserHost(hostString, context); |
| final String host = hostString; |
| final String dirLocation = execute(new Callable<String>() { |
| public String call() throws Exception { |
| return setupRemote(host, context, action); |
| } |
| }); |
| |
| String runningPid = execute(new Callable<String>() { |
| public String call() throws Exception { |
| return checkIfRunning(host, context, action); |
| } |
| }); |
| String pid = ""; |
| |
| LOG.trace("runningPid={0}", runningPid); |
| |
| if (runningPid == null) { |
| final Element commandElement = conf.getChild("command", nameSpace); |
| final boolean ignoreOutput = conf.getChild("capture-output", nameSpace) == null; |
| |
| boolean preserve = false; |
| if (commandElement != null) { |
| String[] args = null; |
| // Will either have <args>, <arg>, or neither (but not both) |
| List<Element> argsList = conf.getChildren("args", nameSpace); |
| // Arguments in an <args> are "flattened" (spaces are delimiters) |
| if (argsList != null && argsList.size() > 0) { |
| StringBuilder argsString = new StringBuilder(""); |
| for (Element argsElement : argsList) { |
| argsString = argsString.append(argsElement.getValue()).append(" "); |
| } |
| args = new String[]{argsString.toString()}; |
| } |
| else { |
| // Arguments in an <arg> are preserved, even with spaces |
| argsList = conf.getChildren("arg", nameSpace); |
| if (argsList != null && argsList.size() > 0) { |
| preserve = true; |
| args = new String[argsList.size()]; |
| for (int i = 0; i < argsList.size(); i++) { |
| Element argsElement = argsList.get(i); |
| args[i] = argsElement.getValue(); |
| // Even though we're keeping the args as an array, if they contain a space we still have to either quote |
| // them or escape their space (because the scripts will split them up otherwise) |
| if (args[i].contains(" ") && |
| !(args[i].startsWith("\"") && args[i].endsWith("\"") || |
| args[i].startsWith("'") && args[i].endsWith("'"))) { |
| args[i] = StringUtils.escapeString(args[i], '\\', ' '); |
| } |
| } |
| } |
| } |
| final String[] argsF = args; |
| final String recoveryId = context.getRecoveryId(); |
| final boolean preserveF = preserve; |
| pid = execute(new Callable<String>() { |
| |
| @Override |
| public String call() throws Exception { |
| return doExecute(host, dirLocation, commandElement.getValue(), argsF, ignoreOutput, action, recoveryId, |
| preserveF); |
| } |
| }); |
| } |
| context.setStartData(pid, host, host); |
| } |
| else { |
| pid = runningPid; |
| context.setStartData(pid, host, host); |
| check(context, action); |
| } |
| } |
| |
| private String checkIfRunning(String host, final Context context, final WorkflowAction action) { |
| String outFile = getRemoteFileName(context, action, "pid", false, false); |
| String getOutputCmd = SSH_COMMAND_BASE + host + " cat " + outFile; |
| try { |
| final Process process = Runtime.getRuntime().exec(getOutputCmd.split("\\s")); |
| final BufferDrainer bufferDrainer = new BufferDrainer(process, maxLen); |
| bufferDrainer.drainBuffers(); |
| final StringBuffer buffer = bufferDrainer.getInputBuffer(); |
| String pid = getFirstLine(buffer); |
| if (Long.valueOf(pid) > 0) { |
| return pid; |
| } |
| else { |
| return null; |
| } |
| } |
| catch (Exception e) { |
| return null; |
| } |
| } |
| |
| /** |
| * Get remote host working location. |
| * |
| * @param context action execution context |
| * @param action Action |
| * @param fileExtension Extension to be added to file name |
| * @param dirOnly Get the Directory only |
| * @param useExtId Flag to use external ID in the path |
| * @return remote host file name/Directory. |
| */ |
| public String getRemoteFileName(Context context, WorkflowAction action, String fileExtension, boolean dirOnly, |
| boolean useExtId) { |
| String path = getActionDirPath(context.getWorkflow().getId(), action, ACTION_TYPE, false) + "/"; |
| if (dirOnly) { |
| return path; |
| } |
| if (useExtId) { |
| path = path + action.getExternalId() + "."; |
| } |
| path = path + context.getRecoveryId() + "." + fileExtension; |
| return path; |
| } |
| |
| /** |
| * Utility method to execute command. |
| * |
| * @param command Command to execute as String. |
| * @return exit status of the execution. |
| * @throws IOException if processSettings exits with status nonzero. |
| * @throws InterruptedException if processSettings does not run properly. |
| */ |
| public int executeCommand(String command) throws IOException, InterruptedException { |
| Runtime runtime = Runtime.getRuntime(); |
| Process p = runtime.exec(command.split("\\s")); |
| |
| final BufferDrainer bufferDrainer = new BufferDrainer(p, maxLen); |
| final int exitValue = bufferDrainer.drainBuffers(); |
| final StringBuffer errorBuffer = bufferDrainer.getErrorBuffer(); |
| |
| if (exitValue != 0) { |
| String error = getTruncatedString(errorBuffer); |
| throw new IOException(XLog.format("Not able to perform operation [{0}]", command) + " | " + "ErrorStream: " |
| + error); |
| } |
| return exitValue; |
| } |
| |
| /** |
| * Do ssh action execution setup on remote host. |
| * |
| * @param host host name. |
| * @param context action execution context. |
| * @param action action object. |
| * @return remote host working directory. |
| * @throws IOException thrown if failed to setup. |
| * @throws InterruptedException thrown if any interruption happens. |
| */ |
| protected String setupRemote(String host, Context context, WorkflowAction action) throws IOException, InterruptedException { |
| LOG.info("Attempting to copy ssh base scripts to remote host [{0}]", host); |
| String localDirLocation = Services.get().getRuntimeDir() + "/ssh"; |
| if (localDirLocation.endsWith("/")) { |
| localDirLocation = localDirLocation.substring(0, localDirLocation.length() - 1); |
| } |
| File file = new File(localDirLocation + "/ssh-base.sh"); |
| if (!file.exists()) { |
| throw new IOException("Required Local file " + file.getAbsolutePath() + " not present."); |
| } |
| file = new File(localDirLocation + "/ssh-wrapper.sh"); |
| if (!file.exists()) { |
| throw new IOException("Required Local file " + file.getAbsolutePath() + " not present."); |
| } |
| String remoteDirLocation = getRemoteFileName(context, action, null, true, true); |
| String command = XLog.format("{0}{1} mkdir -p {2} ", SSH_COMMAND_BASE, host, remoteDirLocation).toString(); |
| executeCommand(command); |
| command = XLog.format("{0}{1}/ssh-base.sh {2}/ssh-wrapper.sh {3}:{4}", SCP_COMMAND_BASE, localDirLocation, |
| localDirLocation, host, remoteDirLocation); |
| executeCommand(command); |
| command = XLog.format("{0}{1} chmod +x {2}ssh-base.sh {3}ssh-wrapper.sh ", SSH_COMMAND_BASE, host, |
| remoteDirLocation, remoteDirLocation); |
| executeCommand(command); |
| return remoteDirLocation; |
| } |
| |
| /** |
| * Execute the ssh command. |
| * |
| * @param host hostname. |
| * @param dirLocation location of the base and wrapper scripts. |
| * @param cmnd command to be executed. |
| * @param args command arguments. |
| * @param ignoreOutput ignore output option. |
| * @param action action object. |
| * @param recoveryId action id + run number to enable recovery in rerun |
| * @param preserveArgs tell the ssh scripts to preserve or flatten the arguments |
| * @return processSettings id of the running command. |
| * @throws IOException thrown if failed to run the command. |
| * @throws InterruptedException thrown if any interruption happens. |
| */ |
| protected String doExecute(String host, String dirLocation, String cmnd, String[] args, boolean ignoreOutput, |
| WorkflowAction action, String recoveryId, boolean preserveArgs) |
| throws IOException, InterruptedException { |
| Runtime runtime = Runtime.getRuntime(); |
| String callbackPost = ignoreOutput ? "_" : ConfigurationService.get(HTTP_COMMAND_OPTIONS).replace(" ", "%%%"); |
| String preserveArgsS = preserveArgs ? "PRESERVE_ARGS" : "FLATTEN_ARGS"; |
| // TODO check |
| String callBackUrl = Services.get().get(CallbackService.class) |
| .createCallBackUrl(action.getId(), EXT_STATUS_VAR); |
| String command = XLog.format("{0}{1} {2}ssh-base.sh {3} {4} \"{5}\" \"{6}\" {7} {8} ", SSH_COMMAND_BASE, host, dirLocation, |
| preserveArgsS, ConfigurationService.get(HTTP_COMMAND), callBackUrl, callbackPost, recoveryId, cmnd); |
| String[] commandArray = command.split("\\s"); |
| String[] finalCommand; |
| if (args == null) { |
| finalCommand = commandArray; |
| } |
| else { |
| finalCommand = new String[commandArray.length + args.length]; |
| System.arraycopy(commandArray, 0, finalCommand, 0, commandArray.length); |
| System.arraycopy(args, 0, finalCommand, commandArray.length, args.length); |
| } |
| |
| LOG.trace("Executing SSH command [finalCommand={0}]", Arrays.toString(finalCommand)); |
| final Process p = runtime.exec(finalCommand); |
| |
| BufferDrainer bufferDrainer = new BufferDrainer(p, maxLen); |
| final int exitValue = bufferDrainer.drainBuffers(); |
| final StringBuffer inputBuffer = bufferDrainer.getInputBuffer(); |
| final StringBuffer errorBuffer = bufferDrainer.getErrorBuffer(); |
| final String pid = getFirstLine(inputBuffer); |
| if (exitValue != 0) { |
| String error = getTruncatedString(errorBuffer); |
| throw new IOException(XLog.format("Not able to execute ssh-base.sh on {0}", host) + " | " + "ErrorStream: " |
| + error); |
| } |
| |
| LOG.trace("After execution pid={0}", pid); |
| |
| return pid; |
| } |
| |
| /** |
| * End action execution. |
| * |
| * @param context action execution context. |
| * @param action action object. |
| * @throws ActionExecutorException thrown if action end execution fails. |
| */ |
| public void end(final Context context, final WorkflowAction action) throws ActionExecutorException { |
| if (action.getExternalStatus().equals("OK")) { |
| context.setEndData(WorkflowAction.Status.OK, WorkflowAction.Status.OK.toString()); |
| } |
| else { |
| context.setEndData(WorkflowAction.Status.ERROR, WorkflowAction.Status.ERROR.toString()); |
| } |
| boolean deleteTmpDir = ConfigurationService.getBoolean(DELETE_TMP_DIR); |
| if (deleteTmpDir) { |
| String tmpDir = getRemoteFileName(context, action, null, true, false); |
| String removeTmpDirCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " rm -rf " + tmpDir; |
| int retVal = getReturnValue(removeTmpDirCmd); |
| if (retVal != 0) { |
| XLog.getLog(getClass()).warn("Cannot delete temp dir {0}", tmpDir); |
| } |
| } |
| LOG.info("Action ended with external status [{0}]", action.getExternalStatus()); |
| } |
| |
| /** |
| * Get the return value of a processSettings. |
| * |
| * @param command command to be executed. |
| * @return zero if execution is successful and any non zero value for failure. |
| * @throws ActionExecutorException |
| */ |
| private int getReturnValue(String command) throws ActionExecutorException { |
| LOG.trace("Getting return value for command={0}", command); |
| |
| int returnValue; |
| Process ps = null; |
| try { |
| ps = Runtime.getRuntime().exec(command.split("\\s")); |
| final BufferDrainer bufferDrainer = new BufferDrainer(ps, 0); |
| returnValue = bufferDrainer.drainBuffers(); |
| } |
| catch (IOException e) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog.format( |
| "Not able to perform operation {0}", command), e); |
| } |
| finally { |
| ps.destroy(); |
| } |
| |
| LOG.trace("returnValue={0}", returnValue); |
| |
| return returnValue; |
| } |
| |
| /** |
| * Copy the ssh base and wrapper scripts to the local directory. |
| */ |
| @SuppressFBWarnings(value ="PATH_TRAVERSAL_OUT", justification = "Path is created runtime") |
| private void initSshScripts() { |
| String dirLocation = Services.get().getRuntimeDir() + "/ssh"; |
| File path = new File(dirLocation); |
| path.mkdirs(); |
| if (!path.exists()) { |
| throw new RuntimeException(XLog.format("Not able to create required directory {0}", dirLocation)); |
| } |
| try { |
| IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-base.sh", -1), new OutputStreamWriter( |
| new FileOutputStream(dirLocation + "/ssh-base.sh"), StandardCharsets.UTF_8)); |
| IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-wrapper.sh", -1), new OutputStreamWriter( |
| new FileOutputStream(dirLocation + "/ssh-wrapper.sh"), StandardCharsets.UTF_8)); |
| } |
| catch (IOException ie) { |
| throw new RuntimeException(XLog.format("Not able to copy required scripts file to {0} " |
| + "for SshActionHandler", dirLocation)); |
| } |
| } |
| |
| /** |
| * Get action status. |
| * |
| * @param context executor context |
| * @param action action object. |
| * @return status of the action(RUNNING/OK/ERROR). |
| * @throws ActionExecutorException thrown if there is any error in getting status. |
| */ |
| protected Status getActionStatus(Context context, WorkflowAction action) throws ActionExecutorException { |
| String command = SSH_COMMAND_BASE + action.getTrackerUri() + " ps -p " + action.getExternalId(); |
| Status aStatus; |
| int returnValue = getReturnValue(command); |
| if (returnValue == SSH_CONNECT_ERROR_CODE) { |
| int maxRetryCount = ConfigurationService.getInt(CHECK_MAX_RETRIES, 3); |
| long waitTime = ConfigurationService.getLong(CHECK_INITIAL_RETRY_WAIT_TIME, 3000); |
| for (int retries = 1; retries <= maxRetryCount; retries++) { |
| waitTime = handleRetry(waitTime, retries); |
| returnValue = getReturnValue(command); |
| if (returnValue != SSH_CONNECT_ERROR_CODE) { |
| break; |
| } |
| } |
| if (returnValue == SSH_CONNECT_ERROR_CODE) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, ERR_COULD_NOT_CONNECT, |
| "Failed to connect to host [" + action.getTrackerUri() + "] for ssh action status check."); |
| } |
| } |
| if (returnValue == 0) { |
| aStatus = Status.RUNNING; |
| } |
| else { |
| String outFile = getRemoteFileName(context, action, "error", false, true); |
| String checkErrorCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " ls " + outFile; |
| int retVal = getReturnValue(checkErrorCmd); |
| if (retVal == 0) { |
| aStatus = Status.ERROR; |
| } |
| else { |
| aStatus = Status.OK; |
| } |
| } |
| return aStatus; |
| } |
| |
| private long handleRetry(long sleepBeforeRetryMs, final int retries) { |
| LOG.warn("failed to check ssh action status, sleeping {0} milliseconds before retry #{1}", sleepBeforeRetryMs, |
| retries); |
| try { |
| Thread.sleep(sleepBeforeRetryMs); |
| } catch (InterruptedException e) { |
| LOG.error("ssh action status check retry get interrupted during wait, caused by {0}", e.getMessage()); |
| } |
| sleepBeforeRetryMs *= 2; |
| return sleepBeforeRetryMs; |
| } |
| |
| /** |
| * Execute the callable. |
| * |
| * @param callable required callable. |
| * @throws ActionExecutorException thrown if there is any error in command execution. |
| */ |
| private <T> T execute(Callable<T> callable) throws ActionExecutorException { |
| XLog log = XLog.getLog(getClass()); |
| try { |
| return callable.call(); |
| } |
| catch (IOException ex) { |
| log.warn("Error while executing ssh EXECUTION"); |
| String errorMessage = ex.getMessage(); |
| if (null == errorMessage) { // Unknown IOException |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_UNKNOWN_ERROR, ex |
| .getMessage(), ex); |
| } // Host Resolution Issues |
| else { |
| if (errorMessage.contains("Could not resolve hostname") || |
| errorMessage.contains("service not known")) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_HOST_RESOLUTION, ex |
| .getMessage(), ex); |
| } // Connection Timeout. Host temporarily down. |
| else { |
| if (errorMessage.contains("timed out")) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_COULD_NOT_CONNECT, |
| ex.getMessage(), ex); |
| }// Local ssh-base or ssh-wrapper missing |
| else { |
| if (errorMessage.contains("Required Local file")) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF, |
| ex.getMessage(), ex); // local_FNF |
| }// Required oozie bash scripts missing, after the copy was |
| // successful |
| else { |
| if (errorMessage.contains("No such file or directory") |
| && (errorMessage.contains("ssh-base") || errorMessage.contains("ssh-wrapper"))) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF, |
| ex.getMessage(), ex); // remote |
| // FNF |
| } // Required application execution binary missing (either |
| // caught by ssh-wrapper |
| else { |
| if (errorMessage.contains("command not found")) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_FNF, ex |
| .getMessage(), ex); // remote |
| // FNF |
| } // Permission denied while connecting |
| else { |
| if (errorMessage.contains("Permission denied")) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, |
| ERR_AUTH_FAILED, ex.getMessage(), ex); |
| } // Permission denied while executing |
| else { |
| if (errorMessage.contains(": Permission denied")) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, |
| ERR_NO_EXEC_PERM, ex.getMessage(), ex); |
| } |
| else { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, |
| ERR_UNKNOWN_ERROR, ex.getMessage(), ex); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| } // Any other type of exception |
| catch (Exception ex) { |
| throw convertException(ex); |
| } |
| } |
| |
| /** |
| * Checks whether the system is configured to always use the oozie user for ssh, and injects the user if required. |
| * |
| * @param host the host string. |
| * @param context the execution context. |
| * @return the modified host string with a user parameter added on if required. |
| * @throws ActionExecutorException in case the flag to use the oozie user is turned on and there is a mismatch |
| * between the user specified in the host and the oozie user. |
| */ |
| private String prepareUserHost(String host, Context context) throws ActionExecutorException { |
| String oozieUser = context.getProtoActionConf().get(OozieClient.USER_NAME); |
| if (allowSshUserAtHost) { |
| if (!host.contains("@")) { |
| host = oozieUser + "@" + host; |
| } |
| } |
| else { |
| if (host.contains("@")) { |
| if (!host.toLowerCase().startsWith(oozieUser + "@")) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_USER_MISMATCH, |
| XLog.format("user mismatch between oozie user [{0}] and ssh host [{1}]", |
| oozieUser, host)); |
| } |
| } |
| else { |
| host = oozieUser + "@" + host; |
| } |
| } |
| |
| LOG.trace("User host is {0}", host); |
| |
| return host; |
| } |
| |
| @Override |
| public boolean isCompleted(String externalStatus) { |
| return true; |
| } |
| |
| /** |
| * Truncate the string to max length. |
| * |
| * @param strBuffer |
| * @return truncated string string |
| */ |
| private String getTruncatedString(StringBuffer strBuffer) { |
| if (strBuffer.length() <= maxLen) { |
| return strBuffer.toString(); |
| } |
| else { |
| return strBuffer.substring(0, maxLen); |
| } |
| } |
| |
| /** |
| * Returns the first line from a StringBuffer, recognized by the new line character \n. |
| * |
| * @param buffer The StringBuffer from which the first line is required. |
| * @return The first line of the buffer. |
| */ |
| private String getFirstLine(StringBuffer buffer) { |
| int newLineIndex = buffer.indexOf("\n"); |
| if (newLineIndex == -1) { |
| return buffer.toString(); |
| } |
| else { |
| return buffer.substring(0, newLineIndex); |
| } |
| } |
| } |