blob: 3445d68be3c765412004be59a5a07f202343c3f0 [file] [log] [blame]
/*
* Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.hdfs2.scheduler;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.mapreduce.InputSplit;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
/**
* The scheduler conducts data-local scheduling for reading data from HDFS.
* This class works with the new Hadoop API (org.apache.hadoop.mapreduce).
*/
public class Scheduler {

    /** Names of all node controllers (NCs); an NC's position here is its index in the capacity table. */
    private String[] NCs;

    /** Maps a textual IP address to the names of the NCs registered with that address. */
    private final Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();

    /** Maps an NC name to its index in {@link #NCs}. */
    private final Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();

    /**
     * Builds a scheduler from the node controllers reported by a Hyracks client connection
     * created with the given address and port.
     *
     * @param ipAddress
     *            the address passed to {@link HyracksConnection}
     * @param port
     *            the port passed to {@link HyracksConnection}
     * @throws HyracksException
     *             if the connection cannot be created or the NC information cannot be loaded
     */
    public Scheduler(String ipAddress, int port) throws HyracksException {
        try {
            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
            Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
            loadIPAddressToNCMap(ncNameToNcInfos);
        } catch (Exception e) {
            throw new HyracksException(e);
        }
    }

    /**
     * Builds a scheduler from an already available NC-name-to-NC-info map.
     *
     * @param ncNameToNcInfos
     *            a map from NC name to the information of that NC
     * @throws HyracksException
     *             if the IP address of an NC cannot be resolved
     */
    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
        loadIPAddressToNCMap(ncNameToNcInfos);
    }

    /**
     * Computes, for each input split, the name of the NC that should read it.
     * <p>
     * Every NC receives at most {@code ceil(splits.size() / NCs.length)} splits. A split is first
     * offered to the NCs whose IP address matches one of the split's reported locations; splits
     * that cannot be placed that way are then assigned to the lowest-indexed NCs that still have
     * free slots.
     *
     * @param splits
     *            the input splits to schedule
     * @return an array parallel to {@code splits}; element {@code i} is the NC name chosen for split {@code i}
     * @throws HyracksException
     *             if no NC is registered while there are splits to schedule, or if reading or
     *             resolving a split location fails
     */
    public String[] getLocationConstraints(List<InputSplit> splits) throws HyracksException {
        int splitCount = splits.size();
        if (splitCount == 0) {
            return new String[0];
        }
        if (NCs.length == 0) {
            // Without this guard the slot computation below divides by zero.
            throw new HyracksException(new IllegalStateException("No node controller is available to schedule "
                    + splitCount + " input split(s)"));
        }

        // Number of splits already assigned to each NC, indexed like NCs.
        int[] capacity = new int[NCs.length];
        String[] locations = new String[splitCount];
        // Per-NC quota: ceil(splitCount / NCs.length).
        int slots = splitCount % capacity.length == 0 ? (splitCount / capacity.length) : (splitCount
                / capacity.length + 1);
        try {
            Random random = new Random(System.currentTimeMillis());

            // Phase 1: data-local assignment.
            for (int i = 0; i < splitCount; i++) {
                String nc = pickLocalNC(splits.get(i).getLocations(), capacity, slots, random);
                if (nc != null) {
                    locations[i] = nc;
                    capacity[ncNameToIndex.get(nc)]++;
                }
            }

            // Phase 2: non-local assignment. The cursor only moves forward: NCs are filled up to
            // their quota in index order, and counts never decrease, so everything before the
            // cursor stays full.
            int cursor = 0;
            for (int i = 0; i < splitCount; i++) {
                if (locations[i] != null) {
                    continue;
                }
                // Since slots * NCs.length >= splitCount and each split is counted exactly once,
                // an unassigned split implies some NC still has a free slot.
                while (cursor < capacity.length - 1 && capacity[cursor] >= slots) {
                    cursor++;
                }
                locations[i] = NCs[cursor];
                capacity[cursor]++;
            }
            return locations;
        } catch (Exception e) {
            throw new HyracksException(e);
        }
    }

    /**
     * Finds an NC that is co-located with one of the given hosts and still has a free slot.
     * Returns as soon as one is found, so a split is never counted against more than one NC.
     *
     * @param hosts
     *            the host names reported for a split; may be null or empty
     * @param capacity
     *            the number of splits already assigned to each NC (read only)
     * @param slots
     *            the per-NC quota
     * @param random
     *            used to pick the starting NC among several NCs sharing one IP address
     * @return the chosen NC name, or null if no co-located NC has a free slot
     * @throws java.net.UnknownHostException
     *             if a host name cannot be resolved
     */
    private String pickLocalNC(String[] hosts, int[] capacity, int slots, Random random)
            throws java.net.UnknownHostException {
        if (hosts == null) {
            return null;
        }
        for (String host : hosts) {
            // A host name may resolve to several IP addresses; check each of them.
            for (InetAddress ip : InetAddress.getAllByName(host)) {
                List<String> colocated = ipToNcMapping.get(ip.getHostAddress());
                if (colocated == null) {
                    continue;
                }
                // Start at a random NC to spread load, then try the remaining NCs on the same
                // address so that locality is not lost just because the first pick was full.
                int start = random.nextInt(colocated.size());
                for (int k = 0; k < colocated.size(); k++) {
                    String nc = colocated.get((start + k) % colocated.size());
                    if (capacity[ncNameToIndex.get(nc)] < slots) {
                        return nc;
                    }
                }
            }
        }
        return null;
    }

    /**
     * Loads the NC list, the IP-address-to-NC map and the NC-name-to-index map from the
     * NC-name-to-NC-info map.
     *
     * @param ncNameToNcInfos
     *            a map from NC name to the information of that NC
     * @throws HyracksException
     *             if the IP address of an NC cannot be converted to its textual form
     */
    private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
        try {
            NCs = new String[ncNameToNcInfos.size()];
            int i = 0;
            for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
                String ncName = entry.getKey();
                // Normalize the raw address bytes to the textual form used as the map key.
                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
                        .getHostAddress();
                List<String> matchedNCs = ipToNcMapping.get(ipAddr);
                if (matchedNCs == null) {
                    matchedNCs = new ArrayList<String>();
                    ipToNcMapping.put(ipAddr, matchedNCs);
                }
                matchedNCs.add(ncName);
                NCs[i] = ncName;
                ncNameToIndex.put(ncName, i);
                i++;
            }
        } catch (Exception e) {
            throw new HyracksException(e);
        }
    }
}