blob: 3f7997b8401d235efc8a894e37d41ab009311c14 [file] [log] [blame]
/*
* Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.hdfs.scheduler;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.logging.Logger;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InputSplit;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.hdfs.api.INcCollection;
import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
/**
* The scheduler conduct data-local scheduling for data reading on HDFS. This
* class works for Hadoop old API.
*/
@SuppressWarnings("deprecation")
public class Scheduler {
private static final Logger LOGGER = Logger.getLogger(Scheduler.class.getName());
/** a list of NCs */
private String[] NCs;
/** a map from ip to NCs */
private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
/** a map from the NC name to the index */
private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
/** a map from NC name to the NodeControllerInfo */
private Map<String, NodeControllerInfo> ncNameToNcInfos;
/**
* the nc collection builder
*/
private INcCollectionBuilder ncCollectionBuilder;
/**
* The constructor of the scheduler.
*
* @param ncNameToNcInfos
* @throws HyracksException
*/
public Scheduler(String ipAddress, int port) throws HyracksException {
try {
IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
this.ncNameToNcInfos = hcc.getNodeControllerInfos();
ClusterTopology topology = hcc.getClusterTopology();
this.ncCollectionBuilder = topology == null ? new IPProximityNcCollectionBuilder()
: new RackAwareNcCollectionBuilder(topology);
loadIPAddressToNCMap(ncNameToNcInfos);
} catch (Exception e) {
throw new HyracksException(e);
}
}
/**
* The constructor of the scheduler.
*
* @param ncNameToNcInfos
* @throws HyracksException
*/
public Scheduler(String ipAddress, int port, INcCollectionBuilder ncCollectionBuilder) throws HyracksException {
try {
IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
this.ncNameToNcInfos = hcc.getNodeControllerInfos();
this.ncCollectionBuilder = ncCollectionBuilder;
loadIPAddressToNCMap(ncNameToNcInfos);
} catch (Exception e) {
throw new HyracksException(e);
}
}
/**
* The constructor of the scheduler.
*
* @param ncNameToNcInfos
* the mapping from nc names to nc infos
* @throws HyracksException
*/
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
this.ncNameToNcInfos = ncNameToNcInfos;
this.ncCollectionBuilder = new IPProximityNcCollectionBuilder();
loadIPAddressToNCMap(ncNameToNcInfos);
}
/**
* The constructor of the scheduler.
*
* @param ncNameToNcInfos
* the mapping from nc names to nc infos
* @param topology
* the hyracks cluster toplogy
* @throws HyracksException
*/
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology) throws HyracksException {
this(ncNameToNcInfos);
this.ncCollectionBuilder = topology == null ? new IPProximityNcCollectionBuilder()
: new RackAwareNcCollectionBuilder(topology);
}
/**
* The constructor of the scheduler.
*
* @param ncNameToNcInfos
* the mapping from nc names to nc infos
* @throws HyracksException
*/
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder ncCollectionBuilder)
throws HyracksException {
this.ncNameToNcInfos = ncNameToNcInfos;
this.ncCollectionBuilder = ncCollectionBuilder;
loadIPAddressToNCMap(ncNameToNcInfos);
}
/**
* Set location constraints for a file scan operator with a list of file
* splits. It guarantees the maximum slots a machine can is at most one more
* than the minimum slots a machine can get.
*
* @throws HyracksDataException
*/
public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
if (splits == null) {
/** deal the case when the splits array is null */
return new String[] {};
}
int[] workloads = new int[NCs.length];
Arrays.fill(workloads, 0);
String[] locations = new String[splits.length];
Map<String, IntWritable> locationToNumOfSplits = new HashMap<String, IntWritable>();
/**
* upper bound number of slots that a machine can get
*/
int upperBoundSlots = splits.length % workloads.length == 0 ? (splits.length / workloads.length)
: (splits.length / workloads.length + 1);
/**
* lower bound number of slots that a machine can get
*/
int lowerBoundSlots = splits.length % workloads.length == 0 ? upperBoundSlots : upperBoundSlots - 1;
try {
Random random = new Random(System.currentTimeMillis());
boolean scheduled[] = new boolean[splits.length];
Arrays.fill(scheduled, false);
/**
* scan the splits and build the popularity map
* give the machines with less local splits more scheduling priority
*/
buildPopularityMap(splits, locationToNumOfSplits);
/**
* push data-local lower-bounds slots to each machine
*/
scheduleLocalSlots(splits, workloads, locations, lowerBoundSlots, random, scheduled, locationToNumOfSplits);
/**
* push data-local upper-bounds slots to each machine
*/
scheduleLocalSlots(splits, workloads, locations, upperBoundSlots, random, scheduled, locationToNumOfSplits);
int dataLocalCount = 0;
for (int i = 0; i < scheduled.length; i++) {
if (scheduled[i] == true) {
dataLocalCount++;
}
}
LOGGER.info("Data local rate: "
+ (scheduled.length == 0 ? 0.0 : ((float) dataLocalCount / (float) (scheduled.length))));
/**
* push non-data-local lower-bounds slots to each machine
*/
scheduleNonLocalSlots(splits, workloads, locations, lowerBoundSlots, scheduled);
/**
* push non-data-local upper-bounds slots to each machine
*/
scheduleNonLocalSlots(splits, workloads, locations, upperBoundSlots, scheduled);
return locations;
} catch (IOException e) {
throw new HyracksException(e);
}
}
/**
* Schedule non-local slots to each machine
*
* @param splits
* The HDFS file splits.
* @param workloads
* The current capacity of each machine.
* @param locations
* The result schedule.
* @param slotLimit
* The maximum slots of each machine.
* @param scheduled
* Indicate which slot is scheduled.
*/
private void scheduleNonLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slotLimit,
boolean[] scheduled) throws IOException, UnknownHostException {
/**
* build the map from available ips to the number of available slots
*/
INcCollection ncCollection = this.ncCollectionBuilder.build(ncNameToNcInfos, ipToNcMapping, ncNameToIndex, NCs,
workloads, slotLimit);
if (ncCollection.numAvailableSlots() == 0) {
return;
}
/**
* schedule no-local file reads
*/
for (int i = 0; i < splits.length; i++) {
/** if there is no data-local NC choice, choose a random one */
if (!scheduled[i]) {
InputSplit split = splits[i];
String selectedNcName = ncCollection.findNearestAvailableSlot(split);
if (selectedNcName != null) {
int ncIndex = ncNameToIndex.get(selectedNcName);
workloads[ncIndex]++;
scheduled[i] = true;
locations[i] = selectedNcName;
}
}
}
}
/**
* Schedule data-local slots to each machine.
*
* @param splits
* The HDFS file splits.
* @param workloads
* The current capacity of each machine.
* @param locations
* The result schedule.
* @param slots
* The maximum slots of each machine.
* @param random
* The random generator.
* @param scheduled
* Indicate which slot is scheduled.
* @throws IOException
* @throws UnknownHostException
*/
private void scheduleLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots, Random random,
boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits) throws IOException,
UnknownHostException {
/** scheduling candidates will be ordered inversely according to their popularity */
PriorityQueue<String> scheduleCadndiates = new PriorityQueue<String>(3, new Comparator<String>() {
@Override
public int compare(String s1, String s2) {
return locationToNumSplits.get(s1).compareTo(locationToNumSplits.get(s2));
}
});
for (int i = 0; i < splits.length; i++) {
if (scheduled[i]) {
continue;
}
/**
* get the location of all the splits
*/
String[] locs = splits[i].getLocations();
if (locs.length > 0) {
scheduleCadndiates.clear();
for (int j = 0; j < locs.length; j++) {
scheduleCadndiates.add(locs[j]);
}
for (String candidate : scheduleCadndiates) {
/**
* get all the IP addresses from the name
*/
InetAddress[] allIps = InetAddress.getAllByName(candidate);
/**
* iterate overa all ips
*/
for (InetAddress ip : allIps) {
/**
* if the node controller exists
*/
if (ipToNcMapping.get(ip.getHostAddress()) != null) {
/**
* set the ncs
*/
List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
int arrayPos = random.nextInt(dataLocations.size());
String nc = dataLocations.get(arrayPos);
int pos = ncNameToIndex.get(nc);
/**
* check if the node is already full
*/
if (workloads[pos] < slots) {
locations[i] = nc;
workloads[pos]++;
scheduled[i] = true;
break;
}
}
}
/**
* break the loop for data-locations if the schedule has
* already been found
*/
if (scheduled[i] == true) {
break;
}
}
}
}
}
/**
* Scan the splits once and build a popularity map
*
* @param splits
* the split array
* @param locationToNumOfSplits
* the map to be built
* @throws IOException
*/
private void buildPopularityMap(InputSplit[] splits, Map<String, IntWritable> locationToNumOfSplits)
throws IOException {
for (InputSplit split : splits) {
String[] locations = split.getLocations();
for (String loc : locations) {
IntWritable locCount = locationToNumOfSplits.get(loc);
if (locCount == null) {
locCount = new IntWritable(0);
locationToNumOfSplits.put(loc, locCount);
}
locCount.set(locCount.get() + 1);
}
}
}
/**
* Load the IP-address-to-NC map from the NCNameToNCInfoMap
*
* @param ncNameToNcInfos
* @throws HyracksException
*/
private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
try {
NCs = new String[ncNameToNcInfos.size()];
ipToNcMapping.clear();
ncNameToIndex.clear();
int i = 0;
/**
* build the IP address to NC map
*/
for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
.getHostAddress();
List<String> matchedNCs = ipToNcMapping.get(ipAddr);
if (matchedNCs == null) {
matchedNCs = new ArrayList<String>();
ipToNcMapping.put(ipAddr, matchedNCs);
}
matchedNCs.add(entry.getKey());
NCs[i] = entry.getKey();
i++;
}
/**
* set up the NC name to index mapping
*/
for (i = 0; i < NCs.length; i++) {
ncNameToIndex.put(NCs[i], i);
}
} catch (Exception e) {
throw new HyracksException(e);
}
}
}