blob: 1e9ee7169a3ff832c35c63fcf7e91952b630e1fa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.bridge.client;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.reef.client.DriverRestartConfiguration;
import org.apache.reef.client.parameters.DriverConfigurationProviders;
import org.apache.reef.io.TcpPortConfigurationProvider;
import org.apache.reef.javabridge.generic.JobDriver;
import org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters;
import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
import org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters;
import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.yarn.driver.RuntimeIdentifier;
import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
import org.apache.reef.runtime.yarn.driver.parameters.FileSystemUrl;
import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix;
import org.apache.reef.tang.*;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
import javax.inject.Inject;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* This is the Java Driver configuration generator for .NET Drivers that generates
* the Driver configuration at runtime. Called by {@link YarnBootstrapREEFLauncher}.
*/
final class YarnBootstrapDriverConfigGenerator {
private static final Logger LOG = Logger.getLogger(YarnBootstrapDriverConfigGenerator.class.getName());
private final REEFFileNames reefFileNames;
private final ConfigurationSerializer configurationSerializer;
@Inject
private YarnBootstrapDriverConfigGenerator(final REEFFileNames reefFileNames,
final ConfigurationSerializer configurationSerializer) {
this.configurationSerializer = configurationSerializer;
this.reefFileNames = reefFileNames;
}
public String writeDriverConfigurationFile(final String bootstrapJobArgsLocation,
final String bootstrapAppArgsLocation) throws IOException {
final File bootstrapJobArgsFile = new File(bootstrapJobArgsLocation).getCanonicalFile();
final File bootstrapAppArgsFile = new File(bootstrapAppArgsLocation);
final AvroYarnJobSubmissionParameters yarnBootstrapJobArgs =
readYarnJobSubmissionParametersFromFile(bootstrapJobArgsFile);
final AvroYarnAppSubmissionParameters yarnBootstrapAppArgs =
readYarnAppSubmissionParametersFromFile(bootstrapAppArgsFile);
final String driverConfigPath = reefFileNames.getDriverConfigurationPath();
this.configurationSerializer.toFile(getYarnDriverConfiguration(yarnBootstrapJobArgs, yarnBootstrapAppArgs),
new File(driverConfigPath));
return driverConfigPath;
}
static Configuration getYarnDriverConfiguration(
final AvroYarnJobSubmissionParameters yarnJobSubmissionParams,
final AvroYarnAppSubmissionParameters yarnAppSubmissionParams) {
final AvroJobSubmissionParameters jobSubmissionParameters =
yarnJobSubmissionParams.getSharedJobSubmissionParameters();
final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF
.set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY,
yarnJobSubmissionParams.getDfsJobSubmissionFolder().toString())
.set(YarnDriverConfiguration.JOB_IDENTIFIER, jobSubmissionParameters.getJobId().toString())
.set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
.set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
.set(YarnDriverConfiguration.RUNTIME_NAMES, RuntimeIdentifier.RUNTIME_NAME)
.build();
final AvroAppSubmissionParameters appSubmissionParams = yarnAppSubmissionParams.getSharedAppSubmissionParameters();
final Configuration providerConfig = Tang.Factory.getTang().newConfigurationBuilder()
.bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class)
.bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(appSubmissionParams.getTcpBeginPort()))
.bindNamedParameter(TcpPortRangeCount.class, Integer.toString(appSubmissionParams.getTcpRangeCount()))
.bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(appSubmissionParams.getTcpTryCount()))
.bindNamedParameter(JobSubmissionDirectoryPrefix.class,
yarnJobSubmissionParams.getJobSubmissionDirectoryPrefix().toString())
.bindNamedParameter(FileSystemUrl.class,
yarnJobSubmissionParams.getFileSystemUrl().toString())
.build();
final Configuration driverConfiguration = Configurations.merge(
Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER, yarnDriverConfiguration, providerConfig);
if (yarnAppSubmissionParams.getDriverRecoveryTimeout() > 0) {
LOG.log(Level.FINE, "Driver restart is enabled.");
final Configuration yarnDriverRestartConfiguration =
YarnDriverRestartConfiguration.CONF.build();
final Configuration driverRestartConfiguration =
DriverRestartConfiguration.CONF
.set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, JobDriver.RestartHandler.class)
.set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
JobDriver.DriverRestartActiveContextHandler.class)
.set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
JobDriver.DriverRestartRunningTaskHandler.class)
.set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS,
yarnAppSubmissionParams.getDriverRecoveryTimeout())
.set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
JobDriver.DriverRestartCompletedHandler.class)
.set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED,
JobDriver.DriverRestartFailedEvaluatorHandler.class)
.build();
return Configurations.merge(driverConfiguration, yarnDriverRestartConfiguration, driverRestartConfiguration);
}
return driverConfiguration;
}
static AvroYarnAppSubmissionParameters readYarnAppSubmissionParametersFromFile(final File file)
throws IOException {
try (FileInputStream fileInputStream = new FileInputStream(file)) {
// This is mainly a test hook.
return readYarnAppSubmissionParametersFromInputStream(fileInputStream);
}
}
static AvroYarnAppSubmissionParameters readYarnAppSubmissionParametersFromInputStream(
final InputStream inputStream) throws IOException {
final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
AvroYarnAppSubmissionParameters.getClassSchema(), inputStream);
final SpecificDatumReader<AvroYarnAppSubmissionParameters> reader = new SpecificDatumReader<>(
AvroYarnAppSubmissionParameters.class);
return reader.read(null, decoder);
}
static AvroYarnJobSubmissionParameters readYarnJobSubmissionParametersFromFile(final File file)
throws IOException {
try (FileInputStream fileInputStream = new FileInputStream(file)) {
// This is mainly a test hook.
return readYarnJobSubmissionParametersFromInputStream(fileInputStream);
}
}
static AvroYarnJobSubmissionParameters readYarnJobSubmissionParametersFromInputStream(
final InputStream inputStream) throws IOException {
final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
AvroYarnJobSubmissionParameters.getClassSchema(), inputStream);
final SpecificDatumReader<AvroYarnJobSubmissionParameters> reader = new SpecificDatumReader<>(
AvroYarnJobSubmissionParameters.class);
return reader.read(null, decoder);
}
}