[REEF-1639] Refactor RunnableProcess and move State logic into a separate enum
JIRA:
[REEF-1639](https://issues.apache.org/jira/browse/REEF-1639)
Pull request:
This closes #1154
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java
index 12b4488..d545b3f 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java
@@ -23,7 +23,7 @@
import java.io.*;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@@ -32,7 +32,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-
/**
* A runnable class that encapsulates a process.
*/
@@ -46,6 +45,7 @@
* Name of the file used for STDERR redirection.
*/
private final String standardErrorFileName;
+
/**
* Name of the file used for STDOUT redirection.
*/
@@ -55,102 +55,83 @@
* Command to execute.
*/
private final List<String> command;
+
/**
* User supplied ID of this process.
*/
private final String id;
+
/**
* The working folder in which the process runs. It is also where STDERR and STDOUT files will be deposited.
*/
private final File folder;
+
/**
* The coarse-grained lock for state transition.
*/
private final Lock stateLock = new ReentrantLock();
+
private final Condition doneCond = stateLock.newCondition();
+
/**
* This will be informed of process start and stop.
*/
private final RunnableProcessObserver processObserver;
+
/**
* The process.
*/
private Process process;
+
/**
* The state of the process.
*/
- private State state = State.INIT; // synchronized on stateLock
+ private RunnableProcessState state = RunnableProcessState.INIT; // synchronized on stateLock
/**
- * @param command the command to execute.
- * @param id The ID of the process. This is used to name files and in the logs created
- * by this process.
- * @param folder The folder in which this will store its stdout and stderr output
- * @param processObserver will be informed of process state changes.
- * @param standardOutFileName The name of the file used for redirecting STDOUT
+ * @param command the command to execute.
+ * @param id The ID of the process. This is used to name files and in the logs created by this process.
+ * @param folder The folder in which this will store its stdout and stderr output
+ * @param processObserver will be informed of process state changes.
+ * @param standardOutFileName The name of the file used for redirecting STDOUT
* @param standardErrorFileName The name of the file used for redirecting STDERR
*/
- public RunnableProcess(final List<String> command,
- final String id,
- final File folder,
- final RunnableProcessObserver processObserver,
- final String standardOutFileName,
- final String standardErrorFileName) {
+ public RunnableProcess(
+ final List<String> command,
+ final String id,
+ final File folder,
+ final RunnableProcessObserver processObserver,
+ final String standardOutFileName,
+ final String standardErrorFileName) {
+
this.processObserver = processObserver;
- this.command = new ArrayList<>(command);
+ this.command = Collections.unmodifiableList(command);
this.id = id;
this.folder = folder;
+
assert this.folder.isDirectory();
if (!this.folder.exists() && !this.folder.mkdirs()) {
LOG.log(Level.WARNING, "Failed to create [{0}]", this.folder.getAbsolutePath());
}
+
this.standardOutFileName = standardOutFileName;
this.standardErrorFileName = standardErrorFileName;
+
LOG.log(Level.FINEST, "RunnableProcess ready.");
}
/**
- * Checks whether a transition from State 'from' to state 'to' is legal.
- *
- * @param from
- * @param to
- * @return true, if the state transition is legal. False otherwise.
- */
- private static boolean isLegal(final State from, final State to) {
- switch (from) {
- case INIT:
- switch (to) {
- case INIT:
- case RUNNING:
- case ENDED:
- return true;
- default:
- return false;
- }
- case RUNNING:
- switch (to) {
- case ENDED:
- return true;
- default:
- return false;
- }
- case ENDED:
- return false;
- default:
- return false;
- }
- }
-
- /**
* Runs the configured process.
- *
- * @throws java.lang.IllegalStateException if the process is already running or has been running before.
+ * @throws IllegalStateException if the process is already running or has been running before.
*/
@Override
public void run() {
+
this.stateLock.lock();
+
try {
- if (this.getState() != State.INIT) {
+
+ if (this.state != RunnableProcessState.INIT) {
throw new IllegalStateException("The RunnableProcess can't be reused");
}
@@ -160,57 +141,72 @@
// Launch the process
try {
- LOG.log(Level.FINEST, "Launching process \"{0}\"\nSTDERR can be found in {1}\nSTDOUT can be found in {2}",
- new Object[]{this.id, errFile.getAbsolutePath(), outFile.getAbsolutePath()});
+
+ LOG.log(Level.FINEST,
+ "Launching process \"{0}\"\nSTDERR can be found in {1}\nSTDOUT can be found in {2}",
+ new Object[] {this.id, errFile.getAbsolutePath(), outFile.getAbsolutePath()});
+
this.process = new ProcessBuilder()
.command(this.command)
.directory(this.folder)
.redirectError(errFile)
.redirectOutput(outFile)
.start();
- this.setState(State.RUNNING);
+
+ this.setState(RunnableProcessState.RUNNING);
this.processObserver.onProcessStarted(this.id);
+
} catch (final IOException ex) {
- LOG.log(Level.SEVERE, "Unable to spawn process \"{0}\" wth command {1}\n Exception:{2}",
- new Object[]{this.id, this.command, ex});
+ LOG.log(Level.SEVERE,
+ "Unable to spawn process \"{0}\" wth command {1}\n Exception:{2}",
+ new Object[] {this.id, this.command, ex});
}
+
} finally {
this.stateLock.unlock();
}
try {
+
// Wait for its completion
- final int returnValue = process.waitFor();
+ LOG.log(Level.FINER, "Wait for process completion: {0}", this.id);
+ final int returnValue = this.process.waitFor();
this.processObserver.onProcessExit(this.id, returnValue);
+
this.stateLock.lock();
try {
- this.setState(State.ENDED);
+ this.setState(RunnableProcessState.ENDED);
this.doneCond.signalAll();
} finally {
this.stateLock.unlock();
}
- LOG.log(Level.FINEST, "Process \"{0}\" returned {1}", new Object[]{this.id, returnValue});
+
+ LOG.log(Level.FINER, "Process \"{0}\" returned {1}", new Object[] {this.id, returnValue});
+
} catch (final InterruptedException ex) {
- LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {2}",
- new Object[]{this.id, ex});
+ LOG.log(Level.SEVERE,
+ "Interrupted while waiting for the process \"{0}\" to complete. Exception: {1}",
+ new Object[] {this.id, ex});
}
}
-
/**
* Cancels the running process if it is running.
*/
public void cancel() {
+
this.stateLock.lock();
+
try {
- if (this.processIsRunning()) {
+
+ if (this.state == RunnableProcessState.RUNNING) {
this.process.destroy();
if (!this.doneCond.await(DESTROY_WAIT_TIME, TimeUnit.MILLISECONDS)) {
LOG.log(Level.FINE, "{0} milliseconds elapsed", DESTROY_WAIT_TIME);
}
}
- if (this.processIsRunning()) {
+ if (this.state == RunnableProcessState.RUNNING) {
LOG.log(Level.WARNING, "The child process survived Process.destroy()");
if (OSUtils.isUnix() || OSUtils.isWindows()) {
LOG.log(Level.WARNING, "Attempting to kill the process via the kill command line");
@@ -224,8 +220,9 @@
}
} catch (final InterruptedException ex) {
- LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {2}",
- new Object[]{this.id, ex});
+ LOG.log(Level.SEVERE,
+ "Interrupted while waiting for the process \"{0}\" to complete. Exception: {1}",
+ new Object[] {this.id, ex});
} finally {
this.stateLock.unlock();
}
@@ -237,27 +234,13 @@
*/
private long readPID() throws IOException {
final String pidFileName = this.folder.getAbsolutePath() + "/" + PIDStoreStartHandler.PID_FILE_NAME;
- try (final BufferedReader r =
- new BufferedReader(new InputStreamReader(new FileInputStream(pidFileName), StandardCharsets.UTF_8))) {
+ try (final BufferedReader r = new BufferedReader(
+ new InputStreamReader(new FileInputStream(pidFileName), StandardCharsets.UTF_8))) {
return Long.parseLong(r.readLine());
}
}
/**
- * @return a boolean that indicates if the process is running.
- */
- private boolean processIsRunning() {
- return this.getState() == State.RUNNING;
- }
-
- /**
- * @return the current State of the process.
- */
- private State getState() {
- return this.state;
- }
-
- /**
* @return the ID of the process.
*/
public String getId() {
@@ -273,26 +256,13 @@
/**
* Sets a new state for the process.
- *
- * @param newState
- * @throws java.lang.IllegalStateException if the new state is illegal.
+ * @param newState a new process state to transition to.
+ * @throws IllegalStateException if the new state is illegal.
*/
- private void setState(final State newState) {
- if (!isLegal(this.state, newState)) {
+ private void setState(final RunnableProcessState newState) {
+ if (!this.state.isLegal(newState)) {
throw new IllegalStateException("Transition from " + this.state + " to " + newState + " is illegal");
}
this.state = newState;
}
-
- /**
- * The possible states of a process: INIT, RUNNING, ENDED.
- */
- private enum State {
- // After initialization
- INIT,
- // The process is running
- RUNNING,
- // The process ended
- ENDED
- }
}
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcessState.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcessState.java
new file mode 100644
index 0000000..ed239fc
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcessState.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.process;
+
+/**
+ * The possible states of a process: INIT, RUNNING, ENDED.
+ */
+enum RunnableProcessState {
+
+ /** After initialization. */
+ INIT,
+
+ /** The process is running. */
+ RUNNING,
+
+ /** The process ended. */
+ ENDED;
+
+ /**
+ * Check whether a transition from current state to the given one is legal.
+ * @param toState destination state.
+ * @return True if the state transition is legal, false otherwise.
+ */
+ public boolean isLegal(final RunnableProcessState toState) {
+
+ switch (this) {
+
+ case INIT:
+ switch (toState) {
+ case RUNNING:
+ case ENDED:
+ return true;
+ default:
+ return false;
+ }
+
+ case RUNNING:
+ switch (toState) {
+ case ENDED:
+ return true;
+ default:
+ return false;
+ }
+
+ case ENDED:
+ return false;
+
+ default:
+ return false;
+ }
+ }
+}