blob: 5f2128f1f297afe8d5692489d4b39fc60627cc0b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.io.data.loading.api;
import org.apache.commons.lang.Validate;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.io.data.loading.impl.DistributedDataSetPartitionSerializer;
import org.apache.reef.io.data.loading.impl.AvroEvaluatorRequestSerializer;
import org.apache.reef.io.data.loading.impl.SingleDataCenterEvaluatorToPartitionStrategy;
import org.apache.reef.io.data.loading.impl.DistributedDataSetPartition;
import org.apache.reef.io.data.loading.impl.InputFormatLoadingService;
import org.apache.reef.io.data.loading.impl.JobConfExternalConstructor;
import org.apache.reef.io.data.loading.impl.MultiDataCenterEvaluatorToPartitionStrategy;
import org.apache.reef.runtime.common.utils.Constants;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.formats.ConfigurationModule;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* Builder to create a request to the DataLoadingService.
*/
public final class DataLoadingRequestBuilder
implements org.apache.reef.util.Builder<Configuration> {
// constant used in several places.
private static final int UNINITIALIZED = -1;
private int numberOfDesiredSplits = UNINITIALIZED;
private final List<EvaluatorRequest> computeRequests = new ArrayList<>();
private final List<EvaluatorRequest> dataRequests = new ArrayList<>();
private boolean inMemory = false;
private boolean renewFailedEvaluators = true;
private ConfigurationModule driverConfigurationModule = null;
private String inputFormatClass;
/**
* Single data center loading strategy flag. Allows to specify if the data
* will be loaded in machines of a single data center or not. By
* default, is set to true.
*/
private boolean singleDataCenterStrategy = true;
/**
* Distributed dataset that can contain many distributed partitions.
*/
private DistributedDataSet distributedDataSet;
/**
* The input path of the data to be loaded.
*/
private String inputPath;
public DataLoadingRequestBuilder setNumberOfDesiredSplits(final int numberOfDesiredSplits) {
this.numberOfDesiredSplits = numberOfDesiredSplits;
return this;
}
/**
* Adds the requests to the compute requests list.
*
* @param computeRequests
* the compute requests to add
* @return this
*/
@SuppressWarnings("checkstyle:hiddenfield")
public DataLoadingRequestBuilder addComputeRequests(final List<EvaluatorRequest> computeRequests) {
for (final EvaluatorRequest computeRequest : computeRequests) {
addComputeRequest(computeRequest);
}
return this;
}
/**
* Adds the requests to the data requests list.
*
* @param dataRequests
* the data requests to add
* @return this
*/
@SuppressWarnings("checkstyle:hiddenfield")
public DataLoadingRequestBuilder addDataRequests(final List<EvaluatorRequest> dataRequests) {
for (final EvaluatorRequest dataRequest : dataRequests) {
addDataRequest(dataRequest);
}
return this;
}
/**
* Adds a single request to the compute requests list.
*
* @param computeRequest
* the compute request to add
* @return this
*/
public DataLoadingRequestBuilder addComputeRequest(final EvaluatorRequest computeRequest) {
this.computeRequests.add(computeRequest);
return this;
}
/**
* Adds a single request to the data requests list.
*
* @param dataRequest
* the data request to add
* @return this
*/
public DataLoadingRequestBuilder addDataRequest(final EvaluatorRequest dataRequest) {
this.dataRequests.add(dataRequest);
return this;
}
@SuppressWarnings("checkstyle:hiddenfield")
public DataLoadingRequestBuilder loadIntoMemory(final boolean inMemory) {
this.inMemory = inMemory;
return this;
}
@SuppressWarnings("checkstyle:hiddenfield")
public DataLoadingRequestBuilder renewFailedEvaluators(final boolean renewFailedEvaluators) {
this.renewFailedEvaluators = renewFailedEvaluators;
return this;
}
public DataLoadingRequestBuilder setDriverConfigurationModule(
final ConfigurationModule driverConfigurationModule) {
this.driverConfigurationModule = driverConfigurationModule;
return this;
}
public DataLoadingRequestBuilder setInputFormatClass(
final Class<? extends InputFormat> inputFormatClass) {
this.inputFormatClass = inputFormatClass.getName();
return this;
}
/**
* Sets the path of the folder where the data is.
* Internally, a distributed dataset with a unique partition is created,
* and {@link SingleDataCenterEvaluatorToPartitionStrategy} is binded.
*
* @param inputPath
* the input path
* @return this
*/
public DataLoadingRequestBuilder setInputPath(final String inputPath) {
this.inputPath = inputPath;
this.singleDataCenterStrategy = true;
return this;
}
/**
* Sets the distributed data set.
* Internally, a {@link MultiDataCenterEvaluatorToPartitionStrategy} is binded.
*
* @param distributedDataSet
* the distributed data set
* @return this
*/
public DataLoadingRequestBuilder setDistributedDataSet(final DistributedDataSet distributedDataSet) {
this.distributedDataSet = distributedDataSet;
this.singleDataCenterStrategy = false;
return this;
}
@Override
public Configuration build() throws BindException {
if (this.driverConfigurationModule == null) {
throw new BindException("Driver Configuration Module is a required parameter.");
}
// need to create the distributed data set
if (this.singleDataCenterStrategy) {
if (this.inputPath == null) {
throw new BindException("Should specify an input path.");
}
if (this.distributedDataSet != null && !this.distributedDataSet.isEmpty()) {
throw new BindException("You should either call setInputPath or setDistributedDataSet, but not both");
}
// Create a distributed data set with one partition, the splits defined by
// the user if greater than 0 or no splits, and data to be loaded from
// anywhere.
final DistributedDataSet dds = new DistributedDataSet();
dds.addPartition(DistributedDataSetPartition
.newBuilder()
.setPath(inputPath)
.setLocation(Constants.ANY_RACK)
.setDesiredSplits(numberOfDesiredSplits > 0 ?
numberOfDesiredSplits :
Integer.parseInt(NumberOfDesiredSplits.DEFAULT_DESIRED_SPLITS)).build());
this.distributedDataSet = dds;
} else {
if (this.inputPath != null) {
throw new BindException("You should either call setInputPath or setDistributedDataSet, but not both");
}
}
if (this.distributedDataSet == null || this.distributedDataSet.isEmpty()) {
throw new BindException("Distributed Data Set is a required parameter.");
}
if (this.inputFormatClass == null) {
this.inputFormatClass = TextInputFormat.class.getName();
}
final Configuration driverConfiguration;
if (renewFailedEvaluators) {
driverConfiguration = this.driverConfigurationModule
.set(DriverConfiguration.ON_DRIVER_STARTED, DataLoader.StartHandler.class)
.set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DataLoader.EvaluatorAllocatedHandler.class)
.set(DriverConfiguration.ON_EVALUATOR_FAILED, DataLoader.EvaluatorFailedHandler.class)
.build();
} else {
driverConfiguration = this.driverConfigurationModule
.set(DriverConfiguration.ON_DRIVER_STARTED, DataLoader.StartHandler.class)
.set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DataLoader.EvaluatorAllocatedHandler.class)
.build();
}
final JavaConfigurationBuilder jcb =
Tang.Factory.getTang().newConfigurationBuilder(driverConfiguration);
Validate.isTrue(!this.dataRequests.isEmpty(),
"Number of cores and memory are deprecated; you have to add specific data requests");
for (final EvaluatorRequest request : this.dataRequests) {
jcb.bindSetEntry(DataLoadingDataRequests.class, AvroEvaluatorRequestSerializer.toString(request));
}
// compute requests can be empty to maintain compatibility with previous code.
if (!this.computeRequests.isEmpty()) {
for (final EvaluatorRequest request : this.computeRequests) {
jcb.bindSetEntry(DataLoadingComputeRequests.class, AvroEvaluatorRequestSerializer.toString(request));
}
}
jcb.bindNamedParameter(LoadDataIntoMemory.class, Boolean.toString(this.inMemory))
.bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClass);
final Iterator<DistributedDataSetPartition> partitions = this.distributedDataSet.iterator();
while (partitions.hasNext()) {
jcb.bindSetEntry(
DistributedDataSetPartitionSerializer.DistributedDataSetPartitions.class,
DistributedDataSetPartitionSerializer.serialize(partitions.next()));
}
// we do this check for backwards compatibility, if the user defined it
// wants to use the single data center loading strategy, we bind that implementation.
if (this.singleDataCenterStrategy) {
jcb.bindImplementation(EvaluatorToPartitionStrategy.class, SingleDataCenterEvaluatorToPartitionStrategy.class);
} else {
// otherwise, we bind the strategy that will allow the user to specify
// which evaluators can load the different partitions in a multi data center network topology
jcb.bindImplementation(EvaluatorToPartitionStrategy.class, MultiDataCenterEvaluatorToPartitionStrategy.class);
}
return jcb.bindImplementation(DataLoadingService.class, InputFormatLoadingService.class).build();
}
@NamedParameter(short_name = "num_splits", default_value = NumberOfDesiredSplits.DEFAULT_DESIRED_SPLITS)
public static final class NumberOfDesiredSplits implements Name<Integer> {
static final String DEFAULT_DESIRED_SPLITS = "0";
}
/**
* Allows to specify a set of compute requests to send to the DataLoader.
*/
@NamedParameter(doc = "Sets of compute requests to request to the DataLoader, " +
"i.e. evaluators requests that will not load data")
static final class DataLoadingComputeRequests implements Name<Set<String>> {
}
/**
* Allows to specify a set of data requests to send to the DataLoader.
*/
@NamedParameter(doc = "Sets of data requests to request to the DataLoader, " +
"i.e. evaluators requests that will load data")
static final class DataLoadingDataRequests implements Name<Set<String>> {
}
@NamedParameter(default_value = "false")
public static final class LoadDataIntoMemory implements Name<Boolean> {
}
}