Merge branch '2.1'
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index e4d8247..039282c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -224,7 +224,6 @@
private final AtomicReference<Fate<Manager>> fateRef = new AtomicReference<>(null);
volatile SortedMap<TServerInstance,TabletServerStatus> tserverStatus = emptySortedMap();
- volatile SortedMap<TabletServerId,TServerStatus> tserverStatusForBalancer = emptySortedMap();
final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
private final AtomicBoolean managerInitialized = new AtomicBoolean(false);
@@ -907,9 +906,7 @@
private long updateStatus() {
Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
- TreeMap<TabletServerId,TServerStatus> temp = new TreeMap<>();
- tserverStatus = gatherTableInformation(currentServers, temp);
- tserverStatusForBalancer = Collections.unmodifiableSortedMap(temp);
+ tserverStatus = gatherTableInformation(currentServers);
checkForHeldServer(tserverStatus);
if (!badServers.isEmpty()) {
@@ -959,39 +956,100 @@
}
}
- private long balanceTablets() {
-
- Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
+ /**
+ * Groups the given migrations by the DataLevel of each extent's table, so that
+ * balanceTablets() can work through them one DataLevel at a time.
+ *
+ * @param migrations the extents to partition
+ * @return a map with an entry for every DataLevel value; a level that has no migrations maps
+ *         to an empty set
+ */
+ private Map<DataLevel,Set<KeyExtent>> partitionMigrations(final Set<KeyExtent> migrations) {
+ final Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
 new HashMap<>(DataLevel.values().length);
- migrationsSnapshot().forEach(ke -> {
- partitionedMigrations.computeIfAbsent(DataLevel.of(ke.tableId()), f -> new HashSet<>())
- .add(ke);
+ // Seed an empty set for every DataLevel so a get(...) on this map never returns null
+ for (DataLevel dl : DataLevel.values()) {
+ partitionedMigrations.put(dl, new HashSet<>());
+ }
+ migrations.forEach(ke -> {
+ partitionedMigrations.get(DataLevel.of(ke.tableId())).add(ke);
 });
+ return partitionedMigrations;
+ }
+
+ /**
+ * Produces a filtered copy of the given tserver status map for a single DataLevel. Every
+ * TabletServerStatus is deep-copied, and the copy's table map is replaced with one that holds
+ * only the entries belonging to that level; only the copies are modified here.
+ *
+ * @param dl ROOT keeps just the root table entry, METADATA keeps just the metadata table entry,
+ *        and USER keeps every entry that is neither of those
+ * @param status the tserver status map to filter
+ * @return a new sorted map holding one filtered status copy per entry of {@code status}
+ * @throws IllegalArgumentException if {@code status} is non-empty and {@code dl} is not ROOT,
+ *         METADATA or USER
+ */
+ private SortedMap<TServerInstance,TabletServerStatus> createTServerStatusView(
+     final DataLevel dl, final SortedMap<TServerInstance,TabletServerStatus> status) {
+   final SortedMap<TServerInstance,TabletServerStatus> view = new TreeMap<>();
+   for (Map.Entry<TServerInstance,TabletServerStatus> server : status.entrySet()) {
+     final TabletServerStatus filtered = server.getValue().deepCopy();
+     final Map<String,TableInfo> allTables = filtered.getTableMap();
+     final boolean userLevel = dl == DataLevel.USER;
+     // Non-user levels keep at most one table entry, so an initial capacity of 1 suffices.
+     final Map<String,TableInfo> kept = new HashMap<>(userLevel ? allTables.size() : 1);
+     if (userLevel) {
+       final boolean hasSystemTable =
+           allTables.containsKey(AccumuloTable.METADATA.tableName())
+               || allTables.containsKey(AccumuloTable.ROOT.tableName());
+       if (hasSystemTable) {
+         // Copy everything except the root and metadata table entries.
+         for (Map.Entry<String,TableInfo> table : allTables.entrySet()) {
+           final String name = table.getKey();
+           if (!(name.equals(AccumuloTable.ROOT.tableName())
+               || name.equals(AccumuloTable.METADATA.tableName()))) {
+             kept.put(name, table.getValue());
+           }
+         }
+       } else {
+         // Nothing to filter out, so take the whole table map as-is.
+         kept.putAll(allTables);
+       }
+     } else if (dl == DataLevel.ROOT || dl == DataLevel.METADATA) {
+       final String wanted = dl == DataLevel.ROOT ? AccumuloTable.ROOT.tableName()
+           : AccumuloTable.METADATA.tableName();
+       if (allTables.containsKey(wanted)) {
+         kept.put(wanted, allTables.get(wanted));
+       }
+     } else {
+       throw new IllegalArgumentException("Unhandled DataLevel value: " + dl);
+     }
+     filtered.setTableMap(kept);
+     view.put(server.getKey(), filtered);
+   }
+   return view;
+ }
+
+ private long balanceTablets() {
final int tabletsNotHosted = notHosted();
BalanceParamsImpl params = null;
long wait = 0;
long totalMigrationsOut = 0;
+ final Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
+ partitionMigrations(migrationsSnapshot());
+
for (DataLevel dl : DataLevel.values()) {
- final Set<KeyExtent> migrationsForLevel = partitionedMigrations.get(dl);
- if (migrationsForLevel == null) {
- continue;
- }
if (dl == DataLevel.USER && tabletsNotHosted > 0) {
log.debug("not balancing user tablets because there are {} unhosted tablets",
tabletsNotHosted);
continue;
}
+ // Create a view of the tserver status such that it only contains the tables
+ // for this level in the tableMap.
+ final SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel =
+ createTServerStatusView(dl, tserverStatus);
+ // Convert the Thrift statuses above into the balancer's own types for BalanceParamsImpl
+ final SortedMap<TabletServerId,TServerStatus> tserverStatusForBalancerLevel =
+ new TreeMap<>();
+ tserverStatusForLevel.forEach((tsi, status) -> tserverStatusForBalancerLevel
+ .put(new TabletServerIdImpl(tsi), TServerStatusImpl.fromThrift(status)));
+
long migrationsOutForLevel = 0;
- int i = 0;
+ int attemptNum = 0;
do {
- i++;
- log.debug("Balancing for tables at level {}, times-in-loop: {}", dl, i);
- params = BalanceParamsImpl.fromThrift(tserverStatusForBalancer, tserverStatus,
- migrationsForLevel);
+ log.debug("Balancing for tables at level {}, times-in-loop: {}", dl, ++attemptNum);
+ params = BalanceParamsImpl.fromThrift(tserverStatusForBalancerLevel,
+ tserverStatusForLevel, partitionedMigrations.get(dl));
wait = Math.max(tabletBalancer.balance(params), wait);
migrationsOutForLevel = params.migrationsOut().size();
- for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancer.keySet(),
+ for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancerLevel.keySet(),
params.migrationsOut())) {
final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
if (migrations.containsKey(ke)) {
@@ -1039,8 +1097,8 @@
}
- private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation(
- Set<TServerInstance> currentServers, SortedMap<TabletServerId,TServerStatus> balancerMap) {
+ private SortedMap<TServerInstance,TabletServerStatus>
+ gatherTableInformation(Set<TServerInstance> currentServers) {
final long rpcTimeout = getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
int threads = getConfiguration().getCount(Property.MANAGER_STATUS_THREAD_POOL_SIZE);
long start = System.currentTimeMillis();
@@ -1129,8 +1187,6 @@
// Threads may still modify map after shutdownNow is called, so create an immutable snapshot.
SortedMap<TServerInstance,TabletServerStatus> info = ImmutableSortedMap.copyOf(result);
- tserverStatus.forEach((tsi, status) -> balancerMap.put(new TabletServerIdImpl(tsi),
- TServerStatusImpl.fromThrift(status)));
synchronized (badServers) {
badServers.keySet().retainAll(currentServers);