| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.accumulo.core.spi.balancer; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.accumulo.core.client.AccumuloException; |
| import org.apache.accumulo.core.client.AccumuloSecurityException; |
| import org.apache.accumulo.core.data.TableId; |
| import org.apache.accumulo.core.data.TabletId; |
| import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; |
| import org.apache.accumulo.core.spi.balancer.data.TServerStatus; |
| import org.apache.accumulo.core.spi.balancer.data.TableStatistics; |
| import org.apache.accumulo.core.spi.balancer.data.TabletMigration; |
| import org.apache.accumulo.core.spi.balancer.data.TabletServerId; |
| import org.apache.accumulo.core.spi.balancer.data.TabletStatistics; |
| import org.apache.accumulo.core.spi.balancer.util.ThrottledBalancerProblemReporter; |
| import org.apache.accumulo.core.spi.balancer.util.ThrottledBalancerProblemReporter.OutstandingMigrationsProblem; |
| import org.apache.accumulo.core.spi.balancer.util.ThrottledBalancerProblemReporter.Problem; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A simple tablet balancer that attempts to spread tablets out evenly across all available tablet |
| * servers. The goal is to achieve the same number of tablets on each tablet server. |
| * |
| * <p> |
| * Initial assignments attempt to maintain locality by assigning tablets to their previous location |
| * if possible, but otherwise assignments are made in a random fashion across all available tablet |
| * servers. |
| * |
| * @since 2.1.0 |
| */ |
| public class SimpleLoadBalancer implements TabletBalancer { |
| |
| private static final Logger log = LoggerFactory.getLogger(SimpleLoadBalancer.class); |
| |
| protected BalancerEnvironment environment; |
| |
| Iterator<TabletServerId> assignments; |
| // if tableToBalance is set, then only balance the given table |
| TableId tableToBalance = null; |
| |
| public SimpleLoadBalancer() {} |
| |
| public SimpleLoadBalancer(TableId table) { |
| tableToBalance = table; |
| } |
| |
| @Override |
| public void init(BalancerEnvironment balancerEnvironment) { |
| this.environment = balancerEnvironment; |
| } |
| |
| List<TabletServerId> randomize(Set<TabletServerId> locations) { |
| List<TabletServerId> result = new ArrayList<>(locations); |
| Collections.shuffle(result); |
| return result; |
| } |
| |
| public TabletServerId getAssignment(SortedMap<TabletServerId,TServerStatus> locations, |
| TabletServerId last) { |
| if (locations.isEmpty()) |
| return null; |
| |
| if (last != null) { |
| // Maintain locality |
| String fakeSessionID = " "; |
| TabletServerId simple = new TabletServerIdImpl(last.getHost(), last.getPort(), fakeSessionID); |
| Iterator<TabletServerId> find = locations.tailMap(simple).keySet().iterator(); |
| if (find.hasNext()) { |
| TabletServerId current = find.next(); |
| if (current.getHost().equals(last.getHost())) |
| return current; |
| } |
| } |
| |
| // The strategy here is to walk through the locations and hand them back, one at a time |
| // Grab an iterator off of the set of options; use a new iterator if it hands back something not |
| // in the current list. |
| if (assignments == null || !assignments.hasNext()) |
| assignments = randomize(locations.keySet()).iterator(); |
| TabletServerId result = assignments.next(); |
| if (!locations.containsKey(result)) { |
| assignments = null; |
| return randomize(locations.keySet()).iterator().next(); |
| } |
| return result; |
| } |
| |
| static class ServerCounts implements Comparable<ServerCounts> { |
| public final TabletServerId server; |
| public int count; |
| public final TServerStatus status; |
| |
| ServerCounts(int count, TabletServerId server, TServerStatus status) { |
| this.count = count; |
| this.server = server; |
| this.status = status; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(server) + count; |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| return obj == this || (obj instanceof ServerCounts && compareTo((ServerCounts) obj) == 0); |
| } |
| |
| @Override |
| public int compareTo(ServerCounts obj) { |
| int result = count - obj.count; |
| if (result == 0) |
| return server.compareTo(obj.server); |
| return result; |
| } |
| } |
| |
| public boolean getMigrations(Map<TabletServerId,TServerStatus> current, |
| List<TabletMigration> result) { |
| boolean moreBalancingNeeded = false; |
| try { |
| // no moves possible |
| if (current.size() < 2) { |
| return false; |
| } |
| final Map<TableId,Map<TabletId,TabletStatistics>> donerTabletStats = new HashMap<>(); |
| |
| // Sort by total number of online tablets, per server |
| int total = 0; |
| ArrayList<ServerCounts> totals = new ArrayList<>(); |
| for (Entry<TabletServerId,TServerStatus> entry : current.entrySet()) { |
| int serverTotal = 0; |
| if (entry.getValue() != null && entry.getValue().getTableMap() != null) { |
| for (Entry<String,TableStatistics> e : entry.getValue().getTableMap().entrySet()) { |
| /* |
| * The check below was on entry.getKey(), but that resolves to a tabletserver not a |
| * tablename. Believe it should be e.getKey() which is a tablename |
| */ |
| if (tableToBalance == null || tableToBalance.canonical().equals(e.getKey())) |
| serverTotal += e.getValue().getOnlineTabletCount(); |
| } |
| } |
| totals.add(new ServerCounts(serverTotal, entry.getKey(), entry.getValue())); |
| total += serverTotal; |
| } |
| |
| // order from low to high |
| totals.sort(Collections.reverseOrder()); |
| int even = total / totals.size(); |
| int numServersOverEven = total % totals.size(); |
| |
| // Move tablets from the servers with too many to the servers with |
| // the fewest but only nominate tablets to move once. This allows us |
| // to fill new servers with tablets from a mostly balanced server |
| // very quickly. However, it may take several balancing passes to move |
| // tablets from one hugely overloaded server to many slightly |
| // under-loaded servers. |
| int end = totals.size() - 1; |
| int movedAlready = 0; |
| int tooManyIndex = 0; |
| while (tooManyIndex < end) { |
| ServerCounts tooMany = totals.get(tooManyIndex); |
| int goal = even; |
| if (tooManyIndex < numServersOverEven) { |
| goal++; |
| } |
| int needToUnload = tooMany.count - goal; |
| ServerCounts tooLittle = totals.get(end); |
| int needToLoad = goal - tooLittle.count - movedAlready; |
| if (needToUnload < 1 && needToLoad < 1) { |
| break; |
| } |
| if (needToUnload >= needToLoad) { |
| result.addAll(move(tooMany, tooLittle, needToLoad, donerTabletStats)); |
| end--; |
| movedAlready = 0; |
| } else { |
| result.addAll(move(tooMany, tooLittle, needToUnload, donerTabletStats)); |
| movedAlready += needToUnload; |
| } |
| if (needToUnload > needToLoad) { |
| moreBalancingNeeded = true; |
| } else { |
| tooManyIndex++; |
| donerTabletStats.clear(); |
| } |
| } |
| |
| } finally { |
| log.trace("balance ended with {} migrations", result.size()); |
| } |
| return moreBalancingNeeded; |
| } |
| |
| /** |
| * Select a tablet based on differences between table loads; if the loads are even, use the |
| * busiest table |
| */ |
| List<TabletMigration> move(ServerCounts tooMuch, ServerCounts tooLittle, int count, |
| Map<TableId,Map<TabletId,TabletStatistics>> donerTabletStats) { |
| |
| if (count == 0) { |
| return Collections.emptyList(); |
| } |
| |
| List<TabletMigration> result = new ArrayList<>(); |
| // Copy counts so we can update them as we propose migrations |
| Map<TableId,Integer> tooMuchMap = tabletCountsPerTable(tooMuch.status); |
| Map<TableId,Integer> tooLittleMap = tabletCountsPerTable(tooLittle.status); |
| |
| for (int i = 0; i < count; i++) { |
| TableId table; |
| Integer tooLittleCount; |
| if (tableToBalance == null) { |
| // find a table to migrate |
| // look for an uneven table count |
| int biggestDifference = 0; |
| TableId biggestDifferenceTable = null; |
| for (var tableEntry : tooMuchMap.entrySet()) { |
| TableId tableID = tableEntry.getKey(); |
| tooLittleMap.putIfAbsent(tableID, 0); |
| int diff = tableEntry.getValue() - tooLittleMap.get(tableID); |
| if (diff > biggestDifference) { |
| biggestDifference = diff; |
| biggestDifferenceTable = tableID; |
| } |
| } |
| if (biggestDifference < 2) { |
| table = busiest(tooMuch.status.getTableMap()); |
| } else { |
| table = biggestDifferenceTable; |
| } |
| } else { |
| // just balance the given table |
| table = tableToBalance; |
| } |
| Map<TabletId,TabletStatistics> onlineTabletsForTable = donerTabletStats.get(table); |
| try { |
| if (onlineTabletsForTable == null) { |
| onlineTabletsForTable = new HashMap<>(); |
| List<TabletStatistics> stats = getOnlineTabletsForTable(tooMuch.server, table); |
| if (stats == null) { |
| log.warn("Unable to find tablets to move"); |
| return result; |
| } |
| for (TabletStatistics stat : stats) |
| onlineTabletsForTable.put(stat.getTabletId(), stat); |
| donerTabletStats.put(table, onlineTabletsForTable); |
| } |
| } catch (Exception ex) { |
| log.error("Unable to select a tablet to move", ex); |
| return result; |
| } |
| TabletId tabletId = selectTablet(onlineTabletsForTable); |
| onlineTabletsForTable.remove(tabletId); |
| if (tabletId == null) |
| return result; |
| tooMuchMap.put(table, tooMuchMap.get(table) - 1); |
| /* |
| * If a table grows from 1 tablet then tooLittleMap.get(table) can return a null, since there |
| * is only one tabletserver that holds all of the tablets. Here we check to see if in fact |
| * that is the case and if so set the value to 0. |
| */ |
| tooLittleCount = tooLittleMap.get(table); |
| if (tooLittleCount == null) { |
| tooLittleCount = 0; |
| } |
| tooLittleMap.put(table, tooLittleCount + 1); |
| tooMuch.count--; |
| tooLittle.count++; |
| result.add(new TabletMigration(tabletId, tooMuch.server, tooLittle.server)); |
| } |
| return result; |
| } |
| |
| protected List<TabletStatistics> getOnlineTabletsForTable(TabletServerId tabletServerId, |
| TableId tableId) throws AccumuloSecurityException, AccumuloException { |
| return environment.listOnlineTabletsForTable(tabletServerId, tableId); |
| } |
| |
| static Map<TableId,Integer> tabletCountsPerTable(TServerStatus status) { |
| Map<TableId,Integer> result = new HashMap<>(); |
| if (status != null && status.getTableMap() != null) { |
| Map<String,TableStatistics> tableMap = status.getTableMap(); |
| for (Entry<String,TableStatistics> entry : tableMap.entrySet()) { |
| result.put(TableId.of(entry.getKey()), entry.getValue().getOnlineTabletCount()); |
| } |
| } |
| return result; |
| } |
| |
| static TabletId selectTablet(Map<TabletId,TabletStatistics> extents) { |
| if (extents.isEmpty()) |
| return null; |
| TabletId mostRecentlySplit = null; |
| long splitTime = 0; |
| for (Entry<TabletId,TabletStatistics> entry : extents.entrySet()) |
| if (entry.getValue().getSplitCreationTime() >= splitTime) { |
| splitTime = entry.getValue().getSplitCreationTime(); |
| mostRecentlySplit = entry.getKey(); |
| } |
| return mostRecentlySplit; |
| } |
| |
| // define what it means for a tablet to be busy |
| private static TableId busiest(Map<String,TableStatistics> tables) { |
| TableId result = null; |
| double busiest = Double.NEGATIVE_INFINITY; |
| for (Entry<String,TableStatistics> entry : tables.entrySet()) { |
| TableStatistics info = entry.getValue(); |
| double busy = info.getIngestRate() + info.getQueryRate(); |
| if (busy > busiest) { |
| busiest = busy; |
| result = TableId.of(entry.getKey()); |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public void getAssignments(AssignmentParameters params) { |
| params.unassignedTablets().forEach((tabletId, tserverId) -> params.addAssignment(tabletId, |
| getAssignment(params.currentStatus(), tserverId))); |
| } |
| |
| private final ThrottledBalancerProblemReporter problemReporter = |
| new ThrottledBalancerProblemReporter(getClass()); |
| private final Problem noTserversProblem = problemReporter.createNoTabletServersProblem(); |
| private final OutstandingMigrationsProblem outstandingMigrationsProblem = |
| problemReporter.createOutstandingMigrationsProblem(); |
| |
| @Override |
| public long balance(BalanceParameters params) { |
| // do we have any servers? |
| if (params.currentStatus().isEmpty()) { |
| problemReporter.reportProblem(noTserversProblem); |
| } else { |
| // Don't migrate if we have migrations in progress |
| if (params.currentMigrations().isEmpty()) { |
| problemReporter.clearProblemReportTimes(); |
| if (getMigrations(params.currentStatus(), params.migrationsOut())) |
| return TimeUnit.SECONDS.toMillis(1); |
| } else { |
| outstandingMigrationsProblem.setMigrations(params.currentMigrations()); |
| problemReporter.reportProblem(outstandingMigrationsProblem); |
| } |
| } |
| return 5 * 1000; |
| } |
| |
| } |