blob: 0d8f3cff210208fb848dab585a6bf83bfd84690c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.partition;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import com.google.common.base.Objects;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
/**
 * Helper class for balancing partitions across a set of workers.
 *
 * The balancing algorithm is selected through the configuration key
 * {@link #PARTITION_BALANCE_ALGORITHM}. All members are static; the class
 * is not meant to be instantiated.
 */
public class PartitionBalancer {
  /** Partition balancing algorithm */
  public static final String PARTITION_BALANCE_ALGORITHM =
      "hash.partitionBalanceAlgorithm";
  /** No rebalancing during the supersteps */
  public static final String STATIC_BALANCE_ALGORITHM = "static";
  /** Rebalance across supersteps by edges */
  public static final String EDGE_BALANCE_ALGORITHM = "edges";
  /**
   * Rebalance across supersteps by edges. Misspelled name kept so that
   * existing callers keep compiling; same value as
   * {@link #EDGE_BALANCE_ALGORITHM}.
   */
  public static final String EGDE_BALANCE_ALGORITHM = EDGE_BALANCE_ALGORITHM;
  /** Rebalance across supersteps by vertices */
  public static final String VERTICES_BALANCE_ALGORITHM = "vertices";
  /** Class logger */
  private static final Logger LOG =
      Logger.getLogger(PartitionBalancer.class);

  /**
   * What value to balance partitions with? Edges, vertices?
   */
  private enum BalanceValue {
    /** Not chosen */
    UNSET,
    /** Balance with edges */
    EDGES,
    /** Balance with vertices */
    VERTICES
  }

  /**
   * Do not construct this class.
   */
  private PartitionBalancer() { }

  /**
   * Get the value used to balance.
   *
   * @param partitionStat Stats of this partition.
   * @param balanceValue Type of the value to balance.
   * @return Edge count or vertex count of the partition, depending on
   *         balanceValue.
   * @throws IllegalArgumentException if balanceValue is neither EDGES nor
   *         VERTICES (for instance UNSET).
   */
  private static long getBalanceValue(PartitionStats partitionStat,
      BalanceValue balanceValue) {
    switch (balanceValue) {
    case EDGES:
      return partitionStat.getEdgeCount();
    case VERTICES:
      return partitionStat.getVertexCount();
    default:
      throw new IllegalArgumentException(
          "getBalanceValue: Illegal balance value " + balanceValue);
    }
  }

  /**
   * Three-way comparison of two longs that cannot overflow.
   *
   * Subtracting the values and casting the difference to int is not safe:
   * a difference outside the int range is truncated and may end up with
   * the wrong sign (or as 0).
   *
   * @param left First value.
   * @param right Second value.
   * @return -1, 0 or 1 if left is less than, equal to or greater than
   *         right.
   */
  private static int compareValues(long left, long right) {
    if (left < right) {
      return -1;
    }
    return left > right ? 1 : 0;
  }

  /**
   * Used to sort the partition owners from lowest value to highest value
   */
  private static class PartitionOwnerComparator implements
      Comparator<PartitionOwner> {
    /** Map of owner to stats */
    private final Map<PartitionOwner, PartitionStats> ownerStatMap;
    /** Value type to compare on */
    private final BalanceValue balanceValue;

    /**
     * Only constructor.
     *
     * @param ownerStatMap Map of owners to stats.
     * @param balanceValue Value to balance with.
     */
    public PartitionOwnerComparator(
        Map<PartitionOwner, PartitionStats> ownerStatMap,
        BalanceValue balanceValue) {
      this.ownerStatMap = ownerStatMap;
      this.balanceValue = balanceValue;
    }

    @Override
    public int compare(PartitionOwner owner1, PartitionOwner owner2) {
      // Compare as longs; the counts may differ by more than an int holds.
      return compareValues(
          getBalanceValue(ownerStatMap.get(owner1), balanceValue),
          getBalanceValue(ownerStatMap.get(owner2), balanceValue));
    }
  }

  /**
   * Structure to keep track of how much value a {@link WorkerInfo} has
   * been assigned. Ordering, equality and hash code are all based on the
   * currently accumulated value only.
   */
  private static class WorkerInfoAssignments implements
      Comparable<WorkerInfoAssignments> {
    /** Worker info associated */
    private final WorkerInfo workerInfo;
    /** Balance value */
    private final BalanceValue balanceValue;
    /** Map of owner to stats */
    private final Map<PartitionOwner, PartitionStats> ownerStatsMap;
    /** Current value of this object */
    private long value = 0;

    /**
     * Constructor with final values.
     *
     * @param workerInfo Worker info for assignment.
     * @param balanceValue Value used to balance.
     * @param ownerStatsMap Map of owner to stats.
     */
    public WorkerInfoAssignments(
        WorkerInfo workerInfo,
        BalanceValue balanceValue,
        Map<PartitionOwner, PartitionStats> ownerStatsMap) {
      this.workerInfo = workerInfo;
      this.balanceValue = balanceValue;
      this.ownerStatsMap = ownerStatsMap;
    }

    /**
     * Get the total value of all partitions assigned to this worker.
     *
     * @return Total value of all partition assignments.
     */
    public long getValue() {
      return value;
    }

    /**
     * Assign a {@link PartitionOwner} to this {@link WorkerInfo}.
     *
     * Adds the partition's balance value to this worker's total. If the
     * partition currently belongs to a different worker, that worker is
     * recorded as the previous worker and this worker becomes the owner;
     * otherwise the previous worker is reset to null.
     *
     * @param partitionOwner PartitionOwner to assign.
     */
    public void assignPartitionOwner(PartitionOwner partitionOwner) {
      value += getBalanceValue(ownerStatsMap.get(partitionOwner),
          balanceValue);
      WorkerInfo currentWorker = partitionOwner.getWorkerInfo();
      if (currentWorker.equals(workerInfo)) {
        // Partition stays where it is, nothing to transfer
        partitionOwner.setPreviousWorkerInfo(null);
      } else {
        partitionOwner.setPreviousWorkerInfo(currentWorker);
        partitionOwner.setWorkerInfo(workerInfo);
      }
    }

    @Override
    public int compareTo(WorkerInfoAssignments other) {
      // Compare as longs; a truncated difference would break heap order.
      return compareValues(value, other.value);
    }

    @Override
    public boolean equals(Object obj) {
      return obj instanceof WorkerInfoAssignments &&
          value == ((WorkerInfoAssignments) obj).value;
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(value);
    }
  }

  /**
   * Join the partition stats and the partition owners by partition id.
   *
   * @param partitionOwners All the owners of all partitions
   * @param allPartitionStats All the partition stats
   * @return Map of every partition owner to the stats of its partition
   * @throws IllegalStateException if a partition id occurs twice in the
   *         stats, an owner has no stats, or an owner occurs twice
   */
  private static Map<PartitionOwner, PartitionStats> joinOwnersWithStats(
      Collection<PartitionOwner> partitionOwners,
      Collection<PartitionStats> allPartitionStats) {
    Map<Integer, PartitionStats> idStatMap =
        new HashMap<Integer, PartitionStats>();
    for (PartitionStats partitionStats : allPartitionStats) {
      if (idStatMap.put(partitionStats.getPartitionId(), partitionStats) !=
          null) {
        throw new IllegalStateException(
            "balancePartitionsAcrossWorkers: Duplicate partition id " +
            "for " + partitionStats);
      }
    }
    Map<PartitionOwner, PartitionStats> ownerStatsMap =
        new HashMap<PartitionOwner, PartitionStats>();
    for (PartitionOwner partitionOwner : partitionOwners) {
      PartitionStats partitionStats =
          idStatMap.get(partitionOwner.getPartitionId());
      if (partitionStats == null) {
        throw new IllegalStateException(
            "balancePartitionsAcrossWorkers: Missing partition " +
            "stats for " + partitionOwner);
      }
      if (ownerStatsMap.put(partitionOwner, partitionStats) != null) {
        throw new IllegalStateException(
            "balancePartitionsAcrossWorkers: Duplicate partition " +
            "owner " + partitionOwner);
      }
    }
    if (ownerStatsMap.size() != partitionOwners.size()) {
      throw new IllegalStateException(
          "balancePartitionsAcrossWorkers: ownerStats count = " +
          ownerStatsMap.size() + ", partitionOwners count = " +
          partitionOwners.size() + " and should match.");
    }
    return ownerStatsMap;
  }

  /**
   * Balance the partitions with an algorithm based on a value.
   *
   * With the static algorithm (the default when nothing is configured)
   * the given owners are returned untouched. Otherwise the owners are
   * reassigned in place with a greedy heuristic and returned as a new
   * list sorted from the largest to the smallest partition.
   *
   * @param conf Configuration to find the algorithm
   * @param partitionOwners All the owners of all partitions
   * @param allPartitionStats All the partition stats
   * @param availableWorkerInfos All the available workers
   * @return Balanced partition owners
   * @throws IllegalArgumentException if the configured algorithm is
   *         unknown, or if there are partitions to assign but no
   *         available workers
   * @throws IllegalStateException if owners and stats cannot be joined
   *         one-to-one by partition id
   */
  public static Collection<PartitionOwner> balancePartitionsAcrossWorkers(
      Configuration conf,
      Collection<PartitionOwner> partitionOwners,
      Collection<PartitionStats> allPartitionStats,
      Collection<WorkerInfo> availableWorkerInfos) {
    String balanceAlgorithm =
        conf.get(PARTITION_BALANCE_ALGORITHM, STATIC_BALANCE_ALGORITHM);
    if (LOG.isInfoEnabled()) {
      LOG.info("balancePartitionsAcrossWorkers: Using algorithm " +
          balanceAlgorithm);
    }
    BalanceValue balanceValue = BalanceValue.UNSET;
    if (balanceAlgorithm.equals(STATIC_BALANCE_ALGORITHM)) {
      return partitionOwners;
    } else if (balanceAlgorithm.equals(EDGE_BALANCE_ALGORITHM)) {
      balanceValue = BalanceValue.EDGES;
    } else if (balanceAlgorithm.equals(VERTICES_BALANCE_ALGORITHM)) {
      balanceValue = BalanceValue.VERTICES;
    } else {
      throw new IllegalArgumentException(
          "balancePartitionsAcrossWorkers: Illegal balance " +
          "algorithm - " + balanceAlgorithm);
    }

    Map<PartitionOwner, PartitionStats> ownerStatsMap =
        joinOwnersWithStats(partitionOwners, allPartitionStats);

    // Without this check the loop below would fail on the empty heap
    // with an uninformative NoSuchElementException.
    if (availableWorkerInfos.isEmpty() && !partitionOwners.isEmpty()) {
      throw new IllegalArgumentException(
          "balancePartitionsAcrossWorkers: No available workers to " +
          "assign " + partitionOwners.size() + " partitions to");
    }

    List<WorkerInfoAssignments> workerInfoAssignmentsList =
        new ArrayList<WorkerInfoAssignments>(availableWorkerInfos.size());
    for (WorkerInfo workerInfo : availableWorkerInfos) {
      workerInfoAssignmentsList.add(
          new WorkerInfoAssignments(
              workerInfo, balanceValue, ownerStatsMap));
    }

    // A simple heuristic for balancing the partitions across the workers
    // using a value (edges, vertices).  An improvement would be to
    // take into account the already existing partition worker assignments.
    // 1.  Sort the partitions by size
    // 2.  Place the workers in a min heap sorted by their total balance
    //     value.
    // 3.  From largest partition to the smallest, take the partition
    //     worker at the top of the heap, add the partition to it, and
    //     then put it back in the heap
    List<PartitionOwner> partitionOwnerList =
        new ArrayList<PartitionOwner>(partitionOwners);
    Collections.sort(partitionOwnerList,
        Collections.reverseOrder(
            new PartitionOwnerComparator(ownerStatsMap, balanceValue)));
    PriorityQueue<WorkerInfoAssignments> minQueue =
        new PriorityQueue<WorkerInfoAssignments>(workerInfoAssignmentsList);
    for (PartitionOwner partitionOwner : partitionOwnerList) {
      // Remove and re-add so the heap sees the worker's updated total
      WorkerInfoAssignments chosenWorker = minQueue.remove();
      chosenWorker.assignPartitionOwner(partitionOwner);
      minQueue.add(chosenWorker);
    }
    return partitionOwnerList;
  }

  /**
   * Helper function to update partition owners and determine which
   * partitions need to be sent from a specific worker.
   *
   * The local list is replaced with the master's owners. Owners without
   * a previous worker are skipped. For the rest, the previous workers of
   * partitions now owned by this worker are collected as dependent
   * workers, and the ids of partitions previously owned by this worker
   * are grouped by their new worker.
   *
   * @param partitionOwnerList Local {@link PartitionOwner} list for the
   *                           given worker
   * @param myWorkerInfo Worker info
   * @param masterSetPartitionOwners Master set partition owners, received
   *        prior to beginning the superstep
   * @return Information for the partition exchange.
   * @throws IllegalStateException if an owner lists this worker as both
   *         its previous and its current worker
   */
  public static PartitionExchange updatePartitionOwners(
      List<PartitionOwner> partitionOwnerList,
      WorkerInfo myWorkerInfo,
      Collection<? extends PartitionOwner> masterSetPartitionOwners) {
    partitionOwnerList.clear();
    partitionOwnerList.addAll(masterSetPartitionOwners);

    Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
    Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
        new HashMap<WorkerInfo, List<Integer>>();
    for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
      WorkerInfo previousWorker = partitionOwner.getPreviousWorkerInfo();
      if (previousWorker == null) {
        // Partition did not move
        continue;
      }
      WorkerInfo currentWorker = partitionOwner.getWorkerInfo();
      boolean ownedByMe = currentWorker.equals(myWorkerInfo);
      boolean previouslyMine = previousWorker.equals(myWorkerInfo);
      if (ownedByMe && previouslyMine) {
        throw new IllegalStateException(
            "updatePartitionOwners: Impossible to have the same " +
            "previous and current worker info " + partitionOwner +
            " as me " + myWorkerInfo);
      } else if (ownedByMe) {
        dependentWorkerSet.add(previousWorker);
      } else if (previouslyMine) {
        List<Integer> partitionIds =
            workerPartitionOwnerMap.get(currentWorker);
        if (partitionIds == null) {
          partitionIds = new ArrayList<Integer>();
          workerPartitionOwnerMap.put(currentWorker, partitionIds);
        }
        partitionIds.add(partitionOwner.getPartitionId());
      }
    }
    return new PartitionExchange(dependentWorkerSet,
        workerPartitionOwnerMap);
  }
}