Merge branch '2.1'
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index e4d8247..039282c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -224,7 +224,6 @@
   private final AtomicReference<Fate<Manager>> fateRef = new AtomicReference<>(null);
 
   volatile SortedMap<TServerInstance,TabletServerStatus> tserverStatus = emptySortedMap();
-  volatile SortedMap<TabletServerId,TServerStatus> tserverStatusForBalancer = emptySortedMap();
   final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
 
   private final AtomicBoolean managerInitialized = new AtomicBoolean(false);
@@ -907,9 +906,7 @@
 
     private long updateStatus() {
       Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
-      TreeMap<TabletServerId,TServerStatus> temp = new TreeMap<>();
-      tserverStatus = gatherTableInformation(currentServers, temp);
-      tserverStatusForBalancer = Collections.unmodifiableSortedMap(temp);
+      tserverStatus = gatherTableInformation(currentServers);
       checkForHeldServer(tserverStatus);
 
       if (!badServers.isEmpty()) {
@@ -959,39 +956,106 @@
       }
     }
 
-    private long balanceTablets() {
-
-      Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
+    /**
+     * balanceTablets() balances tables by DataLevel. Returns the given set of migrations
+     * partitioned by DataLevel, with an entry for every level, even when it has no migrations.
+     */
+    private Map<DataLevel,Set<KeyExtent>> partitionMigrations(final Set<KeyExtent> migrations) {
+      final Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
           new HashMap<>(DataLevel.values().length);
-      migrationsSnapshot().forEach(ke -> {
-        partitionedMigrations.computeIfAbsent(DataLevel.of(ke.tableId()), f -> new HashSet<>())
-            .add(ke);
+      // pre-populate every level so the lookup below can never return null
+      for (DataLevel dl : DataLevel.values()) {
+        partitionedMigrations.put(dl, new HashSet<>());
+      }
+      migrations.forEach(ke -> {
+        partitionedMigrations.get(DataLevel.of(ke.tableId())).add(ke);
       });
+      return partitionedMigrations;
+    }
+
+    /**
+     * Given the current tserverStatus map and a DataLevel, return a copy of the tserverStatus map
+     * in which each server's table map contains only the tables that belong to that DataLevel
+     */
+    private SortedMap<TServerInstance,TabletServerStatus> createTServerStatusView(
+        final DataLevel dl, final SortedMap<TServerInstance,TabletServerStatus> status) {
+      final SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel = new TreeMap<>();
+      status.forEach((tsi, tss) -> {
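+        // deep copy each status so trimming its table map does not modify the shared tserverStatus entry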
+        final TabletServerStatus copy = tss.deepCopy();
+        final Map<String,TableInfo> oldTableMap = copy.getTableMap();
+        final Map<String,TableInfo> newTableMap =
+            new HashMap<>(dl == DataLevel.USER ? oldTableMap.size() : 1);
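+        // non-USER levels keep at most one table, so size the new map accordingly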
+        if (dl == DataLevel.ROOT) {
+          if (oldTableMap.containsKey(AccumuloTable.ROOT.tableName())) {
+            newTableMap.put(AccumuloTable.ROOT.tableName(),
+                oldTableMap.get(AccumuloTable.ROOT.tableName()));
+          }
+        } else if (dl == DataLevel.METADATA) {
+          if (oldTableMap.containsKey(AccumuloTable.METADATA.tableName())) {
+            newTableMap.put(AccumuloTable.METADATA.tableName(),
+                oldTableMap.get(AccumuloTable.METADATA.tableName()));
+          }
+        } else if (dl == DataLevel.USER) {
+          if (!oldTableMap.containsKey(AccumuloTable.METADATA.tableName())
+              && !oldTableMap.containsKey(AccumuloTable.ROOT.tableName())) {
+            newTableMap.putAll(oldTableMap);
+          } else {
+            oldTableMap.forEach((table, info) -> {
+              if (!table.equals(AccumuloTable.ROOT.tableName())
+                  && !table.equals(AccumuloTable.METADATA.tableName())) {
+                newTableMap.put(table, info);
+              }
+            });
+          }
+        } else {
+          throw new IllegalArgumentException("Unhandled DataLevel value: " + dl);
+        }
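+        // swap the level-filtered table map into the copy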
+        copy.setTableMap(newTableMap);
+        tserverStatusForLevel.put(tsi, copy);
+      });
+      return tserverStatusForLevel;
+    }
+
+    private long balanceTablets() {
 
       final int tabletsNotHosted = notHosted();
       BalanceParamsImpl params = null;
       long wait = 0;
       long totalMigrationsOut = 0;
+      final Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
+          partitionMigrations(migrationsSnapshot());
+
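+      // balance each DataLevel independently, using per-level views of status and migrations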
       for (DataLevel dl : DataLevel.values()) {
-        final Set<KeyExtent> migrationsForLevel = partitionedMigrations.get(dl);
-        if (migrationsForLevel == null) {
-          continue;
-        }
         if (dl == DataLevel.USER && tabletsNotHosted > 0) {
           log.debug("not balancing user tablets because there are {} unhosted tablets",
               tabletsNotHosted);
           continue;
         }
+        // Create a copy of the tserver status in which each server's tableMap contains
+        // only the tables for this level.
+        final SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel =
+            createTServerStatusView(dl, tserverStatus);
+        // Construct the balancer SPI variant of the Thrift map above for the BalanceParams
+        final SortedMap<TabletServerId,TServerStatus> tserverStatusForBalancerLevel =
+            new TreeMap<>();
+        tserverStatusForLevel.forEach((tsi, status) -> tserverStatusForBalancerLevel
+            .put(new TabletServerIdImpl(tsi), TServerStatusImpl.fromThrift(status)));
+
         long migrationsOutForLevel = 0;
-        int i = 0;
+        int attemptNum = 0;
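+        // attemptNum counts balancer passes at this level for the debug log below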
         do {
-          i++;
-          log.debug("Balancing for tables at level {}, times-in-loop: {}", dl, i);
-          params = BalanceParamsImpl.fromThrift(tserverStatusForBalancer, tserverStatus,
-              migrationsForLevel);
+          log.debug("Balancing for tables at level {}, times-in-loop: {}", dl, ++attemptNum);
+          params = BalanceParamsImpl.fromThrift(tserverStatusForBalancerLevel,
+              tserverStatusForLevel, partitionedMigrations.get(dl));
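+          // run the balancer, keeping the longest wait requested across passes and levels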
           wait = Math.max(tabletBalancer.balance(params), wait);
           migrationsOutForLevel = params.migrationsOut().size();
-          for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancer.keySet(),
+          for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancerLevel.keySet(),
               params.migrationsOut())) {
             final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
             if (migrations.containsKey(ke)) {
@@ -1039,8 +1097,8 @@
 
   }
 
-  private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation(
-      Set<TServerInstance> currentServers, SortedMap<TabletServerId,TServerStatus> balancerMap) {
+  private SortedMap<TServerInstance,TabletServerStatus>
+      gatherTableInformation(Set<TServerInstance> currentServers) {
     final long rpcTimeout = getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
     int threads = getConfiguration().getCount(Property.MANAGER_STATUS_THREAD_POOL_SIZE);
     long start = System.currentTimeMillis();
@@ -1129,8 +1187,6 @@
 
     // Threads may still modify map after shutdownNow is called, so create an immutable snapshot.
     SortedMap<TServerInstance,TabletServerStatus> info = ImmutableSortedMap.copyOf(result);
-    tserverStatus.forEach((tsi, status) -> balancerMap.put(new TabletServerIdImpl(tsi),
-        TServerStatusImpl.fromThrift(status)));
 
     synchronized (badServers) {
       badServers.keySet().retainAll(currentServers);