/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.io.data.loading.impl;
import org.apache.commons.lang.Validate;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.catalog.NodeDescriptor;
import org.apache.reef.io.data.loading.api.EvaluatorToPartitionStrategy;
import org.apache.reef.tang.ExternalConstructor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* This is an abstract class useful for {@link EvaluatorToPartitionStrategy}
* implementations. It contains a template implementation of
* {@link EvaluatorToPartitionStrategy#getInputSplit(NodeDescriptor, String)}
* that calls abstract methods implemented by subclasses. If your implementation
* does not need this logic, implement the {@link EvaluatorToPartitionStrategy}
* interface directly instead of extending this class.
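* <p>
* A minimal sketch of a possible subclass (the class name, constructor wiring and
* method bodies below are illustrative only, not part of REEF):
* <pre>{@code
* public final class HostLocalStrategy extends AbstractEvaluatorToPartitionStrategy {
*
*   HostLocalStrategy(final String inputFormatClassName, final Set<String> serializedDataPartitions) {
*     super(inputFormatClassName, serializedDataPartitions);
*   }
*
*   protected void updateLocations(final NumberedSplit<InputSplit> numberedSplit) {
*     // index the split under every location it can be served from
*   }
*
*   protected NumberedSplit<InputSplit> tryAllocate(final NodeDescriptor nodeDescriptor, final String evaluatorId) {
*     // no locality information matched: hand out any split that is still unallocated
*     return allocateSplit(evaluatorId, unallocatedSplits);
*   }
* }
* }</pre>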
*/
@DriverSide
@Unstable
public abstract class AbstractEvaluatorToPartitionStrategy implements EvaluatorToPartitionStrategy<InputSplit> {
private static final Logger LOG = Logger.getLogger(AbstractEvaluatorToPartitionStrategy.class.getName());
protected final ConcurrentMap<String, BlockingQueue<NumberedSplit<InputSplit>>> locationToSplits;
protected final ConcurrentMap<String, NumberedSplit<InputSplit>> evaluatorToSplits;
protected final BlockingQueue<NumberedSplit<InputSplit>> unallocatedSplits;
private int totalNumberOfSplits;
@SuppressWarnings("rawtypes")
AbstractEvaluatorToPartitionStrategy(
final String inputFormatClassName, final Set<String> serializedDataPartitions) {
LOG.fine("AbstractEvaluatorToPartitionStrategy injected");
Validate.notEmpty(inputFormatClassName);
Validate.notEmpty(serializedDataPartitions);
locationToSplits = new ConcurrentHashMap<>();
evaluatorToSplits = new ConcurrentHashMap<>();
unallocatedSplits = new LinkedBlockingQueue<>();
setUp();
final Map<DistributedDataSetPartition, InputSplit[]> splitsPerPartition = new HashMap<>();
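// For each partition: deserialize its descriptor, build a JobConf pointing at its path,
// and ask the configured InputFormat for the desired number of splits.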
for (final String serializedDataPartition : serializedDataPartitions) {
final DistributedDataSetPartition dp = DistributedDataSetPartitionSerializer.deserialize(serializedDataPartition);
final ExternalConstructor<JobConf> jobConfExternalConstructor = new JobConfExternalConstructor(
inputFormatClassName, dp.getPath());
try {
final JobConf jobConf = jobConfExternalConstructor.newInstance();
final InputFormat inputFormat = jobConf.getInputFormat();
final InputSplit[] inputSplits = inputFormat.getSplits(jobConf, dp.getDesiredSplits());
if (LOG.isLoggable(Level.FINEST)) {
LOG.log(Level.FINEST, "Splits for partition: {0} {1}", new Object[] {dp, Arrays.toString(inputSplits)});
}
this.totalNumberOfSplits += inputSplits.length;
splitsPerPartition.put(dp, inputSplits);
} catch (final IOException e) {
throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
}
}
init(splitsPerPartition);
LOG.log(Level.FINE, "Total Number of splits: {0}", this.totalNumberOfSplits);
}
/**
* Initializes the locations where we would like each split to be loaded
* and marks all the splits as unallocated.
*
* @param splitsPerPartition
* a map containing the input splits per data partition
*/
private void init(final Map<DistributedDataSetPartition, InputSplit[]> splitsPerPartition) {
final Pair<InputSplit[], DistributedDataSetPartition[]>
splitsAndPartitions = getSplitsAndPartitions(splitsPerPartition);
final InputSplit[] splits = splitsAndPartitions.getFirst();
final DistributedDataSetPartition[] partitions = splitsAndPartitions.getSecond();
Validate.isTrue(splits.length == partitions.length);
for (int splitNum = 0; splitNum < splits.length; splitNum++) {
LOG.log(Level.FINE, "Processing split: " + splitNum);
final InputSplit split = splits[splitNum];
final NumberedSplit<InputSplit> numberedSplit = new NumberedSplit<>(split, splitNum,
partitions[splitNum]);
unallocatedSplits.add(numberedSplit);
updateLocations(numberedSplit);
}
if (LOG.isLoggable(Level.FINE)) {
for (final Map.Entry<String, BlockingQueue<NumberedSplit<InputSplit>>> locSplit : locationToSplits.entrySet()) {
LOG.log(Level.FINE, locSplit.getKey() + ": " + locSplit.getValue().toString());
}
}
}
/**
* Each strategy should update the locations where it wants the split to be
* loaded, for example the split's physical location, a certain node or a
* certain rack.
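* <p>
* For instance, a host-locality strategy might index each split under every host
* Hadoop reports for it (sketch only; it assumes the wrapped {@link InputSplit} is
* reachable via {@code NumberedSplit#getEntry()}):
* <pre>{@code
* protected void updateLocations(final NumberedSplit<InputSplit> numberedSplit) {
*   try {
*     for (final String location : numberedSplit.getEntry().getLocations()) {
*       locationToSplits.putIfAbsent(location, new LinkedBlockingQueue<>());
*       locationToSplits.get(location).add(numberedSplit);
*     }
*   } catch (final IOException e) {
*     throw new RuntimeException("Unable to get the locations of the split", e);
*   }
* }
* }</pre>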
*
* @param numberedSplit
* the numberedSplit
*/
protected abstract void updateLocations(NumberedSplit<InputSplit> numberedSplit);
/**
* Tries to allocate a split to an evaluator based on some particular rule,
* for example the rack name, a random choice, etc.
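* <p>
* A possible rule (sketch only, assuming {@code updateLocations} also indexed splits
* by rack name and that {@code NodeDescriptor#getRackDescriptor()} is populated):
* <pre>{@code
* protected NumberedSplit<InputSplit> tryAllocate(final NodeDescriptor nodeDescriptor, final String evaluatorId) {
*   final String rackName = nodeDescriptor.getRackDescriptor().getName();
*   if (locationToSplits.containsKey(rackName)) {
*     final NumberedSplit<InputSplit> split = allocateSplit(evaluatorId, locationToSplits.get(rackName));
*     if (split != null) {
*       return split;
*     }
*   }
*   // last resort: any split that has not been allocated yet
*   return allocateSplit(evaluatorId, unallocatedSplits);
* }
* }</pre>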
*
* @param nodeDescriptor
* the node descriptor to extract information from
* @param evaluatorId
* the evaluator id where we want to allocate the numberedSplit
* @return a numberedSplit, or null if one could not be allocated
*/
protected abstract NumberedSplit<InputSplit> tryAllocate(NodeDescriptor nodeDescriptor, String evaluatorId);
/**
* Called in the constructor. Allows subclasses to set up the objects they will
* need in the
* {@link AbstractEvaluatorToPartitionStrategy#updateLocations(NumberedSplit)}
* and
* {@link AbstractEvaluatorToPartitionStrategy#tryAllocate(NodeDescriptor, String)}
* methods.
* The default implementation is empty.
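* <p>
* For example, a hypothetical subclass keeping per-rack queues could create them here
* (sketch; {@code rackToSplits} would be a field of that subclass):
* <pre>{@code
* protected void setUp() {
*   this.rackToSplits = new ConcurrentHashMap<String, BlockingQueue<NumberedSplit<InputSplit>>>();
* }
* }</pre>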
*/
protected void setUp() {
// empty implementation by default
}
/**
* Get an input split to be assigned to this evaluator.
* <p>
* Allocates one if it is not already allocated.
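* <p>
* Typical driver-side use (sketch; {@code strategy} and {@code allocatedEvaluator}
* stand for an injected strategy and a REEF {@code AllocatedEvaluator} handled elsewhere):
* <pre>{@code
* final NumberedSplit<InputSplit> split = strategy.getInputSplit(
*     allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor(),
*     allocatedEvaluator.getId());
* }</pre>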
*
* @param nodeDescriptor
* the node descriptor of the node where the evaluator is running
* @param evaluatorId
* the id of the evaluator to assign the split to
* @return a numberedSplit
* @throws RuntimeException
* if no split could be found
*/
@Override
public NumberedSplit<InputSplit> getInputSplit(final NodeDescriptor nodeDescriptor, final String evaluatorId) {
synchronized (evaluatorToSplits) {
if (evaluatorToSplits.containsKey(evaluatorId)) {
LOG.log(Level.FINE, "Found an already allocated split, {0}", evaluatorToSplits.toString());
return evaluatorToSplits.get(evaluatorId);
}
}
// always first try to allocate based on the hostName
final String hostName = nodeDescriptor.getName();
LOG.log(Level.FINE, "Allocated split not found, trying on {0}", hostName);
if (locationToSplits.containsKey(hostName)) {
LOG.log(Level.FINE, "Found splits possibly hosted for {0} at {1}", new Object[] {evaluatorId, hostName});
final NumberedSplit<InputSplit> split = allocateSplit(evaluatorId, locationToSplits.get(hostName));
if (split != null) {
return split;
}
}
LOG.log(Level.FINE, "{0} does not host any splits or someone else took splits hosted here. Picking other ones",
hostName);
final NumberedSplit<InputSplit> split = tryAllocate(nodeDescriptor, evaluatorId);
if (split == null) {
throw new RuntimeException("Unable to find an input split to evaluator " + evaluatorId);
} else {
LOG.log(Level.FINE, evaluatorToSplits.toString());
}
return split;
}
@Override
public int getNumberOfSplits() {
return this.totalNumberOfSplits;
}
private Pair<InputSplit[], DistributedDataSetPartition[]> getSplitsAndPartitions(
final Map<DistributedDataSetPartition, InputSplit[]> splitsPerPartition) {
final List<InputSplit> inputSplits = new ArrayList<>();
final List<DistributedDataSetPartition> partitions = new ArrayList<>();
for (final Entry<DistributedDataSetPartition, InputSplit[]> entry : splitsPerPartition.entrySet()) {
final DistributedDataSetPartition partition = entry.getKey();
final InputSplit[] splits = entry.getValue();
for (final InputSplit split : splits) {
inputSplits.add(split);
partitions.add(partition);
}
}
return new Pair<>(inputSplits.toArray(new InputSplit[inputSplits.size()]),
partitions.toArray(new DistributedDataSetPartition[partitions.size()]));
}
/**
* Allocates the first available split into the evaluator.
*
* @param evaluatorId
* the evaluator id
* @param value
* the queue of splits
* @return a numberedSplit or null if it cannot find one
*/
protected NumberedSplit<InputSplit> allocateSplit(final String evaluatorId,
final BlockingQueue<NumberedSplit<InputSplit>> value) {
if (value == null) {
LOG.log(Level.FINE, "Queue of splits can't be empty. Returning null");
return null;
}
while (true) {
final NumberedSplit<InputSplit> split = value.poll();
if (split == null) {
return null;
}
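// A split is referenced both from the global unallocatedSplits queue and from one
// queue per location. Removing it from unallocatedSplits here ensures it is handed
// out at most once, even if several location queues still point to it.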
if (value == unallocatedSplits || unallocatedSplits.remove(split)) {
LOG.log(Level.FINE, "Found split-" + split.getIndex() + " in the queue");
final NumberedSplit<InputSplit> old = evaluatorToSplits.putIfAbsent(evaluatorId, split);
if (old != null) {
throw new RuntimeException("Trying to assign different splits to the same evaluator is not supported");
} else {
LOG.log(Level.FINE, "Returning " + split.getIndex());
return split;
}
}
}
}
}