blob: 0f88a29fd49c9d85c5c5e0adb60a4e4fb3aa6b10 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.local.process;
import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
import org.apache.reef.util.OSUtils;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* A runnable class that encapsulates a process.
*/
public final class RunnableProcess implements Runnable {
private static final Logger LOG = Logger.getLogger(RunnableProcess.class.getName());
private static final long DESTROY_WAIT_TIME = 100;
/**
* Name of the file used for STDERR redirection.
*/
private final String standardErrorFileName;
/**
* Name of the file used for STDOUT redirection.
*/
private final String standardOutFileName;
/**
* Command to execute.
*/
private final List<String> command;
/**
* User supplied ID of this process.
*/
private final String id;
/**
* The working folder in which the process runs. It is also where STDERR and STDOUT files will be deposited.
*/
private final File folder;
/**
* The coarse-grained lock for state transition.
*/
private final Lock stateLock = new ReentrantLock();
private final Condition doneCond = stateLock.newCondition();
/**
* This will be informed of process start and stop.
*/
private final RunnableProcessObserver processObserver;
/**
* The process.
*/
private Process process;
/**
* The state of the process.
*/
private RunnableProcessState state = RunnableProcessState.INIT; // synchronized on stateLock
/**
* @param command the command to execute.
* @param id The ID of the process. This is used to name files and in the logs created by this process.
* @param folder The folder in which this will store its stdout and stderr output
* @param processObserver will be informed of process state changes.
* @param standardOutFileName The name of the file used for redirecting STDOUT
* @param standardErrorFileName The name of the file used for redirecting STDERR
*/
public RunnableProcess(
final List<String> command,
final String id,
final File folder,
final RunnableProcessObserver processObserver,
final String standardOutFileName,
final String standardErrorFileName) {
this.processObserver = processObserver;
this.command = Collections.unmodifiableList(expandEnvironmentVariables(command));
this.id = id;
this.folder = folder;
assert this.folder.isDirectory();
if (!this.folder.exists() && !this.folder.mkdirs()) {
LOG.log(Level.WARNING, "Failed to create [{0}]", this.folder.getAbsolutePath());
}
this.standardOutFileName = standardOutFileName;
this.standardErrorFileName = standardErrorFileName;
LOG.log(Level.FINEST, "RunnableProcess ready.");
}
private static final Pattern ENV_REGEX = Pattern.compile("\\{\\{(\\w+)}}");
/**
* Replace {{ENV_VAR}} placeholders with the values of the corresponding environment variables.
* @param command An input string with {{ENV_VAR}} placeholders
* to be replaced with the values of the corresponding environment variables.
* Replace unknown/unset variables with an empty string.
* @return A new string with all the placeholders expanded.
*/
public static String expandEnvironmentVariables(final String command) {
final Matcher match = ENV_REGEX.matcher(command);
final StringBuilder res = new StringBuilder(command.length());
int i = 0;
while (match.find()) {
final String var = System.getenv(match.group(1));
res.append(command.substring(i, match.start())).append(var == null ? "" : var);
i = match.end();
}
return res.append(command.substring(i, command.length())).toString();
}
/**
* Replace {{ENV_VAR}} placeholders with the values of the corresponding environment variables.
* @param command An input list of strings with {{ENV_VAR}} placeholders
* to be replaced with the values of the corresponding environment variables.
* Replace unknown/unset variables with an empty string.
* @return A new list of strings with all the placeholders expanded.
*/
public static List<String> expandEnvironmentVariables(final List<String> command) {
final ArrayList<String> res = new ArrayList<>(command.size());
for (final String cmd : command) {
res.add(expandEnvironmentVariables(cmd));
}
return res;
}
/**
* Runs the configured process.
* @throws IllegalStateException if the process is already running or has been running before.
*/
@Override
public void run() {
this.stateLock.lock();
try {
if (this.state != RunnableProcessState.INIT) {
throw new IllegalStateException("The RunnableProcess can't be reused");
}
// Setup the stdout and stderr destinations.
final File errFile = new File(folder, standardErrorFileName);
final File outFile = new File(folder, standardOutFileName);
// Launch the process
try {
LOG.log(Level.FINEST,
"Launching process \"{0}\"\nSTDERR can be found in {1}\nSTDOUT can be found in {2}",
new Object[] {this.id, errFile.getAbsolutePath(), outFile.getAbsolutePath()});
this.process = new ProcessBuilder()
.command(this.command)
.directory(this.folder)
.redirectError(errFile)
.redirectOutput(outFile)
.start();
this.setState(RunnableProcessState.RUNNING);
this.processObserver.onProcessStarted(this.id);
} catch (final IOException ex) {
LOG.log(Level.SEVERE, "Unable to spawn process " + this.id + " with command " + this.command, ex);
}
} finally {
this.stateLock.unlock();
}
try {
// Wait for its completion
LOG.log(Level.FINER, "Wait for process completion: {0}", this.id);
final int returnValue = this.process.waitFor();
this.processObserver.onProcessExit(this.id, returnValue);
this.stateLock.lock();
try {
this.setState(RunnableProcessState.ENDED);
this.doneCond.signalAll();
} finally {
this.stateLock.unlock();
}
LOG.log(Level.FINER, "Process \"{0}\" returned {1}", new Object[] {this.id, returnValue});
} catch (final InterruptedException ex) {
LOG.log(Level.SEVERE,
"Interrupted while waiting for the process \"{0}\" to complete. Exception: {1}",
new Object[] {this.id, ex});
}
}
/**
* Cancels the running process if it is running.
*/
public void cancel() {
this.stateLock.lock();
try {
if (this.state == RunnableProcessState.RUNNING) {
this.process.destroy();
if (!this.doneCond.await(DESTROY_WAIT_TIME, TimeUnit.MILLISECONDS)) {
LOG.log(Level.FINE, "{0} milliseconds elapsed", DESTROY_WAIT_TIME);
}
}
if (this.state == RunnableProcessState.RUNNING) {
LOG.log(Level.WARNING, "The child process survived Process.destroy()");
if (OSUtils.isUnix() || OSUtils.isWindows()) {
LOG.log(Level.WARNING, "Attempting to kill the process via the kill command line");
try {
final long pid = readPID();
OSUtils.kill(pid);
} catch (final IOException | InterruptedException | NumberFormatException e) {
LOG.log(Level.SEVERE, "Unable to kill the process.", e);
}
}
}
} catch (final InterruptedException ex) {
LOG.log(Level.SEVERE,
"Interrupted while waiting for the process \"{0}\" to complete. Exception: {1}",
new Object[] {this.id, ex});
} finally {
this.stateLock.unlock();
}
}
/**
* @return the PID stored in the PID file.
* @throws IOException if the file can't be read.
*/
private long readPID() throws IOException {
final String pidFileName = this.folder.getAbsolutePath() + "/" + PIDStoreStartHandler.PID_FILE_NAME;
try (BufferedReader r = new BufferedReader(
new InputStreamReader(new FileInputStream(pidFileName), StandardCharsets.UTF_8))) {
return Long.parseLong(r.readLine());
}
}
/**
* @return the ID of the process.
*/
public String getId() {
return this.id;
}
/**
* @return the command given to the process.
*/
public List<String> getCommand() {
return this.command;
}
/**
* Sets a new state for the process.
* @param newState a new process state to transition to.
* @throws IllegalStateException if the new state is illegal.
*/
private void setState(final RunnableProcessState newState) {
if (!this.state.isLegal(newState)) {
throw new IllegalStateException("Transition from " + this.state + " to " + newState + " is illegal");
}
this.state = newState;
}
}