blob: f22693a5ccf362ebdded06ef3356b8b664a87c16 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.external.indexing;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Random;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class IndexingScheduler {
private static final Logger LOGGER = LogManager.getLogger();
/** a list of NCs */
private String[] NCs;
/** a map from ip to NCs */
private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
/** a map from the NC name to the index */
private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
/**
* The constructor of the scheduler.
*
* @param ncNameToNcInfos
* @throws HyracksException
*/
public IndexingScheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
try {
loadIPAddressToNCMap(ncNameToNcInfos);
} catch (Exception e) {
throw HyracksException.create(e);
}
}
/**
* Set location constraints for a file scan operator with a list of file
* splits. It tries to assign splits to their local machines fairly
* Locality is more important than fairness
*
* @throws HyracksDataException
*/
public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
if (splits == null) {
/* deal the case when the splits array is null */
return new String[] {};
}
int[] workloads = new int[NCs.length];
Arrays.fill(workloads, 0);
String[] locations = new String[splits.length];
Map<String, IntWritable> locationToNumOfSplits = new HashMap<String, IntWritable>();
/*
* upper bound is number of splits
*/
int upperBoundSlots = splits.length;
try {
Random random = new Random(System.currentTimeMillis());
boolean scheduled[] = new boolean[splits.length];
Arrays.fill(scheduled, false);
/*
* scan the splits and build the popularity map
* give the machines with less local splits more scheduling priority
*/
buildPopularityMap(splits, locationToNumOfSplits);
HashMap<String, Integer> locationToNumOfAssignement = new HashMap<String, Integer>();
for (String location : locationToNumOfSplits.keySet()) {
locationToNumOfAssignement.put(location, 0);
}
/*
* push data-local upper-bounds slots to each machine
*/
scheduleLocalSlots(splits, workloads, locations, upperBoundSlots, random, scheduled, locationToNumOfSplits,
locationToNumOfAssignement);
int dataLocalCount = 0;
for (int i = 0; i < scheduled.length; i++) {
if (scheduled[i] == true) {
dataLocalCount++;
}
}
LOGGER.info("Data local rate: "
+ (scheduled.length == 0 ? 0.0 : ((float) dataLocalCount / (float) (scheduled.length))));
/*
* push non-data-local upper-bounds slots to each machine
*/
locationToNumOfAssignement.clear();
for (String nc : NCs) {
locationToNumOfAssignement.put(nc, 0);
}
for (int i = 0; i < scheduled.length; i++) {
if (scheduled[i]) {
locationToNumOfAssignement.put(locations[i], locationToNumOfAssignement.get(locations[i]) + 1);
}
}
scheduleNonLocalSlots(splits, workloads, locations, upperBoundSlots, scheduled, locationToNumOfAssignement);
return locations;
} catch (IOException e) {
throw HyracksException.create(e);
}
}
/**
* Schedule non-local slots to each machine
*
* @param splits
* The HDFS file splits.
* @param workloads
* The current capacity of each machine.
* @param locations
* The result schedule.
* @param slotLimit
* The maximum slots of each machine.
* @param scheduled
* Indicate which slot is scheduled.
* @param locationToNumOfAssignement
*/
private void scheduleNonLocalSlots(InputSplit[] splits, final int[] workloads, String[] locations, int slotLimit,
boolean[] scheduled, final HashMap<String, Integer> locationToNumOfAssignement)
throws IOException, UnknownHostException {
PriorityQueue<String> scheduleCadndiates =
new PriorityQueue<String>(NCs.length, Comparator.comparing(locationToNumOfAssignement::get));
scheduleCadndiates.addAll(Arrays.asList(NCs));
/*
* schedule no-local file reads
*/
for (int i = 0; i < splits.length; i++) {
/* if there is no data-local NC choice, choose a random one */
if (!scheduled[i]) {
String selectedNcName = scheduleCadndiates.remove();
if (selectedNcName != null) {
int ncIndex = ncNameToIndex.get(selectedNcName);
workloads[ncIndex]++;
scheduled[i] = true;
locations[i] = selectedNcName;
locationToNumOfAssignement.put(selectedNcName, workloads[ncIndex]);
scheduleCadndiates.add(selectedNcName);
}
}
}
}
/**
* Schedule data-local slots to each machine.
*
* @param splits
* The HDFS file splits.
* @param workloads
* The current capacity of each machine.
* @param locations
* The result schedule.
* @param slots
* The maximum slots of each machine.
* @param random
* The random generator.
* @param scheduled
* Indicate which slot is scheduled.
* @throws IOException
* @throws UnknownHostException
*/
private void scheduleLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots, Random random,
boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits,
final HashMap<String, Integer> locationToNumOfAssignement) throws IOException, UnknownHostException {
/* scheduling candidates will be ordered inversely according to their popularity */
PriorityQueue<String> scheduleCadndiates = new PriorityQueue<>(3, (s1, s2) -> {
int assignmentDifference = locationToNumOfAssignement.get(s1).compareTo(locationToNumOfAssignement.get(s2));
if (assignmentDifference != 0) {
return assignmentDifference;
}
return locationToNumSplits.get(s1).compareTo(locationToNumSplits.get(s2));
});
for (int i = 0; i < splits.length; i++) {
if (scheduled[i]) {
continue;
}
/*
* get the location of all the splits
*/
String[] locs = splits[i].getLocations();
if (locs.length > 0) {
scheduleCadndiates.clear();
Collections.addAll(scheduleCadndiates, locs);
for (String candidate : scheduleCadndiates) {
/*
* get all the IP addresses from the name
*/
InetAddress[] allIps = InetAddress.getAllByName(candidate);
/*
* iterate overa all ips
*/
for (InetAddress ip : allIps) {
/*
* if the node controller exists
*/
if (ipToNcMapping.get(ip.getHostAddress()) != null) {
/*
* set the ncs
*/
List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
int arrayPos = random.nextInt(dataLocations.size());
String nc = dataLocations.get(arrayPos);
int pos = ncNameToIndex.get(nc);
/*
* check if the node is already full
*/
if (workloads[pos] < slots) {
locations[i] = nc;
workloads[pos]++;
scheduled[i] = true;
locationToNumOfAssignement.put(candidate,
locationToNumOfAssignement.get(candidate) + 1);
break;
}
}
}
/*
* break the loop for data-locations if the schedule has
* already been found
*/
if (scheduled[i]) {
break;
}
}
}
}
}
/**
* Scan the splits once and build a popularity map
*
* @param splits
* the split array
* @param locationToNumOfSplits
* the map to be built
* @throws IOException
*/
private void buildPopularityMap(InputSplit[] splits, Map<String, IntWritable> locationToNumOfSplits)
throws IOException {
for (InputSplit split : splits) {
String[] locations = split.getLocations();
for (String loc : locations) {
IntWritable locCount = locationToNumOfSplits.get(loc);
if (locCount == null) {
locCount = new IntWritable(0);
locationToNumOfSplits.put(loc, locCount);
}
locCount.set(locCount.get() + 1);
}
}
}
/**
* Load the IP-address-to-NC map from the NCNameToNCInfoMap
*
* @param ncNameToNcInfos
* @throws HyracksException
*/
private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
try {
NCs = new String[ncNameToNcInfos.size()];
ipToNcMapping.clear();
ncNameToIndex.clear();
int i = 0;
/*
* build the IP address to NC map
*/
for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().lookupIpAddress())
.getHostAddress();
List<String> matchedNCs = ipToNcMapping.computeIfAbsent(ipAddr, k -> new ArrayList<>());
matchedNCs.add(entry.getKey());
NCs[i] = entry.getKey();
i++;
}
/*
* set up the NC name to index mapping
*/
for (i = 0; i < NCs.length; i++) {
ncNameToIndex.put(NCs[i], i);
}
} catch (Exception e) {
throw HyracksException.create(e);
}
}
}