blob: 320b48b5e36df3acbbdfca0d0f525df7584b1236 [file] [log] [blame]
package edu.uci.ics.hyracks.hdfs.scheduler;
import java.net.InetAddress;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InputSplit;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.hdfs.api.INcCollection;
import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
@SuppressWarnings("deprecation")
public class IPProximityNcCollectionBuilder implements INcCollectionBuilder {
@Override
public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
final int[] workloads, final int slotLimit) {
final TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<BytesWritable, IntWritable>();
for (int i = 0; i < workloads.length; i++) {
if (workloads[i] < slotLimit) {
BytesWritable ip = new BytesWritable(ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress());
IntWritable availableSlot = availableIpsToSlots.get(ip);
if (availableSlot == null) {
availableSlot = new IntWritable(slotLimit - workloads[i]);
availableIpsToSlots.put(ip, availableSlot);
} else {
availableSlot.set(slotLimit - workloads[i] + availableSlot.get());
}
}
}
return new INcCollection() {
@Override
public String findNearestAvailableSlot(InputSplit split) {
try {
String[] locs = split.getLocations();
int minDistance = Integer.MAX_VALUE;
BytesWritable currentCandidateIp = null;
if (locs == null || locs.length > 0) {
for (int j = 0; j < locs.length; j++) {
/**
* get all the IP addresses from the name
*/
InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
for (InetAddress ip : allIps) {
BytesWritable splitIp = new BytesWritable(ip.getAddress());
/**
* if the node controller exists
*/
BytesWritable candidateNcIp = availableIpsToSlots.floorKey(splitIp);
if (candidateNcIp == null) {
candidateNcIp = availableIpsToSlots.ceilingKey(splitIp);
}
if (candidateNcIp != null) {
if (availableIpsToSlots.get(candidateNcIp).get() > 0) {
byte[] candidateIP = candidateNcIp.getBytes();
byte[] splitIP = splitIp.getBytes();
int candidateInt = candidateIP[0] << 24 | (candidateIP[1] & 0xFF) << 16
| (candidateIP[2] & 0xFF) << 8 | (candidateIP[3] & 0xFF);
int splitInt = splitIP[0] << 24 | (splitIP[1] & 0xFF) << 16
| (splitIP[2] & 0xFF) << 8 | (splitIP[3] & 0xFF);
int distance = Math.abs(candidateInt - splitInt);
if (minDistance > distance) {
minDistance = distance;
currentCandidateIp = candidateNcIp;
}
}
}
}
}
} else {
for (Entry<BytesWritable, IntWritable> entry : availableIpsToSlots.entrySet()) {
if (entry.getValue().get() > 0) {
currentCandidateIp = entry.getKey();
break;
}
}
}
if (currentCandidateIp != null) {
/**
* Update the entry of the selected IP
*/
IntWritable availableSlot = availableIpsToSlots.get(currentCandidateIp);
availableSlot.set(availableSlot.get() - 1);
if (availableSlot.get() == 0) {
availableIpsToSlots.remove(currentCandidateIp);
}
/**
* Update the entry of the selected NC
*/
List<String> dataLocations = ipToNcMapping.get(InetAddress.getByAddress(
currentCandidateIp.getBytes()).getHostAddress());
for (String nc : dataLocations) {
int ncIndex = ncNameToIndex.get(nc);
if (workloads[ncIndex] < slotLimit) {
return nc;
}
}
}
/** not scheduled */
return null;
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
@Override
public int numAvailableSlots() {
return availableIpsToSlots.size();
}
};
}
}