[REEF-1573] Split REEFLauncher into executable part and the Clock-running environment.

This is work towards [REEF-1561](https://issues.apache.org/jira/browse/REEF-1561) *"REEF as a library"* effort.

Summary of changes:
   * Split REEFLauncher into launcher and REEFEnvironment.
   * Improve error handling in REEFLauncher and around.
   * Fixes in logging and error reporting.
   * Add unit test for local environment driver execution.
   * Minor cosmetic and style fixes.
   * Add comments to explain checkstyle exceptions
   * Add several TODOs and related JIRAs for future work

JIRA: [REEF-1573](https://issues.apache.org/jira/browse/REEF-1573)

This closes #1133
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
new file mode 100644
index 0000000..2809d44
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common;
+
+import org.apache.reef.runtime.common.launch.ProfilingStopHandler;
+import org.apache.reef.runtime.common.launch.REEFErrorHandler;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.util.REEFVersion;
+import org.apache.reef.wake.profiler.WakeProfiler;
+import org.apache.reef.wake.time.Clock;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The main entry point into any REEF process (Driver and Evaluator).
+ * It is mostly reading from the command line to instantiate
+ * the runtime clock and calling .run() on it.
+ */
+public final class REEFEnvironment implements Runnable, AutoCloseable {
+
+  /**
+   * Parameter to enable Wake network profiling. By default profiling is disabled.
+   * TODO[REEF-1629] Move that parameter and related code into Wake package.
+   */
+  @NamedParameter(doc = "If true, profiling will be enabled", short_name = "profiling", default_value = "false")
+  private static final class ProfilingEnabled implements Name<Boolean> { }
+
+  private static final Logger LOG = Logger.getLogger(REEFEnvironment.class.getName());
+
+  private static final Tang TANG = Tang.Factory.getTang();
+
+  /** Main event loop of current REEF component (Driver or Evaluator). */
+  private final Clock clock;
+
+  /** Error handler that processes all uncaught REEF exceptions. */
+  private final REEFErrorHandler errorHandler;
+
+  /**
+   * Create a new REEF environment.
+   * @param configurations REEF component (Driver or Evaluator) configuration.
+   * If multiple configurations are provided, they will be merged before use.
+   * Main part of the configuration is usually read from config file by REEFLauncher.
+   * @throws InjectionException Thrown on configuration error.
+   */
+  @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler
+  public static REEFEnvironment fromConfiguration(final Configuration... configurations) throws InjectionException {
+
+    final Configuration config = Configurations.merge(configurations);
+
+    if (LOG.isLoggable(Level.FINEST)) {
+      // TODO[REEF-1633] Obtain default serializer from Tang, or use Tang to pretty print.
+      LOG.log(Level.FINEST, "Configuration:\n--\n{0}\n--",
+          new AvroConfigurationSerializer().toString(config, true));
+    }
+
+    final Injector injector = TANG.newInjector(config);
+
+    if (injector.getNamedInstance(ProfilingEnabled.class)) {
+      final WakeProfiler profiler = new WakeProfiler();
+      ProfilingStopHandler.setProfiler(profiler);
+      injector.bindAspect(profiler);
+    }
+
+    injector.getInstance(REEFVersion.class).logVersion();
+
+    final REEFErrorHandler errorHandler = injector.getInstance(REEFErrorHandler.class);
+
+    try {
+
+      final Clock clock = injector.getInstance(Clock.class);
+      return new REEFEnvironment(clock, errorHandler);
+
+    } catch (final Throwable ex) {
+      LOG.log(Level.SEVERE, "Error while instantiating the clock", ex);
+      try {
+        errorHandler.onNext(ex);
+      } catch (final Throwable exHandling) {
+        LOG.log(Level.SEVERE, "Error while handling the exception " + ex, exHandling);
+      }
+      throw ex;
+    }
+  }
+
+  /**
+   * Use .fromConfiguration() method to create new REEF environment.
+   * @param clock main event loop.
+   * @param errorHandler error handler.
+   */
+  private REEFEnvironment(final Clock clock, final REEFErrorHandler errorHandler) {
+    this.clock = clock;
+    this.errorHandler = errorHandler;
+  }
+
+  /**
+   * Close and cleanup the environment.
+   * Invoke .close() on all closeable members (clock and error handler).
+   */
+  @Override
+  @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler
+  public void close() {
+
+    LOG.log(Level.FINER, "Closing REEF Environment - start");
+
+    try {
+      this.clock.close();
+    } catch (final Throwable ex) {
+      LOG.log(Level.SEVERE, "Error while closing the clock", ex);
+      try {
+        this.errorHandler.onNext(ex);
+      } catch (final Throwable exHandling) {
+        LOG.log(Level.SEVERE, "Error while handling the exception " + ex, exHandling);
+      }
+    } finally {
+      try {
+        this.errorHandler.close();
+      } catch (final Throwable ex) {
+        LOG.log(Level.SEVERE, "Error while closing the error handler", ex);
+      }
+    }
+
+    LOG.log(Level.FINER, "Closing REEF Environment - end");
+  }
+
+  /**
+   * Launch REEF component (Driver or Evaluator).
+   * It is usually called from the static .run() method.
+   */
+  @Override
+  @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler
+  public void run() {
+
+    LOG.log(Level.FINE, "REEF started with user name [{0}]", System.getProperty("user.name"));
+    LOG.log(Level.FINE, "REEF started. Assertions are {0} in this process.",
+            EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED");
+
+    try {
+
+      LOG.log(Level.FINEST, "Clock: start");
+      this.clock.run();
+      LOG.log(Level.FINEST, "Clock: exit normally");
+
+    } catch (final Throwable ex) {
+      LOG.log(Level.SEVERE, "Clock: Error in main event loop", ex);
+      this.errorHandler.onNext(ex);
+      throw ex;
+    }
+  }
+}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
index 3ad5fe4..74de099 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
@@ -19,24 +19,18 @@
 package org.apache.reef.runtime.common;
 
 import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
-import org.apache.reef.runtime.common.launch.ProfilingStopHandler;
 import org.apache.reef.runtime.common.launch.REEFErrorHandler;
 import org.apache.reef.runtime.common.launch.REEFMessageCodec;
 import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler;
 import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
 import org.apache.reef.tang.*;
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.annotations.NamedParameter;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.exceptions.BindException;
 import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.AvroConfigurationSerializer;
 import org.apache.reef.tang.formats.ConfigurationSerializer;
 import org.apache.reef.util.EnvironmentUtils;
-import org.apache.reef.util.REEFVersion;
 import org.apache.reef.util.ThreadLogger;
 import org.apache.reef.util.logging.LoggingSetup;
-import org.apache.reef.wake.profiler.WakeProfiler;
 import org.apache.reef.wake.remote.RemoteConfiguration;
 import org.apache.reef.wake.time.Clock;
 
@@ -54,12 +48,6 @@
  */
 public final class REEFLauncher {
 
-  /**
-   * Parameter to enable profiling. By default profiling is disabled.
-   */
-  @NamedParameter(doc = "If true, profiling will be enabled", short_name = "profiling", default_value = "false")
-  private static final class ProfilingEnabled implements Name<Boolean> { }
-
   private static final Logger LOG = Logger.getLogger(REEFLauncher.class.getName());
 
   private static final Tang TANG = Tang.Factory.getTang();
@@ -76,42 +64,28 @@
     LoggingSetup.setupCommonsLogging();
   }
 
-  /** Config parameter to turn on network IO profiling in Wake. */
-  private final boolean isWakeProfilingEnabled;
-
-  /** REEF version - we need it simply to write it to the log. */
-  private final REEFVersion reefVersion;
-
   /**
    * Main configuration object of the REEF component we are launching here.
-   * The launcher uses that configuration to instantiate the Clock object,
+   * REEFEnvironment uses that configuration to instantiate the Clock object,
    * and then call .run() on it.
    */
-  private final Configuration clockConfig;
+  private final Configuration envConfig;
 
   /**
    * REEFLauncher is instantiated in the main() method below using
    * Tang configuration file provided as a command line argument.
    * @param configurationPath Path to the serialized Tang configuration file.
    * (The file must be in the local file system).
-   * @param enableProfiling If true, turn on profiling in Wake.
    * @param configurationSerializer Serializer used to read the configuration file.
    * We currently use Avro to serialize Tang configs.
-   * @param reefVersion An injectable object that contains REEF version.
    */
   @Inject
   private REEFLauncher(
       @Parameter(ClockConfigurationPath.class) final String configurationPath,
-      @Parameter(ProfilingEnabled.class) final boolean enableProfiling,
-      final ConfigurationSerializer configurationSerializer,
-      final REEFVersion reefVersion) {
+      final ConfigurationSerializer configurationSerializer) {
 
-    this.isWakeProfilingEnabled = enableProfiling;
-    this.reefVersion = reefVersion;
-
-    this.clockConfig = Configurations.merge(
-        readConfigurationFromDisk(configurationPath, configurationSerializer),
-        LAUNCHER_STATIC_CONFIG);
+    this.envConfig = Configurations.merge(LAUNCHER_STATIC_CONFIG,
+        readConfigurationFromDisk(configurationPath, configurationSerializer));
   }
 
   /**
@@ -130,10 +104,10 @@
 
       return TANG.newInjector(clockArgConfig).getInstance(REEFLauncher.class);
 
-    } catch (final BindException e) {
-      throw fatal("Error in parsing the command line", e);
-    } catch (final InjectionException e) {
-      throw fatal("Unable to run REEFLauncher.", e);
+    } catch (final BindException ex) {
+      throw fatal("Error in parsing the command line", ex);
+    } catch (final InjectionException ex) {
+      throw fatal("Unable to instantiate REEFLauncher.", ex);
     }
   }
 
@@ -169,11 +143,7 @@
     try {
 
       final Configuration config = serializer.fromFile(evaluatorConfigFile);
-
-      if (LOG.isLoggable(Level.FINEST)) {
-        LOG.log(Level.FINEST, "Configuration file {0} loaded:\n--\n{1}\n--",
-            new Object[] {configPath, new AvroConfigurationSerializer().toString(config, true)});
-      }
+      LOG.log(Level.FINEST, "Configuration file loaded: {0}", configPath);
 
       return config;
 
@@ -184,13 +154,14 @@
 
   /**
    * Launches a REEF client process (Driver or Evaluator).
-   * @param args Command-line arguments -
-   * must be a single element containing local path to the configuration file.
+   * @param args Command-line arguments.
+   * Must be a single element containing local path to the configuration file.
    */
   @SuppressWarnings("checkstyle:illegalcatch")
   public static void main(final String[] args) {
 
     LOG.log(Level.INFO, "Entering REEFLauncher.main().");
+
     LOG.log(Level.FINE, "REEFLauncher started with user name [{0}]", System.getProperty("user.name"));
     LOG.log(Level.FINE, "REEFLauncher started. Assertions are {0} in this process.",
             EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED");
@@ -204,21 +175,12 @@
 
     final REEFLauncher launcher = getREEFLauncher(args[0]);
 
-    Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.clockConfig));
-    launcher.reefVersion.logVersion(); // Write REEF version to the log.
+    Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.envConfig));
 
-    try (final Clock clock = launcher.getClockFromConfig()) {
-
-      LOG.log(Level.FINE, "Clock: start");
-      clock.run();
-      LOG.log(Level.FINE, "Clock: exit normally");
-
+    try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(launcher.envConfig)) {
+      reef.run();
     } catch (final Throwable ex) {
-      try (final REEFErrorHandler errorHandler = launcher.getErrorHandlerFromConfig()) {
-        throw fatal(errorHandler, "Unable to instantiate the clock", ex);
-      } catch (final InjectionException e) {
-        throw fatal("Unable to instantiate the clock and the ErrorHandler", e);
-      }
+      throw fatal("Unable to configure and start REEFEnvironment.", ex);
     }
 
     LOG.log(Level.INFO, "Exiting REEFLauncher.main()");
@@ -235,35 +197,6 @@
   }
 
   /**
-   * A new REEFErrorHandler is instantiated instead of lazy instantiation
-   * and saving the instantiated handler as a field since the ErrorHandler is closeable.
-   * @return A REEFErrorHandler object instantiated from clock config.
-   * @throws InjectionException configuration error.
-   */
-  private REEFErrorHandler getErrorHandlerFromConfig() throws InjectionException {
-    return TANG.newInjector(this.clockConfig).getInstance(REEFErrorHandler.class);
-  }
-
-  /**
-   * A new Clock is instantiated instead of lazy instantiation and saving the instantiated
-   * handler as a field since the Clock is closeable.
-   * @return A Clock object instantiated from the configuration.
-   * @throws InjectionException configuration error.
-   */
-  private Clock getClockFromConfig() throws InjectionException {
-
-    final Injector clockInjector = TANG.newInjector(this.clockConfig);
-
-    if (this.isWakeProfilingEnabled) {
-      final WakeProfiler profiler = new WakeProfiler();
-      ProfilingStopHandler.setProfiler(profiler);
-      clockInjector.bindAspect(profiler);
-    }
-
-    return clockInjector.getInstance(Clock.class);
-  }
-
-  /**
    * Wrap an exception into RuntimeException with a given message,
    * and write the same message and exception to the log.
    * @param msg an error message to log and pass into the RuntimeException.
@@ -274,19 +207,4 @@
     LOG.log(Level.SEVERE, msg, t);
     return new RuntimeException(msg, t);
   }
-
-  /**
-   * Pass exception into an error handler, then wrap it into RuntimeException
-   * with a given message, and write the same message and exception to the log.
-   * @param errorHandler an error handler that consumes the exception before any further processing.
-   * @param msg an error message to log and pass into the RuntimeException.
-   * @param t A Throwable exception to log, wrap, and handle.
-   * @return a new Runtime exception wrapping a Throwable.
-   */
-  private static RuntimeException fatal(
-      final REEFErrorHandler errorHandler, final String msg, final Throwable t) {
-    LOG.log(Level.SEVERE, msg, t);
-    errorHandler.onNext(t);
-    return new RuntimeException(msg, t);
-  }
 }
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
index be71ff5..e32537c 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
@@ -59,24 +59,33 @@
 
   @Override
   @SuppressWarnings("checkstyle:illegalcatch")
-  public void onNext(final Throwable e) {
-    LOG.log(Level.SEVERE, "Uncaught exception.", e);
-    if (!this.errorHandlerRID.equals(ErrorHandlerRID.NONE)) {
-      final EventHandler<ReefServiceProtos.RuntimeErrorProto> runtimeErrorHandler = this.remoteManager.get()
-          .getHandler(errorHandlerRID, ReefServiceProtos.RuntimeErrorProto.class);
-      final ReefServiceProtos.RuntimeErrorProto message = ReefServiceProtos.RuntimeErrorProto.newBuilder()
-          .setName("reef")
-          .setIdentifier(launchID)
-          .setMessage(e.getMessage())
-          .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(e)))
-          .build();
-      try {
-        runtimeErrorHandler.onNext(message);
-      } catch (final Throwable t) {
-        LOG.log(Level.SEVERE, "Unable to send the error upstream", t);
-      }
-    } else {
+  public void onNext(final Throwable ex) {
+
+    LOG.log(Level.SEVERE, "Uncaught exception.", ex);
+
+    if (this.errorHandlerRID.equals(ErrorHandlerRID.NONE)) {
       LOG.log(Level.SEVERE, "Caught an exception from Wake we cannot send upstream because there is no upstream");
+      return;
+    }
+
+    try {
+
+      final EventHandler<ReefServiceProtos.RuntimeErrorProto> runtimeErrorHandler =
+          this.remoteManager.get().getHandler(this.errorHandlerRID, ReefServiceProtos.RuntimeErrorProto.class);
+
+      final ReefServiceProtos.RuntimeErrorProto message =
+          ReefServiceProtos.RuntimeErrorProto.newBuilder()
+              .setName("reef")
+              .setIdentifier(this.launchID)
+              .setMessage(ex.getMessage())
+              .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(ex)))
+              .build();
+
+      runtimeErrorHandler.onNext(message);
+      LOG.log(Level.INFO, "Successfully sent the error upstream: {0}", ex.toString());
+
+    } catch (final Throwable t) {
+      LOG.log(Level.SEVERE, "Unable to send the error upstream", t);
     }
   }
 
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
index 340c9b3..dd481a6 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
@@ -51,19 +51,20 @@
 
   @Override
   public synchronized void uncaughtException(final Thread thread, final Throwable throwable) {
+
+    final String msg = "Thread " + thread.getName() + " threw an uncaught exception.";
+    LOG.log(Level.SEVERE, msg, throwable);
+
     if (this.errorHandler == null) {
       try {
-        this.errorHandler = Tang.Factory.getTang().newInjector(this.errorHandlerConfig)
-            .getInstance(REEFErrorHandler.class);
-      } catch (InjectionException ie) {
+        this.errorHandler = Tang.Factory.getTang()
+            .newInjector(this.errorHandlerConfig).getInstance(REEFErrorHandler.class);
+      } catch (final InjectionException ie) {
         LOG.log(Level.WARNING, "Unable to inject error handler.");
       }
     }
 
-    final String msg = "Thread " + thread.getName() + " threw an uncaught exception.";
-
     if (this.errorHandler != null) {
-      LOG.log(Level.SEVERE, msg, throwable);
       this.errorHandler.onNext(new Exception(msg, throwable));
       try {
         this.wait(100);
@@ -74,13 +75,12 @@
     }
 
     LOG.log(Level.SEVERE, msg + " System.exit(1)");
+
     System.exit(1);
   }
 
   @Override
   public String toString() {
-    return "REEFUncaughtExceptionHandler{" +
-        "errorHandler=" + String.valueOf(this.errorHandler) +
-        '}';
+    return "REEFUncaughtExceptionHandler{errorHandler=" + this.errorHandler + '}';
   }
 }
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java
index 8e3476b..9d64500 100644
--- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java
@@ -18,12 +18,13 @@
  */
 package org.apache.reef.tests.fail.driver;
 
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.client.DriverConfiguration;
 import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.common.REEFEnvironment;
 import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.JavaConfigurationBuilder;
 import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.BindException;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.tests.TestDriverLauncher;
 import org.apache.reef.util.EnvironmentUtils;
@@ -31,11 +32,11 @@
 /**
  * Client for the test REEF job that fails on different stages of execution.
  */
+@Private
+@ClientSide
 public final class FailClient {
 
-  public static LauncherStatus run(final Class<?> failMsgClass,
-                                   final Configuration runtimeConfig,
-                                   final int timeOut) throws BindException, InjectionException {
+  private static Configuration buildDriverConfig(final Class<?> failMsgClass) {
 
     final Configuration driverConfig = DriverConfiguration.CONF
         .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(FailDriver.class))
@@ -56,11 +57,42 @@
         .set(DriverConfiguration.ON_TASK_COMPLETED, FailDriver.CompletedTaskHandler.class)
         .build();
 
-    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
-    cb.addConfiguration(driverConfig);
-    cb.bindNamedParameter(FailDriver.FailMsgClassName.class, failMsgClass.getName());
+    return Tang.Factory.getTang().newConfigurationBuilder(driverConfig)
+        .bindNamedParameter(FailDriver.FailMsgClassName.class, failMsgClass.getName())
+        .build();
+  }
 
-    return TestDriverLauncher.getLauncher(runtimeConfig).run(cb.build(), timeOut);
+  /**
+   * Run REEF on specified runtime and fail (raise an exception) in a specified class.
+   * @param failMsgClass A class that should fail during the test.
+   * @param runtimeConfig REEF runtime configuration. Can be e.g. Local or YARN.
+   * @param timeOut REEF application timeout.
+   * @return launcher status - usually FAIL.
+   * @throws InjectionException configuration error.
+   */
+  public static LauncherStatus runClient(final Class<?> failMsgClass,
+      final Configuration runtimeConfig, final int timeOut) throws InjectionException {
+
+    return TestDriverLauncher.getLauncher(runtimeConfig).run(buildDriverConfig(failMsgClass), timeOut);
+  }
+
+  /**
+   * Run REEF in-process using specified runtime and fail (raise an exception) in a specified class.
+   * @param failMsgClass A class that should fail during the test.
+   * @param runtimeConfig REEF runtime configuration. Can be e.g. Local or YARN.
+   * @param timeOut REEF application timeout - not used yet.
+   * @return launcher status - usually FAIL.
+   * @throws InjectionException configuration error.
+   */
+  public static LauncherStatus runInProcess(final Class<?> failMsgClass,
+      final Configuration runtimeConfig, final int timeOut) throws InjectionException {
+
+    try (final REEFEnvironment reef =
+             REEFEnvironment.fromConfiguration(runtimeConfig, buildDriverConfig(failMsgClass))) {
+      reef.run();
+    }
+
+    return LauncherStatus.FORCE_CLOSED; // TODO[REEF-1596]: Use the actual status, when implemented.
   }
 
   /**
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java
index c0e6412..fe1b507 100644
--- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java
@@ -144,7 +144,7 @@
     if (this.state == DriverState.FAILED) {
       final SimulatedDriverFailure ex = new SimulatedDriverFailure(
           "Simulated Failure at FailDriver :: " + msgClassName);
-      LOG.log(Level.INFO, "Simulated Failure: {0}", ex);
+      LOG.log(Level.INFO, "Simulated Failure:", ex);
       throw ex;
     }
   }
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java
new file mode 100644
index 0000000..088aee3
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.driver;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.runtime.common.REEFEnvironment;
+import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
+import org.apache.reef.runtime.local.driver.LocalDriverConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.EnvironmentUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This tests whether the noop driver gets shutdown properly.
+ */
+public final class REEFEnvironmentDriverTest {
+
+  private static final Configuration DRIVER_CONFIG = DriverConfiguration.CONF
+      .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(DriverTestStartHandler.class))
+      .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_DriverTest")
+      .set(DriverConfiguration.ON_DRIVER_STARTED, DriverTestStartHandler.class)
+      .build();
+
+  private static final Configuration LOCAL_DRIVER_MODULE = LocalDriverConfiguration.CONF
+      .set(LocalDriverConfiguration.MAX_NUMBER_OF_EVALUATORS, 1)
+      .set(LocalDriverConfiguration.ROOT_FOLDER, ".")
+      .set(LocalDriverConfiguration.JVM_HEAP_SLACK, 0.0)
+      .set(LocalDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
+      .set(LocalDriverConfiguration.JOB_IDENTIFIER, "LOCAL_DRIVER_TEST")
+      .set(LocalDriverConfiguration.RUNTIME_NAMES, org.apache.reef.runtime.local.driver.RuntimeIdentifier.RUNTIME_NAME)
+      .build();
+
+  @Test
+  public void testREEFEnvironmentDriver() throws BindException, InjectionException {
+    try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(LOCAL_DRIVER_MODULE, DRIVER_CONFIG)) {
+      reef.run();
+    } catch (final Throwable ex) {
+      Assert.fail("Local driver execution failed: " + ex);
+    }
+  }
+}
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailDriverTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailDriverTest.java
index 821b4bf..7ad803b 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailDriverTest.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailDriverTest.java
@@ -62,7 +62,7 @@
 
   private void failOn(final Class<?> clazz) throws BindException, InjectionException {
     TestUtils.assertLauncherFailure(
-        FailClient.run(clazz,
+        FailClient.runClient(clazz,
             this.testEnvironment.getRuntimeConfiguration(), this.testEnvironment.getTestTimeout()),
         SimulatedDriverFailure.class);
   }
@@ -126,7 +126,7 @@
   public void testDriverCompleted() throws BindException, InjectionException {
     final Configuration runtimeConfiguration = this.testEnvironment.getRuntimeConfiguration();
     // FailDriverTest can be replaced with any other class never used in FailDriver
-    final LauncherStatus status = FailClient.run(
+    final LauncherStatus status = FailClient.runClient(
         FailDriverTest.class, runtimeConfiguration, this.testEnvironment.getTestTimeout());
     Assert.assertEquals(LauncherStatus.COMPLETED, status);
   }