blob: 348d789a7cf82f6530dc509599da803de227475f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.common;
import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
import org.apache.reef.runtime.common.launch.REEFErrorHandler;
import org.apache.reef.runtime.common.launch.REEFMessageCodec;
import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler;
import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
import org.apache.reef.tang.*;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.EnvironmentUtils;
import org.apache.reef.util.ThreadLogger;
import org.apache.reef.util.logging.LoggingSetup;
import org.apache.reef.wake.remote.RemoteConfiguration;
import org.apache.reef.wake.time.Clock;
import javax.inject.Inject;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* The main entry point into any REEF process (Driver and Evaluator).
* It is mostly reading from the command line to instantiate
* the runtime clock and calling .run() on it.
*/
public final class REEFLauncher {
private static final Logger LOG = Logger.getLogger(REEFLauncher.class.getName());
private static final Tang TANG = Tang.Factory.getTang();
private static final Configuration LAUNCHER_STATIC_CONFIG =
TANG.newConfigurationBuilder()
.bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_LAUNCHER")
.bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
.bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
.bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class)
.build();
static {
LoggingSetup.setupCommonsLogging();
}
/**
* Main configuration object of the REEF component we are launching here.
* REEFEnvironment uses that configuration to instantiate the Clock object,
* and then call .run() on it.
*/
private final Configuration envConfig;
/**
* REEFLauncher is instantiated in the main() method below using
* Tang configuration file provided as a command line argument.
* @param configurationPath Path to the serialized Tang configuration file.
* (The file must be in the local file system).
* @param configurationSerializer Serializer used to read the configuration file.
* We currently use Avro to serialize Tang configs.
*/
@Inject
private REEFLauncher(
@Parameter(ClockConfigurationPath.class) final String configurationPath,
final ConfigurationSerializer configurationSerializer) {
this.envConfig = Configurations.merge(LAUNCHER_STATIC_CONFIG,
readConfigurationFromDisk(configurationPath, configurationSerializer));
}
/**
* Instantiate REEF Launcher. This method is called from REEFLauncher.main().
* @param clockConfigPath Path to the local file that contains serialized configuration
* of a REEF component to launch (can be either Driver or Evaluator).
* @return An instance of the configured REEFLauncher object.
*/
private static REEFLauncher getREEFLauncher(final String clockConfigPath) {
try {
final Configuration clockArgConfig = TANG.newConfigurationBuilder()
.bindNamedParameter(ClockConfigurationPath.class, clockConfigPath)
.build();
return TANG.newInjector(clockArgConfig).getInstance(REEFLauncher.class);
} catch (final BindException ex) {
throw fatal("Error in parsing the command line", ex);
} catch (final InjectionException ex) {
throw fatal("Unable to instantiate REEFLauncher.", ex);
}
}
/**
* Read configuration from a given file and deserialize it
* into Tang configuration object that can be used for injection.
* Configuration is currently serialized using Avro.
* This method also prints full deserialized configuration into log.
* @param configPath Path to the local file that contains serialized configuration
* of a REEF component to launch (can be either Driver or Evaluator).
* @param serializer An object to deserialize the configuration file.
* @return Tang configuration read and deserialized from a given file.
*/
private static Configuration readConfigurationFromDisk(
final String configPath, final ConfigurationSerializer serializer) {
LOG.log(Level.FINER, "Loading configuration file: {0}", configPath);
final File evaluatorConfigFile = new File(configPath);
if (!evaluatorConfigFile.exists()) {
throw fatal(
"Configuration file " + configPath + " does not exist. Can be an issue in job submission.",
new FileNotFoundException(configPath));
}
if (!evaluatorConfigFile.canRead()) {
throw fatal(
"Configuration file " + configPath + " exists, but can't be read.",
new IOException(configPath));
}
try {
final Configuration config = serializer.fromFile(evaluatorConfigFile);
LOG.log(Level.FINEST, "Configuration file loaded: {0}", configPath);
return config;
} catch (final IOException e) {
throw fatal("Unable to parse the configuration file: " + configPath, e);
}
}
/**
* Launches a REEF client process (Driver or Evaluator).
* @param args Command-line arguments.
* Must be a single element containing local path to the configuration file.
*/
@SuppressWarnings("checkstyle:illegalcatch")
public static void main(final String[] args) {
LOG.log(Level.INFO, "Entering REEFLauncher.main().");
LOG.log(Level.FINE, "REEFLauncher started with user name [{0}]", System.getProperty("user.name"));
LOG.log(Level.FINE, "REEFLauncher started. Assertions are {0} in this process.",
EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED");
if (args.length != 1) {
final String message = "REEFLauncher have one and only one argument to specify the runtime clock " +
"configuration path";
throw fatal(message, new IllegalArgumentException(message));
}
final REEFLauncher launcher = getREEFLauncher(args[0]);
Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.envConfig));
try (REEFEnvironment reef = REEFEnvironment.fromConfiguration(launcher.envConfig)) {
reef.run();
} catch (final Throwable ex) {
throw fatal("Unable to configure and start REEFEnvironment.", ex);
}
ThreadLogger.logThreads(LOG, Level.FINEST, "Threads running after REEFEnvironment.close():");
LOG.log(Level.INFO, "Exiting REEFLauncher.main()");
System.exit(0); // TODO[REEF-1715]: Should be able to exit cleanly at the end of main()
}
/**
* Wrap an exception into RuntimeException with a given message,
* and write the same message and exception to the log.
* @param msg an error message to log and pass into the RuntimeException.
* @param t A Throwable exception to log and wrap.
* @return a new Runtime exception wrapping a Throwable.
*/
private static RuntimeException fatal(final String msg, final Throwable t) {
LOG.log(Level.SEVERE, msg, t);
return new RuntimeException(msg, t);
}
}