blob: 3f681ff24bf7ded8c985d199b860afa6c13d7200 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.master.balancer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.server.master.state.TabletMigration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @deprecated since 2.1.0. Use {@link org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer}
* instead, as it has the same functionality but a stable API.
*/
@Deprecated(since = "2.1.0")
public class DefaultLoadBalancer extends TabletBalancer {
// Logger used by this balancer for deprecation warnings, errors and trace output.
private static final Logger log = LoggerFactory.getLogger(DefaultLoadBalancer.class);
// Iterator over a shuffled copy of the tablet servers. getAssignment() hands servers out from
// it one at a time and replaces it once it is exhausted or returns a server that is no longer
// in the current set.
Iterator<TServerInstance> assignments;
// if tableToBalance is set, then only balance the given table
TableId tableToBalance = null;
public DefaultLoadBalancer() {
log.warn(
"{} has been deprecated and will be removed in a future release. Please update your "
+ "configuration to use the equivalent {} instead.",
getClass().getName(), SimpleLoadBalancer.class.getName());
}
/**
 * Creates a balancer that only balances the given table.
 *
 * @param table
 *          the table to balance; stored in {@code tableToBalance}
 */
public DefaultLoadBalancer(TableId table) {
  // Delegate to the no-arg constructor so the deprecation warning is still logged.
  this();
  this.tableToBalance = table;
}
/**
 * Returns a new list holding the given servers in random order. The input set is not modified.
 *
 * @param locations
 *          the servers to shuffle
 * @return a freshly allocated, shuffled list of the same servers
 */
List<TServerInstance> randomize(Set<TServerInstance> locations) {
  final List<TServerInstance> shuffled = new ArrayList<>();
  shuffled.addAll(locations);
  Collections.shuffle(shuffled);
  return shuffled;
}
/**
 * Chooses a tablet server for one tablet.
 *
 * <p>
 * If {@code last} is given and a server on the same host is found at the probe position in
 * {@code locations}, that server is returned. Otherwise servers are handed out one at a time
 * from a shuffled list that is rebuilt whenever it runs out.
 *
 * @param locations
 *          the candidate tablet servers, sorted
 * @param last
 *          the server that previously hosted the tablet, or {@code null} if unknown
 * @return the chosen server, or {@code null} when {@code locations} is empty
 */
public TServerInstance getAssignment(SortedMap<TServerInstance,TabletServerStatus> locations,
    TServerInstance last) {
  if (locations.isEmpty()) {
    return null;
  }

  if (last != null) {
    // Maintain locality: probe the sorted map with the last host/port and a placeholder
    // session id, then accept the first server at or after the probe if it is on the same host.
    // NOTE(review): presumably " " sorts before any real session id - confirm against
    // TServerInstance.compareTo.
    String placeholderSession = " ";
    TServerInstance probe = new TServerInstance(last.getHostAndPort(), placeholderSession);
    SortedMap<TServerInstance,TabletServerStatus> atOrAfterProbe = locations.tailMap(probe);
    if (!atOrAfterProbe.isEmpty()) {
      TServerInstance candidate = atOrAfterProbe.firstKey();
      if (candidate.getHost().equals(last.getHost())) {
        return candidate;
      }
    }
  }

  // Walk through the servers and hand them back one at a time. Start a fresh shuffled iterator
  // when there is none or the current one is used up.
  boolean needFreshIterator = assignments == null || !assignments.hasNext();
  if (needFreshIterator) {
    assignments = randomize(locations.keySet()).iterator();
  }

  TServerInstance next = assignments.next();
  if (locations.containsKey(next)) {
    return next;
  }

  // The iterator handed back a server that is not in the current list: discard it and pick a
  // random current server instead.
  assignments = null;
  return randomize(locations.keySet()).iterator().next();
}
/**
 * A tablet server paired with the number of online tablets counted for it and the status the
 * count was taken from. Instances order by {@code count} first and by {@code server} second.
 * {@code count} is mutable so it can be adjusted as migrations are proposed.
 */
static class ServerCounts implements Comparable<ServerCounts> {
  public final TServerInstance server;
  public int count;
  public final TabletServerStatus status;

  ServerCounts(int count, TServerInstance server, TabletServerStatus status) {
    this.count = count;
    this.server = server;
    this.status = status;
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(server) + count;
  }

  /**
   * Two instances are equal when {@link #compareTo(ServerCounts)} reports no difference, i.e.
   * same count and servers that compare as equal.
   */
  @Override
  public boolean equals(Object obj) {
    if (obj == this) {
      return true;
    }
    // instanceof is already false for null, so no separate null check is needed
    return obj instanceof ServerCounts && compareTo((ServerCounts) obj) == 0;
  }

  /**
   * Orders by ascending {@code count}, breaking ties with the server's own ordering.
   */
  @Override
  public int compareTo(ServerCounts obj) {
    // Integer.compare cannot overflow the way "count - obj.count" can
    int byCount = Integer.compare(count, obj.count);
    if (byCount != 0) {
      return byCount;
    }
    return server.compareTo(obj.server);
  }
}
/**
 * Proposes tablet migrations that move tablets from the servers with the most online tablets
 * to the servers with the fewest. When {@code tableToBalance} is set, only that table's tablets
 * are counted.
 *
 * @param current
 *          the tablet servers and their last known status; nothing is proposed when fewer than
 *          two servers are present
 * @param result
 *          list to which the proposed migrations are appended
 * @return {@code true} if some donor server still had more to unload than the recipient it was
 *         paired with needed, i.e. another balancing pass is wanted
 */
public boolean getMigrations(Map<TServerInstance,TabletServerStatus> current,
List<TabletMigration> result) {
boolean moreBalancingNeeded = false;
try {
// no moves possible
if (current.size() < 2) {
return false;
}
// Per-table cache of the current donor server's online tablets; filled by move() and cleared
// below whenever the loop advances to the next donor.
final Map<TableId,Map<KeyExtent,TabletStats>> donerTabletStats = new HashMap<>();
// Sort by total number of online tablets, per server
int total = 0;
ArrayList<ServerCounts> totals = new ArrayList<>();
for (Entry<TServerInstance,TabletServerStatus> entry : current.entrySet()) {
int serverTotal = 0;
// A server with no status or no table map contributes a count of zero.
if (entry.getValue() != null && entry.getValue().tableMap != null) {
for (Entry<String,TableInfo> e : entry.getValue().tableMap.entrySet()) {
/*
 * Compare against e.getKey(), the table id string from tableMap. An earlier version
 * compared against entry.getKey(), which is the tablet server, not a table.
 */
if (tableToBalance == null || tableToBalance.canonical().equals(e.getKey()))
serverTotal += e.getValue().onlineTablets;
}
}
totals.add(new ServerCounts(serverTotal, entry.getKey(), entry.getValue()));
total += serverTotal;
}
// order from high to low: reverseOrder() puts the most loaded server at index 0 and the
// least loaded server at the end
totals.sort(Collections.reverseOrder());
// Every server should end up with "even" tablets; the remainder is spread one each over the
// first numServersOverEven (most loaded) servers.
int even = total / totals.size();
int numServersOverEven = total % totals.size();
// Move tablets from the servers with too many to the servers with
// the fewest but only nominate tablets to move once. This allows us
// to fill new servers with tablets from a mostly balanced server
// very quickly. However, it may take several balancing passes to move
// tablets from one hugely overloaded server to many slightly
// under-loaded servers.
// "end" indexes the current recipient (walking down from the least loaded server) and
// "tooManyIndex" indexes the current donor (walking up from the most loaded server).
int end = totals.size() - 1;
// Tablets already requested for the current recipient from earlier donors; reset when the
// recipient changes.
// NOTE(review): move() also increments tooLittle.count for each proposed migration, so
// subtracting movedAlready as well may count those tablets twice - confirm this is intended.
int movedAlready = 0;
int tooManyIndex = 0;
while (tooManyIndex < end) {
ServerCounts tooMany = totals.get(tooManyIndex);
int goal = even;
if (tooManyIndex < numServersOverEven) {
goal++;
}
int needToUnload = tooMany.count - goal;
ServerCounts tooLittle = totals.get(end);
int needToLoad = goal - tooLittle.count - movedAlready;
// Neither the donor nor the recipient is off its goal: balancing is done.
if (needToUnload < 1 && needToLoad < 1) {
break;
}
if (needToUnload >= needToLoad) {
// The donor can cover everything the recipient needs: fill it and move to the next recipient.
result.addAll(move(tooMany, tooLittle, needToLoad, donerTabletStats));
end--;
movedAlready = 0;
} else {
// The donor cannot fill the recipient: unload what it can and keep the same recipient.
result.addAll(move(tooMany, tooLittle, needToUnload, donerTabletStats));
movedAlready += needToUnload;
}
if (needToUnload > needToLoad) {
// The donor still has excess tablets; it stays the donor and another pass is requested.
moreBalancingNeeded = true;
} else {
// The donor is finished: advance to the next one and drop its cached tablet stats.
tooManyIndex++;
donerTabletStats.clear();
}
}
} finally {
// Runs on every exit path, including the early return for fewer than two servers.
log.trace("balance ended with {} migrations", result.size());
}
return moreBalancingNeeded;
}
/**
 * Proposes up to {@code count} migrations of tablets from {@code tooMuch} to {@code tooLittle}.
 *
 * <p>
 * For each migration a table is chosen first. When {@code tableToBalance} is set, that table is
 * used. Otherwise the table with the largest difference in tablet count between the two servers
 * is used; if that difference is less than two, the busiest table on the donor is used. A tablet
 * of that table is then picked with {@link #selectTablet(Map)}.
 *
 * <p>
 * Side effects: each proposed migration decrements {@code tooMuch.count}, increments
 * {@code tooLittle.count} and removes the chosen tablet from {@code donerTabletStats}, so the
 * same tablet is not nominated twice.
 *
 * @param tooMuch
 *          the donor server
 * @param tooLittle
 *          the recipient server
 * @param count
 *          the number of tablets to move; zero or a negative value proposes nothing
 * @param donerTabletStats
 *          per-table cache of the donor's online tablets; missing tables are fetched with
 *          {@code getOnlineTabletsForTable} and added to it
 * @return the proposed migrations; fewer than {@code count} when tablet stats cannot be fetched
 *         or no tablet is left to select
 */
List<TabletMigration> move(ServerCounts tooMuch, ServerCounts tooLittle, int count,
    Map<TableId,Map<KeyExtent,TabletStats>> donerTabletStats) {
  if (count <= 0) {
    return Collections.emptyList();
  }
  List<TabletMigration> result = new ArrayList<>();
  // Copy counts so we can update them as we propose migrations
  Map<TableId,Integer> tooMuchMap = tabletCountsPerTable(tooMuch.status);
  Map<TableId,Integer> tooLittleMap = tabletCountsPerTable(tooLittle.status);
  for (int i = 0; i < count; i++) {
    TableId table;
    if (tableToBalance == null) {
      // find a table to migrate: look for an uneven table count
      int biggestDifference = 0;
      TableId biggestDifferenceTable = null;
      for (var tableEntry : tooMuchMap.entrySet()) {
        TableId tableID = tableEntry.getKey();
        // a table the recipient does not host at all counts as zero tablets
        int diff = tableEntry.getValue() - tooLittleMap.getOrDefault(tableID, 0);
        if (diff > biggestDifference) {
          biggestDifference = diff;
          biggestDifferenceTable = tableID;
        }
      }
      if (biggestDifference < 2) {
        table = busiest(tooMuch.status.tableMap);
      } else {
        table = biggestDifferenceTable;
      }
    } else {
      // just balance the given table
      table = tableToBalance;
    }
    Map<KeyExtent,TabletStats> onlineTabletsForTable = donerTabletStats.get(table);
    try {
      if (onlineTabletsForTable == null) {
        onlineTabletsForTable = new HashMap<>();
        List<TabletStats> stats = getOnlineTabletsForTable(tooMuch.server, table);
        if (stats == null) {
          log.warn("Unable to find tablets to move");
          return result;
        }
        for (TabletStats stat : stats) {
          onlineTabletsForTable.put(KeyExtent.fromThrift(stat.extent), stat);
        }
        donerTabletStats.put(table, onlineTabletsForTable);
      }
    } catch (Exception ex) {
      log.error("Unable to select a tablet to move", ex);
      return result;
    }
    KeyExtent extent = selectTablet(onlineTabletsForTable);
    // Check for "nothing left to select" before touching the cache, so a null key is never
    // passed to remove().
    if (extent == null) {
      return result;
    }
    onlineTabletsForTable.remove(extent);
    // merge() treats a missing table as zero. This covers a recipient that does not host the
    // table yet (e.g. a table that just grew from one tablet) and a donor snapshot that does
    // not list the table, which previously failed with a NullPointerException on unboxing.
    tooMuchMap.merge(table, -1, Integer::sum);
    tooLittleMap.merge(table, 1, Integer::sum);
    tooMuch.count--;
    tooLittle.count++;
    result.add(new TabletMigration(extent, tooMuch.server, tooLittle.server));
  }
  return result;
}
/**
 * Builds a new, modifiable map from table id to the number of online tablets reported for that
 * table in the given status.
 *
 * @param status
 *          a tablet server status; may be {@code null} or have a {@code null} table map
 * @return the per-table online tablet counts; empty when no table map is available
 */
static Map<TableId,Integer> tabletCountsPerTable(TabletServerStatus status) {
  final Map<TableId,Integer> counts = new HashMap<>();
  if (status == null || status.tableMap == null) {
    return counts;
  }
  status.tableMap
      .forEach((tableName, info) -> counts.put(TableId.of(tableName), info.onlineTablets));
  return counts;
}
/**
 * Picks the tablet with the greatest {@code splitCreationTime} among the given tablets. On a
 * tie, the entry visited later in the map's iteration order wins. Tablets with a negative
 * {@code splitCreationTime} are never chosen.
 *
 * @param extents
 *          the candidate tablets and their stats
 * @return the chosen tablet, or {@code null} when the map is empty or nothing qualifies
 */
static KeyExtent selectTablet(Map<KeyExtent,TabletStats> extents) {
  KeyExtent chosen = null;
  long newestSplit = 0;
  // An empty map simply skips the loop and yields null.
  for (Entry<KeyExtent,TabletStats> candidate : extents.entrySet()) {
    if (candidate.getValue().splitCreationTime < newestSplit) {
      continue;
    }
    newestSplit = candidate.getValue().splitCreationTime;
    chosen = candidate.getKey();
  }
  return chosen;
}
/**
 * Finds the busiest table, where a table's busyness is defined as its ingest rate plus its
 * query rate. On a tie the table visited first in the map's iteration order is kept.
 *
 * @param tables
 *          table id string to table info
 * @return the id of the busiest table, or {@code null} when no table has a rate greater than
 *         negative infinity (for example when the map is empty)
 */
private static TableId busiest(Map<String,TableInfo> tables) {
  TableId winner = null;
  double highestRate = Double.NEGATIVE_INFINITY;
  for (Entry<String,TableInfo> candidate : tables.entrySet()) {
    TableInfo stats = candidate.getValue();
    double combinedRate = stats.ingestRate + stats.queryRate;
    if (!(combinedRate > highestRate)) {
      continue;
    }
    highestRate = combinedRate;
    winner = TableId.of(candidate.getKey());
  }
  return winner;
}
/**
 * Assigns every unassigned tablet to a server chosen by
 * {@link #getAssignment(SortedMap, TServerInstance)}.
 *
 * @param current
 *          the candidate tablet servers
 * @param unassigned
 *          tablets to assign, each mapped to the server passed as {@code last} to
 *          {@code getAssignment}
 * @param assignments
 *          output map that receives one entry per unassigned tablet
 */
@Override
public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current,
    Map<KeyExtent,TServerInstance> unassigned, Map<KeyExtent,TServerInstance> assignments) {
  unassigned.forEach(
      (extent, lastServer) -> assignments.put(extent, getAssignment(current, lastServer)));
}
// Passed to constraintNotMet() by balance() when there are no tablet servers at all.
private static final NoTservers NO_SERVERS = new NoTservers(log);
// Passed to constraintNotMet() by balance() when migrations are still in progress; balance()
// stores the pending migrations in its "migrations" field first.
protected final OutstandingMigrations outstandingMigrations = new OutstandingMigrations(log);
/**
 * Runs one balancing pass. Nothing is proposed when there are no tablet servers or when
 * migrations are still in progress; both cases are reported through {@code constraintNotMet}.
 *
 * @param current
 *          the tablet servers and their status
 * @param migrations
 *          migrations that are currently in progress
 * @param migrationsOut
 *          list that receives the newly proposed migrations
 * @return 1000 when {@code getMigrations} reports that more balancing is needed, otherwise 5000
 *         (presumably the delay in milliseconds before the next pass - confirm against the
 *         caller)
 */
@Override
public long balance(SortedMap<TServerInstance,TabletServerStatus> current,
    Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
  final long shortDelay = 1 * 1000;
  final long normalDelay = 5 * 1000;

  // do we have any servers?
  if (current.isEmpty()) {
    constraintNotMet(NO_SERVERS);
    return normalDelay;
  }

  // Don't migrate if we have migrations in progress
  if (!migrations.isEmpty()) {
    outstandingMigrations.migrations = migrations;
    constraintNotMet(outstandingMigrations);
    return normalDelay;
  }

  resetBalancerErrors();
  boolean moreBalancingNeeded = getMigrations(current, migrationsOut);
  return moreBalancingNeeded ? shortDelay : normalDelay;
}
}