blob: 2b9e89968c404d4aa6175fd9ce214df48ae442a8 [file] [log] [blame]
package edu.uci.ics.hyracks.hdfs.scheduler;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.logging.Logger;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InputSplit;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.hdfs.api.INcCollection;
import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
/**
 * {@link INcCollectionBuilder} that places input splits which could not be scheduled data-locally on a node
 * controller (NC) close to one of the split's replica hosts in the cluster topology.
 * <p>
 * Each NC's IP address is mapped to a path of integers via {@link ClusterTopology#lookupNetworkTerminal}. Free
 * slots are summed per path and kept in a {@link TreeMap} that orders paths lexicographically, so paths sharing
 * a prefix (presumably the same switch/rack — confirm against the topology format) are adjacent in the map.
 */
@SuppressWarnings("deprecation")
public class RackAwareNcCollectionBuilder implements INcCollectionBuilder {
    private static final Logger LOGGER = Logger.getLogger(RackAwareNcCollectionBuilder.class.getName());

    /**
     * Radix of the topology distance: a difference at one path level outweighs all differences at deeper levels
     * as long as per-level differences stay below this value.
     */
    private static final long DISTANCE_BASE = 100L;

    /** Orders topology paths lexicographically element by element; a proper prefix sorts before its extensions. */
    private static final Comparator<List<Integer>> PATH_ORDER = new Comparator<List<Integer>>() {
        @Override
        public int compare(List<Integer> l1, List<Integer> l2) {
            int commonLength = Math.min(l1.size(), l2.size());
            for (int i = 0; i < commonLength; i++) {
                int cmp = l1.get(i).compareTo(l2.get(i));
                if (cmp != 0) {
                    return cmp;
                }
            }
            return l1.size() < l2.size() ? -1 : (l1.size() > l2.size() ? 1 : 0);
        }
    };

    private final ClusterTopology topology;

    /**
     * @param topology
     *            cluster topology used to map IP addresses to topology paths
     */
    public RackAwareNcCollectionBuilder(ClusterTopology topology) {
        this.topology = topology;
    }

    /**
     * Builds the collection of free slots used for rack-aware placement.
     *
     * @param ncNameToNcInfos
     *            NC name to NC info; used to obtain each NC's IP address
     * @param ipToNcMapping
     *            not used by this builder
     * @param ncNameToIndex
     *            NC name to its index in {@code NCs} and {@code workloads}
     * @param NCs
     *            NC names; {@code NCs[i]} is the NC whose current load is {@code workloads[i]}
     * @param workloads
     *            splits already assigned per NC; only read here — this class never updates it, so the caller is
     *            presumably responsible for incrementing it after each successful pick
     * @param slotLimit
     *            maximum number of splits per NC
     * @return a collection that hands out NCs with spare capacity, preferring ones near a split's replicas
     * @throws IllegalStateException
     *             wrapping any failure while resolving NC addresses
     */
    @Override
    public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
            final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
            final int[] workloads, final int slotLimit) {
        try {
            /** resolve every NC's topology path once (ncPaths.get(i) belongs to NCs[i]) and group NCs by path */
            final List<List<Integer>> ncPaths = new ArrayList<List<Integer>>(NCs.length);
            final Map<List<Integer>, List<String>> pathToNCs = new HashMap<List<Integer>, List<String>>();
            for (String nc : NCs) {
                List<Integer> path = new ArrayList<Integer>();
                String ipAddress = InetAddress.getByAddress(
                        ncNameToNcInfos.get(nc).getNetworkAddress().getIpAddress()).getHostAddress();
                topology.lookupNetworkTerminal(ipAddress, path);
                ncPaths.add(path);
                List<String> ncs = pathToNCs.get(path);
                if (ncs == null) {
                    ncs = new ArrayList<String>();
                    pathToNCs.put(path, ncs);
                }
                ncs.add(nc);
            }

            /** total free slots per topology path, counting only NCs still below the slot limit */
            final TreeMap<List<Integer>, IntWritable> availableIpsToSlots = new TreeMap<List<Integer>, IntWritable>(
                    PATH_ORDER);
            for (int i = 0; i < workloads.length; i++) {
                if (workloads[i] < slotLimit) {
                    List<Integer> path = ncPaths.get(i);
                    IntWritable availableSlot = availableIpsToSlots.get(path);
                    if (availableSlot == null) {
                        availableIpsToSlots.put(path, new IntWritable(slotLimit - workloads[i]));
                    } else {
                        availableSlot.set(availableSlot.get() + slotLimit - workloads[i]);
                    }
                }
            }

            return new INcCollection() {
                /**
                 * Chooses an NC for a split and consumes one free slot on the chosen NC's topology path.
                 * <p>
                 * For every replica host of the split that resolves into the topology, the free paths directly
                 * below and above the replica's path in sort order are both considered, and the one with the
                 * smallest distance() wins. Only if no replica produced a candidate (e.g. no replica host is in
                 * the topology, or the split has no locations) is the first free path in sort order used.
                 *
                 * @return the chosen NC name, or {@code null} if nothing could be scheduled
                 * @throws IllegalStateException
                 *             wrapping any failure, e.g. an unresolvable replica host name
                 */
                @Override
                public String findNearestAvailableSlot(InputSplit split) {
                    try {
                        String[] locs = split.getLocations();
                        long minDistance = Long.MAX_VALUE;
                        List<Integer> currentCandidatePath = null;
                        if (locs != null) {
                            for (String loc : locs) {
                                /** get all the IP addresses from the name */
                                InetAddress[] allIps = InetAddress.getAllByName(loc);
                                boolean inTopology = false;
                                for (InetAddress ip : allIps) {
                                    List<Integer> splitPath = new ArrayList<Integer>();
                                    if (!topology.lookupNetworkTerminal(ip.getHostAddress(), splitPath)) {
                                        continue;
                                    }
                                    inTopology = true;
                                    /** compare both sort-order neighbours of the replica's path, keep the closer */
                                    List<List<Integer>> neighbours = new ArrayList<List<Integer>>(2);
                                    neighbours.add(availableIpsToSlots.floorKey(splitPath));
                                    neighbours.add(availableIpsToSlots.ceilingKey(splitPath));
                                    for (List<Integer> candidatePath : neighbours) {
                                        if (isSchedulable(candidatePath)) {
                                            long candidateDistance = distance(splitPath, candidatePath);
                                            if (candidateDistance < minDistance) {
                                                minDistance = candidateDistance;
                                                currentCandidatePath = candidatePath;
                                            }
                                        }
                                    }
                                }
                                if (!inTopology) {
                                    LOGGER.info(loc + "'s IP address is not in the cluster toplogy file!");
                                }
                            }
                        }
                        if (currentCandidatePath == null) {
                            /**
                             * no replica path produced a candidate (or the split has no locations): fall back to
                             * the first path, in sort order, that can still take a split
                             */
                            for (List<Integer> path : availableIpsToSlots.keySet()) {
                                if (isSchedulable(path)) {
                                    currentCandidatePath = path;
                                    break;
                                }
                            }
                        }
                        if (currentCandidatePath == null) {
                            /** not scheduled */
                            return null;
                        }
                        /** consume one free slot on the chosen path; drop the path once it is full */
                        IntWritable availableSlot = availableIpsToSlots.get(currentCandidatePath);
                        availableSlot.set(availableSlot.get() - 1);
                        if (availableSlot.get() == 0) {
                            availableIpsToSlots.remove(currentCandidatePath);
                        }
                        /** return an NC on that path that is still below the slot limit */
                        for (String candidate : pathToNCs.get(currentCandidatePath)) {
                            int ncIndex = ncNameToIndex.get(candidate);
                            if (workloads[ncIndex] < slotLimit) {
                                return candidate;
                            }
                        }
                        /** not scheduled */
                        return null;
                    } catch (Exception e) {
                        throw new IllegalStateException(e);
                    }
                }

                /**
                 * @return the number of distinct topology paths that still have at least one free slot (not the
                 *         total number of free slots)
                 */
                @Override
                public int numAvailableSlots() {
                    return availableIpsToSlots.size();
                }

                /**
                 * Whether a split may be placed on {@code path}: it must be a non-null, non-empty path that still
                 * has a free slot. Empty paths are never handed out by this collection.
                 */
                private boolean isSchedulable(List<Integer> path) {
                    if (path == null || path.isEmpty()) {
                        return false;
                    }
                    IntWritable slots = availableIpsToSlots.get(path);
                    return slots != null && slots.get() > 0;
                }

                /**
                 * Topology distance between two paths: per-level absolute differences are read as digits in base
                 * DISTANCE_BASE, most significant level first; levels present only in the longer path contribute
                 * the absolute value of their entry. Accumulated in a {@code long} to avoid overflow on deeper
                 * topologies.
                 */
                private long distance(List<Integer> splitPath, List<Integer> candidatePath) {
                    int commonLength = Math.min(splitPath.size(), candidatePath.size());
                    long distance = 0;
                    for (int i = 0; i < commonLength; i++) {
                        distance = distance * DISTANCE_BASE + Math.abs(splitPath.get(i) - candidatePath.get(i));
                    }
                    List<Integer> restElements = splitPath.size() > candidatePath.size() ? splitPath : candidatePath;
                    for (int i = commonLength; i < restElements.size(); i++) {
                        distance = distance * DISTANCE_BASE + Math.abs(restElements.get(i));
                    }
                    return distance;
                }
            };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}