[REEF-1573] Split REEFLauncher into executable part and the Clock-running environment.
This is work towards [REEF-1561](https://issues.apache.org/jira/browse/REEF-1561) *"REEF as a library"* effort.
Summary of changes:
* Split REEFLauncher into launcher and REEFEnvironment.
* Improve error handling in REEFLauncher and around.
* Fixes in logging and error reporting.
* Add unit test for local environment driver execution.
* Minor cosmetic and style fixes.
* Add comments to explain checkstyle exceptions
* Add several TODOs and related JIRAs for future work
JIRA: [REEF-1573](https://issues.apache.org/jira/browse/REEF-1573)
This closes #1133
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
new file mode 100644
index 0000000..2809d44
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common;
+
+import org.apache.reef.runtime.common.launch.ProfilingStopHandler;
+import org.apache.reef.runtime.common.launch.REEFErrorHandler;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.util.REEFVersion;
+import org.apache.reef.wake.profiler.WakeProfiler;
+import org.apache.reef.wake.time.Clock;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The main entry point into any REEF process (Driver and Evaluator).
+ * It is mostly reading from the command line to instantiate
+ * the runtime clock and calling .run() on it.
+ */
+public final class REEFEnvironment implements Runnable, AutoCloseable {
+
+ /**
+ * Parameter to enable Wake network profiling. By default profiling is disabled.
+ * TODO[REEF-1629] Move that parameter and related code into Wake package.
+ */
+ @NamedParameter(doc = "If true, profiling will be enabled", short_name = "profiling", default_value = "false")
+ private static final class ProfilingEnabled implements Name<Boolean> { }
+
+ private static final Logger LOG = Logger.getLogger(REEFEnvironment.class.getName());
+
+ private static final Tang TANG = Tang.Factory.getTang();
+
+ /** Main event loop of current REEF component (Driver or Evaluator). */
+ private final Clock clock;
+
+ /** Error handler that processes all uncaught REEF exceptions. */
+ private final REEFErrorHandler errorHandler;
+
+ /**
+ * Create a new REEF environment.
+ * @param configurations REEF component (Driver or Evaluator) configuration.
+ * If multiple configurations are provided, they will be merged before use.
+ * Main part of the configuration is usually read from config file by REEFLauncher.
+ * @throws InjectionException Thrown on configuration error.
+ */
+ @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler
+ public static REEFEnvironment fromConfiguration(final Configuration... configurations) throws InjectionException {
+
+ final Configuration config = Configurations.merge(configurations);
+
+ if (LOG.isLoggable(Level.FINEST)) {
+ // TODO[REEF-1633] Obtain default serializer from Tang, or use Tang to pretty print.
+ LOG.log(Level.FINEST, "Configuration:\n--\n{0}\n--",
+ new AvroConfigurationSerializer().toString(config, true));
+ }
+
+ final Injector injector = TANG.newInjector(config);
+
+ if (injector.getNamedInstance(ProfilingEnabled.class)) {
+ final WakeProfiler profiler = new WakeProfiler();
+ ProfilingStopHandler.setProfiler(profiler);
+ injector.bindAspect(profiler);
+ }
+
+ injector.getInstance(REEFVersion.class).logVersion();
+
+ final REEFErrorHandler errorHandler = injector.getInstance(REEFErrorHandler.class);
+
+ try {
+
+ final Clock clock = injector.getInstance(Clock.class);
+ return new REEFEnvironment(clock, errorHandler);
+
+ } catch (final Throwable ex) {
+ LOG.log(Level.SEVERE, "Error while instantiating the clock", ex);
+ try {
+ errorHandler.onNext(ex);
+ } catch (final Throwable exHandling) {
+ LOG.log(Level.SEVERE, "Error while handling the exception " + ex, exHandling);
+ }
+ throw ex;
+ }
+ }
+
+ /**
+ * Use .fromConfiguration() method to create new REEF environment.
+ * @param clock main event loop.
+ * @param errorHandler error handler.
+ */
+ private REEFEnvironment(final Clock clock, final REEFErrorHandler errorHandler) {
+ this.clock = clock;
+ this.errorHandler = errorHandler;
+ }
+
+ /**
+ * Close and cleanup the environment.
+ * Invoke .close() on all closeable members (clock and error handler).
+ */
+ @Override
+ @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler
+ public void close() {
+
+ LOG.log(Level.FINER, "Closing REEF Environment - start");
+
+ try {
+ this.clock.close();
+ } catch (final Throwable ex) {
+ LOG.log(Level.SEVERE, "Error while closing the clock", ex);
+ try {
+ this.errorHandler.onNext(ex);
+ } catch (final Throwable exHandling) {
+ LOG.log(Level.SEVERE, "Error while handling the exception " + ex, exHandling);
+ }
+ } finally {
+ try {
+ this.errorHandler.close();
+ } catch (final Throwable ex) {
+ LOG.log(Level.SEVERE, "Error while closing the error handler", ex);
+ }
+ }
+
+ LOG.log(Level.FINER, "Closing REEF Environment - end");
+ }
+
+ /**
+ * Launch REEF component (Driver or Evaluator).
+ * It is usually called from the static .run() method.
+ */
+ @Override
+ @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler
+ public void run() {
+
+ LOG.log(Level.FINE, "REEF started with user name [{0}]", System.getProperty("user.name"));
+ LOG.log(Level.FINE, "REEF started. Assertions are {0} in this process.",
+ EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED");
+
+ try {
+
+ LOG.log(Level.FINEST, "Clock: start");
+ this.clock.run();
+ LOG.log(Level.FINEST, "Clock: exit normally");
+
+ } catch (final Throwable ex) {
+ LOG.log(Level.SEVERE, "Clock: Error in main event loop", ex);
+ this.errorHandler.onNext(ex);
+ throw ex;
+ }
+ }
+}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
index 3ad5fe4..74de099 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
@@ -19,24 +19,18 @@
package org.apache.reef.runtime.common;
import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
-import org.apache.reef.runtime.common.launch.ProfilingStopHandler;
import org.apache.reef.runtime.common.launch.REEFErrorHandler;
import org.apache.reef.runtime.common.launch.REEFMessageCodec;
import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler;
import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
import org.apache.reef.tang.*;
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.AvroConfigurationSerializer;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.EnvironmentUtils;
-import org.apache.reef.util.REEFVersion;
import org.apache.reef.util.ThreadLogger;
import org.apache.reef.util.logging.LoggingSetup;
-import org.apache.reef.wake.profiler.WakeProfiler;
import org.apache.reef.wake.remote.RemoteConfiguration;
import org.apache.reef.wake.time.Clock;
@@ -54,12 +48,6 @@
*/
public final class REEFLauncher {
- /**
- * Parameter to enable profiling. By default profiling is disabled.
- */
- @NamedParameter(doc = "If true, profiling will be enabled", short_name = "profiling", default_value = "false")
- private static final class ProfilingEnabled implements Name<Boolean> { }
-
private static final Logger LOG = Logger.getLogger(REEFLauncher.class.getName());
private static final Tang TANG = Tang.Factory.getTang();
@@ -76,42 +64,28 @@
LoggingSetup.setupCommonsLogging();
}
- /** Config parameter to turn on network IO profiling in Wake. */
- private final boolean isWakeProfilingEnabled;
-
- /** REEF version - we need it simply to write it to the log. */
- private final REEFVersion reefVersion;
-
/**
* Main configuration object of the REEF component we are launching here.
- * The launcher uses that configuration to instantiate the Clock object,
+ * REEFEnvironment uses that configuration to instantiate the Clock object,
* and then call .run() on it.
*/
- private final Configuration clockConfig;
+ private final Configuration envConfig;
/**
* REEFLauncher is instantiated in the main() method below using
* Tang configuration file provided as a command line argument.
* @param configurationPath Path to the serialized Tang configuration file.
* (The file must be in the local file system).
- * @param enableProfiling If true, turn on profiling in Wake.
* @param configurationSerializer Serializer used to read the configuration file.
* We currently use Avro to serialize Tang configs.
- * @param reefVersion An injectable object that contains REEF version.
*/
@Inject
private REEFLauncher(
@Parameter(ClockConfigurationPath.class) final String configurationPath,
- @Parameter(ProfilingEnabled.class) final boolean enableProfiling,
- final ConfigurationSerializer configurationSerializer,
- final REEFVersion reefVersion) {
+ final ConfigurationSerializer configurationSerializer) {
- this.isWakeProfilingEnabled = enableProfiling;
- this.reefVersion = reefVersion;
-
- this.clockConfig = Configurations.merge(
- readConfigurationFromDisk(configurationPath, configurationSerializer),
- LAUNCHER_STATIC_CONFIG);
+ this.envConfig = Configurations.merge(LAUNCHER_STATIC_CONFIG,
+ readConfigurationFromDisk(configurationPath, configurationSerializer));
}
/**
@@ -130,10 +104,10 @@
return TANG.newInjector(clockArgConfig).getInstance(REEFLauncher.class);
- } catch (final BindException e) {
- throw fatal("Error in parsing the command line", e);
- } catch (final InjectionException e) {
- throw fatal("Unable to run REEFLauncher.", e);
+ } catch (final BindException ex) {
+ throw fatal("Error in parsing the command line", ex);
+ } catch (final InjectionException ex) {
+ throw fatal("Unable to instantiate REEFLauncher.", ex);
}
}
@@ -169,11 +143,7 @@
try {
final Configuration config = serializer.fromFile(evaluatorConfigFile);
-
- if (LOG.isLoggable(Level.FINEST)) {
- LOG.log(Level.FINEST, "Configuration file {0} loaded:\n--\n{1}\n--",
- new Object[] {configPath, new AvroConfigurationSerializer().toString(config, true)});
- }
+ LOG.log(Level.FINEST, "Configuration file loaded: {0}", configPath);
return config;
@@ -184,13 +154,14 @@
/**
* Launches a REEF client process (Driver or Evaluator).
- * @param args Command-line arguments -
- * must be a single element containing local path to the configuration file.
+ * @param args Command-line arguments.
+ * Must be a single element containing local path to the configuration file.
*/
@SuppressWarnings("checkstyle:illegalcatch")
public static void main(final String[] args) {
LOG.log(Level.INFO, "Entering REEFLauncher.main().");
+
LOG.log(Level.FINE, "REEFLauncher started with user name [{0}]", System.getProperty("user.name"));
LOG.log(Level.FINE, "REEFLauncher started. Assertions are {0} in this process.",
EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED");
@@ -204,21 +175,12 @@
final REEFLauncher launcher = getREEFLauncher(args[0]);
- Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.clockConfig));
- launcher.reefVersion.logVersion(); // Write REEF version to the log.
+ Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.envConfig));
- try (final Clock clock = launcher.getClockFromConfig()) {
-
- LOG.log(Level.FINE, "Clock: start");
- clock.run();
- LOG.log(Level.FINE, "Clock: exit normally");
-
+ try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(launcher.envConfig)) {
+ reef.run();
} catch (final Throwable ex) {
- try (final REEFErrorHandler errorHandler = launcher.getErrorHandlerFromConfig()) {
- throw fatal(errorHandler, "Unable to instantiate the clock", ex);
- } catch (final InjectionException e) {
- throw fatal("Unable to instantiate the clock and the ErrorHandler", e);
- }
+ throw fatal("Unable to configure and start REEFEnvironment.", ex);
}
LOG.log(Level.INFO, "Exiting REEFLauncher.main()");
@@ -235,35 +197,6 @@
}
/**
- * A new REEFErrorHandler is instantiated instead of lazy instantiation
- * and saving the instantiated handler as a field since the ErrorHandler is closeable.
- * @return A REEFErrorHandler object instantiated from clock config.
- * @throws InjectionException configuration error.
- */
- private REEFErrorHandler getErrorHandlerFromConfig() throws InjectionException {
- return TANG.newInjector(this.clockConfig).getInstance(REEFErrorHandler.class);
- }
-
- /**
- * A new Clock is instantiated instead of lazy instantiation and saving the instantiated
- * handler as a field since the Clock is closeable.
- * @return A Clock object instantiated from the configuration.
- * @throws InjectionException configuration error.
- */
- private Clock getClockFromConfig() throws InjectionException {
-
- final Injector clockInjector = TANG.newInjector(this.clockConfig);
-
- if (this.isWakeProfilingEnabled) {
- final WakeProfiler profiler = new WakeProfiler();
- ProfilingStopHandler.setProfiler(profiler);
- clockInjector.bindAspect(profiler);
- }
-
- return clockInjector.getInstance(Clock.class);
- }
-
- /**
* Wrap an exception into RuntimeException with a given message,
* and write the same message and exception to the log.
* @param msg an error message to log and pass into the RuntimeException.
@@ -274,19 +207,4 @@
LOG.log(Level.SEVERE, msg, t);
return new RuntimeException(msg, t);
}
-
- /**
- * Pass exception into an error handler, then wrap it into RuntimeException
- * with a given message, and write the same message and exception to the log.
- * @param errorHandler an error handler that consumes the exception before any further processing.
- * @param msg an error message to log and pass into the RuntimeException.
- * @param t A Throwable exception to log, wrap, and handle.
- * @return a new Runtime exception wrapping a Throwable.
- */
- private static RuntimeException fatal(
- final REEFErrorHandler errorHandler, final String msg, final Throwable t) {
- LOG.log(Level.SEVERE, msg, t);
- errorHandler.onNext(t);
- return new RuntimeException(msg, t);
- }
}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
index be71ff5..e32537c 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
@@ -59,24 +59,33 @@
@Override
@SuppressWarnings("checkstyle:illegalcatch")
- public void onNext(final Throwable e) {
- LOG.log(Level.SEVERE, "Uncaught exception.", e);
- if (!this.errorHandlerRID.equals(ErrorHandlerRID.NONE)) {
- final EventHandler<ReefServiceProtos.RuntimeErrorProto> runtimeErrorHandler = this.remoteManager.get()
- .getHandler(errorHandlerRID, ReefServiceProtos.RuntimeErrorProto.class);
- final ReefServiceProtos.RuntimeErrorProto message = ReefServiceProtos.RuntimeErrorProto.newBuilder()
- .setName("reef")
- .setIdentifier(launchID)
- .setMessage(e.getMessage())
- .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(e)))
- .build();
- try {
- runtimeErrorHandler.onNext(message);
- } catch (final Throwable t) {
- LOG.log(Level.SEVERE, "Unable to send the error upstream", t);
- }
- } else {
+ public void onNext(final Throwable ex) {
+
+ LOG.log(Level.SEVERE, "Uncaught exception.", ex);
+
+ if (this.errorHandlerRID.equals(ErrorHandlerRID.NONE)) {
LOG.log(Level.SEVERE, "Caught an exception from Wake we cannot send upstream because there is no upstream");
+ return;
+ }
+
+ try {
+
+ final EventHandler<ReefServiceProtos.RuntimeErrorProto> runtimeErrorHandler =
+ this.remoteManager.get().getHandler(this.errorHandlerRID, ReefServiceProtos.RuntimeErrorProto.class);
+
+ final ReefServiceProtos.RuntimeErrorProto message =
+ ReefServiceProtos.RuntimeErrorProto.newBuilder()
+ .setName("reef")
+ .setIdentifier(this.launchID)
+ .setMessage(ex.getMessage())
+ .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(ex)))
+ .build();
+
+ runtimeErrorHandler.onNext(message);
+ LOG.log(Level.INFO, "Successfully sent the error upstream: {0}", ex.toString());
+
+ } catch (final Throwable t) {
+ LOG.log(Level.SEVERE, "Unable to send the error upstream", t);
}
}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
index 340c9b3..dd481a6 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
@@ -51,19 +51,20 @@
@Override
public synchronized void uncaughtException(final Thread thread, final Throwable throwable) {
+
+ final String msg = "Thread " + thread.getName() + " threw an uncaught exception.";
+ LOG.log(Level.SEVERE, msg, throwable);
+
if (this.errorHandler == null) {
try {
- this.errorHandler = Tang.Factory.getTang().newInjector(this.errorHandlerConfig)
- .getInstance(REEFErrorHandler.class);
- } catch (InjectionException ie) {
+ this.errorHandler = Tang.Factory.getTang()
+ .newInjector(this.errorHandlerConfig).getInstance(REEFErrorHandler.class);
+ } catch (final InjectionException ie) {
LOG.log(Level.WARNING, "Unable to inject error handler.");
}
}
- final String msg = "Thread " + thread.getName() + " threw an uncaught exception.";
-
if (this.errorHandler != null) {
- LOG.log(Level.SEVERE, msg, throwable);
this.errorHandler.onNext(new Exception(msg, throwable));
try {
this.wait(100);
@@ -74,13 +75,12 @@
}
LOG.log(Level.SEVERE, msg + " System.exit(1)");
+
System.exit(1);
}
@Override
public String toString() {
- return "REEFUncaughtExceptionHandler{" +
- "errorHandler=" + String.valueOf(this.errorHandler) +
- '}';
+ return "REEFUncaughtExceptionHandler{errorHandler=" + this.errorHandler + '}';
}
}
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java
index 8e3476b..9d64500 100644
--- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java
@@ -18,12 +18,13 @@
*/
package org.apache.reef.tests.fail.driver;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.common.REEFEnvironment;
import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tests.TestDriverLauncher;
import org.apache.reef.util.EnvironmentUtils;
@@ -31,11 +32,11 @@
/**
* Client for the test REEF job that fails on different stages of execution.
*/
+@Private
+@ClientSide
public final class FailClient {
- public static LauncherStatus run(final Class<?> failMsgClass,
- final Configuration runtimeConfig,
- final int timeOut) throws BindException, InjectionException {
+ private static Configuration buildDriverConfig(final Class<?> failMsgClass) {
final Configuration driverConfig = DriverConfiguration.CONF
.set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(FailDriver.class))
@@ -56,11 +57,42 @@
.set(DriverConfiguration.ON_TASK_COMPLETED, FailDriver.CompletedTaskHandler.class)
.build();
- final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
- cb.addConfiguration(driverConfig);
- cb.bindNamedParameter(FailDriver.FailMsgClassName.class, failMsgClass.getName());
+ return Tang.Factory.getTang().newConfigurationBuilder(driverConfig)
+ .bindNamedParameter(FailDriver.FailMsgClassName.class, failMsgClass.getName())
+ .build();
+ }
- return TestDriverLauncher.getLauncher(runtimeConfig).run(cb.build(), timeOut);
+ /**
+ * Run REEF on specified runtime and fail (raise an exception) in a specified class.
+ * @param failMsgClass A class that should fail during the test.
+ * @param runtimeConfig REEF runtime configuration. Can be e.g. Local or YARN.
+ * @param timeOut REEF application timeout.
+ * @return launcher status - usually FAIL.
+ * @throws InjectionException configuration error.
+ */
+ public static LauncherStatus runClient(final Class<?> failMsgClass,
+ final Configuration runtimeConfig, final int timeOut) throws InjectionException {
+
+ return TestDriverLauncher.getLauncher(runtimeConfig).run(buildDriverConfig(failMsgClass), timeOut);
+ }
+
+ /**
+ * Run REEF in-process using specified runtime and fail (raise an exception) in a specified class.
+ * @param failMsgClass A class that should fail during the test.
+ * @param runtimeConfig REEF runtime configuration. Can be e.g. Local or YARN.
+ * @param timeOut REEF application timeout - not used yet.
+ * @return launcher status - usually FAIL.
+ * @throws InjectionException configuration error.
+ */
+ public static LauncherStatus runInProcess(final Class<?> failMsgClass,
+ final Configuration runtimeConfig, final int timeOut) throws InjectionException {
+
+ try (final REEFEnvironment reef =
+ REEFEnvironment.fromConfiguration(runtimeConfig, buildDriverConfig(failMsgClass))) {
+ reef.run();
+ }
+
+ return LauncherStatus.FORCE_CLOSED; // TODO[REEF-1596]: Use the actual status, when implemented.
}
/**
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java
index c0e6412..fe1b507 100644
--- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java
@@ -144,7 +144,7 @@
if (this.state == DriverState.FAILED) {
final SimulatedDriverFailure ex = new SimulatedDriverFailure(
"Simulated Failure at FailDriver :: " + msgClassName);
- LOG.log(Level.INFO, "Simulated Failure: {0}", ex);
+ LOG.log(Level.INFO, "Simulated Failure:", ex);
throw ex;
}
}
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java
new file mode 100644
index 0000000..088aee3
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.driver;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.runtime.common.REEFEnvironment;
+import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
+import org.apache.reef.runtime.local.driver.LocalDriverConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.EnvironmentUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This tests whether the noop driver gets shutdown properly.
+ */
+public final class REEFEnvironmentDriverTest {
+
+ private static final Configuration DRIVER_CONFIG = DriverConfiguration.CONF
+ .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(DriverTestStartHandler.class))
+ .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_DriverTest")
+ .set(DriverConfiguration.ON_DRIVER_STARTED, DriverTestStartHandler.class)
+ .build();
+
+ private static final Configuration LOCAL_DRIVER_MODULE = LocalDriverConfiguration.CONF
+ .set(LocalDriverConfiguration.MAX_NUMBER_OF_EVALUATORS, 1)
+ .set(LocalDriverConfiguration.ROOT_FOLDER, ".")
+ .set(LocalDriverConfiguration.JVM_HEAP_SLACK, 0.0)
+ .set(LocalDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
+ .set(LocalDriverConfiguration.JOB_IDENTIFIER, "LOCAL_DRIVER_TEST")
+ .set(LocalDriverConfiguration.RUNTIME_NAMES, org.apache.reef.runtime.local.driver.RuntimeIdentifier.RUNTIME_NAME)
+ .build();
+
+ @Test
+ public void testREEFEnvironmentDriver() throws BindException, InjectionException {
+ try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(LOCAL_DRIVER_MODULE, DRIVER_CONFIG)) {
+ reef.run();
+ } catch (final Throwable ex) {
+ Assert.fail("Local driver execution failed: " + ex);
+ }
+ }
+}
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailDriverTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailDriverTest.java
index 821b4bf..7ad803b 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailDriverTest.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailDriverTest.java
@@ -62,7 +62,7 @@
private void failOn(final Class<?> clazz) throws BindException, InjectionException {
TestUtils.assertLauncherFailure(
- FailClient.run(clazz,
+ FailClient.runClient(clazz,
this.testEnvironment.getRuntimeConfiguration(), this.testEnvironment.getTestTimeout()),
SimulatedDriverFailure.class);
}
@@ -126,7 +126,7 @@
public void testDriverCompleted() throws BindException, InjectionException {
final Configuration runtimeConfiguration = this.testEnvironment.getRuntimeConfiguration();
// FailDriverTest can be replaced with any other class never used in FailDriver
- final LauncherStatus status = FailClient.run(
+ final LauncherStatus status = FailClient.runClient(
FailDriverTest.class, runtimeConfiguration, this.testEnvironment.getTestTimeout());
Assert.assertEquals(LauncherStatus.COMPLETED, status);
}