blob: fa49e88fb22393e573154275a3cb6f234ad57b73 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.bridge.client;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.reef.client.parameters.DriverConfigurationProviders;
import org.apache.reef.io.TcpPortConfigurationProvider;
import org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters;
import org.apache.reef.reef.bridge.client.avro.AvroLocalAppSubmissionParameters;
import org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.Tang;
import org.apache.reef.wake.remote.address.LoopbackLocalAddressProvider;
import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
/**
* Represents a job submission from the CS code.
* <p>
* This class exists mostly to parse and validate the command line parameters provided by the C# class
* `Org.Apache.REEF.Client.Local.LocalClient`
*/
final class LocalSubmissionFromCS {
private final File driverFolder;
private final File jobFolder;
private final File runtimeRootFolder;
private final String jobId;
private final int maxNumberOfConcurrentEvaluators;
private final int tcpBeginPort;
private final int tcpRangeCount;
private final int tcpTryCount;
private final String driverStdoutPath;
private final String driverStderrPath;
private LocalSubmissionFromCS(final AvroLocalJobSubmissionParameters avroLocalJobSubmissionParameters,
final AvroLocalAppSubmissionParameters avroLocalAppSubmissionParameters) {
// We assume the given path to be the one of the driver. The job folder is one level up from there.
final AvroAppSubmissionParameters appSubmissionParameters =
avroLocalAppSubmissionParameters.getSharedAppSubmissionParameters();
this.tcpBeginPort = appSubmissionParameters.getTcpBeginPort();
this.tcpRangeCount = appSubmissionParameters.getTcpRangeCount();
this.tcpTryCount = appSubmissionParameters.getTcpTryCount();
this.maxNumberOfConcurrentEvaluators = avroLocalAppSubmissionParameters.getMaxNumberOfConcurrentEvaluators();
this.driverFolder = new File(avroLocalJobSubmissionParameters
.getSharedJobSubmissionParameters().getJobSubmissionFolder().toString());
this.jobId = avroLocalJobSubmissionParameters.getSharedJobSubmissionParameters().getJobId().toString();
this.jobFolder = driverFolder.getParentFile();
this.runtimeRootFolder = jobFolder.getParentFile();
this.driverStdoutPath = avroLocalJobSubmissionParameters.getDriverStdoutFilePath().toString();
this.driverStderrPath = avroLocalJobSubmissionParameters.getDriverStderrFilePath().toString();
Validate.isTrue(driverFolder.exists(), "The driver folder does not exist.");
Validate.notEmpty(jobId, "The job is is null or empty.");
Validate.isTrue(maxNumberOfConcurrentEvaluators >= 0, "The number of evaluators is < 0.");
Validate.isTrue(tcpBeginPort >= 0, "The tcp start port given is < 0.");
Validate.isTrue(tcpRangeCount > 0, "The tcp range given is <= 0.");
Validate.isTrue(tcpTryCount > 0, "The tcp retry count given is <= 0.");
Validate.isTrue(StringUtils.isNotEmpty(driverStdoutPath), "The stdout path is empty.");
Validate.isTrue(StringUtils.isNotEmpty(driverStderrPath), "The stderr path is empty.");
}
/**
* @return the runtime configuration, based on the parameters passed from C#.
*/
Configuration getRuntimeConfiguration() {
final Configuration runtimeConfiguration = LocalRuntimeConfiguration.CONF
.set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, Integer.toString(maxNumberOfConcurrentEvaluators))
.set(LocalRuntimeConfiguration.RUNTIME_ROOT_FOLDER, runtimeRootFolder.getAbsolutePath())
.build();
final ArrayList<String> driverLaunchCommandPrefixList = new ArrayList<>();
driverLaunchCommandPrefixList.add(
new File(driverFolder,
new REEFFileNames().getDriverLauncherExeFile().toString()
).toString());
final Configuration userProviderConfiguration = Tang.Factory.getTang().newConfigurationBuilder()
.bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class)
.bindSetEntry(DriverConfigurationProviders.class, LoopbackLocalAddressProvider.class)
.bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(tcpBeginPort))
.bindNamedParameter(TcpPortRangeCount.class, Integer.toString(tcpRangeCount))
.bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(tcpTryCount))
.bindNamedParameter(JobSubmissionDirectory.class, runtimeRootFolder.getAbsolutePath())
.bindList(DriverLaunchCommandPrefix.class, driverLaunchCommandPrefixList)
.build();
return Configurations.merge(runtimeConfiguration, userProviderConfiguration);
}
@Override
public String toString() {
return "LocalSubmissionFromCS{" +
"driverFolder=" + driverFolder +
", jobFolder=" + jobFolder +
", runtimeRootFolder=" + runtimeRootFolder +
", jobId='" + jobId + '\'' +
", maxNumberOfConcurrentEvaluators=" + maxNumberOfConcurrentEvaluators +
", tcpBeginPort=" + tcpBeginPort +
", tcpRangeCount=" + tcpRangeCount +
", tcpTryCount=" + tcpTryCount +
'}';
}
/**
* @return The folder in which the job is staged.
*/
File getJobFolder() {
return jobFolder;
}
/**
* @return The id of this job.
*/
String getJobId() {
return jobId;
}
/**
* @return The Driver stdout path.
*/
String getDriverStdoutPath() {
return driverStdoutPath;
}
/**
* @return The Driver stderr path.
*/
String getDriverStderrPath() {
return driverStderrPath;
}
/**
* Takes the local job submission configuration file, deserializes it, and creates submission object.
*/
static LocalSubmissionFromCS fromSubmissionParameterFiles(final File localJobSubmissionParametersFile,
final File localAppSubmissionParametersFile)
throws IOException {
final AvroLocalAppSubmissionParameters localAppSubmissionParameters;
final AvroLocalJobSubmissionParameters localJobSubmissionParameters;
try (FileInputStream fileInputStream = new FileInputStream(localJobSubmissionParametersFile)) {
final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
AvroLocalJobSubmissionParameters.getClassSchema(), fileInputStream);
final SpecificDatumReader<AvroLocalJobSubmissionParameters> reader =
new SpecificDatumReader<>(AvroLocalJobSubmissionParameters.class);
localJobSubmissionParameters = reader.read(null, decoder);
}
try (FileInputStream fileInputStream = new FileInputStream(localAppSubmissionParametersFile)) {
final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
AvroLocalAppSubmissionParameters.getClassSchema(), fileInputStream);
final SpecificDatumReader<AvroLocalAppSubmissionParameters> reader =
new SpecificDatumReader<>(AvroLocalAppSubmissionParameters.class);
localAppSubmissionParameters = reader.read(null, decoder);
}
return new LocalSubmissionFromCS(localJobSubmissionParameters, localAppSubmissionParameters);
}
}