Improvements to TabletStateChangeIteratorIT. Closes #1932

* Modified copyTable method to gather info about the metadata first
before creating the copy, preventing information about the copy itself
from being seen.
* Added debugging information
* Changed createTable method to pre split tables
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
index 0a66f3d..f7597fa 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
@@ -20,10 +20,12 @@
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
@@ -43,6 +45,7 @@
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.ClientInfo;
 import org.apache.accumulo.core.clientImpl.Tables;
@@ -68,6 +71,8 @@
 import org.apache.accumulo.server.manager.state.TabletStateChangeIterator;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 
@@ -76,6 +81,7 @@
  * in the metadata table when there is no work to be done on the tablet (see ACCUMULO-3580)
  */
 public class TabletStateChangeIteratorIT extends AccumuloClusterHarness {
+  private final static Logger log = LoggerFactory.getLogger(TabletStateChangeIteratorIT.class);
 
   @Override
   public int defaultTimeoutSeconds() {
@@ -105,9 +111,12 @@
       copyTable(client, MetadataTable.NAME, metaCopy1);
 
       State state = new State(client);
-      while (findTabletsNeedingAttention(client, metaCopy1, state) > 0) {
+      int tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, state);
+      while (tabletsInFlux > 0) {
+        log.debug("Waiting for {} tablets for {}", tabletsInFlux, metaCopy1);
         UtilWaitThread.sleep(500);
         copyTable(client, MetadataTable.NAME, metaCopy1);
+        tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, state);
       }
       assertEquals("No tables should need attention", 0,
           findTabletsNeedingAttention(client, metaCopy1, state));
@@ -196,31 +205,39 @@
   private int findTabletsNeedingAttention(AccumuloClient client, String table, State state)
       throws TableNotFoundException {
     int results = 0;
+    List<Key> resultList = new ArrayList<>();
     try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
       MetaDataTableScanner.configureScanner(scanner, state);
+      log.debug("Current state = {}", state);
       scanner.updateScanIteratorOption("tabletChange", "debug", "1");
       for (Entry<Key,Value> e : scanner) {
-        if (e != null)
+        if (e != null) {
           results++;
+          resultList.add(e.getKey());
+        }
       }
     }
-
+    log.debug("Tablets in flux: {}", resultList);
     return results;
   }
 
   private void createTable(AccumuloClient client, String t, boolean online)
       throws AccumuloSecurityException, AccumuloException, TableNotFoundException,
       TableExistsException {
-    client.tableOperations().create(t);
-    client.tableOperations().online(t, true);
     SortedSet<Text> partitionKeys = new TreeSet<>();
     partitionKeys.add(new Text("some split"));
-    client.tableOperations().addSplits(t, partitionKeys);
+    NewTableConfiguration ntc = new NewTableConfiguration().withSplits(partitionKeys);
+    client.tableOperations().create(t, ntc);
+    client.tableOperations().online(t, true);
     if (!online) {
       client.tableOperations().offline(t, true);
     }
   }
 
+  /**
+   * Create a copy of the source table by first gathering all the rows of the source in a list of
+   * mutations. Then create the copy of the table and apply the mutations to the copy.
+   */
   private void copyTable(AccumuloClient client, String source, String copy)
       throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
       TableExistsException {
@@ -230,10 +247,10 @@
       // ignored
     }
 
-    client.tableOperations().create(copy);
+    log.info("Gathering rows to copy {} ", source);
+    List<Mutation> mutations = new ArrayList<>();
 
-    try (Scanner scanner = client.createScanner(source, Authorizations.EMPTY);
-        BatchWriter writer = client.createBatchWriter(copy, new BatchWriterConfig())) {
+    try (Scanner scanner = client.createScanner(source, Authorizations.EMPTY)) {
       RowIterator rows = new RowIterator(new IsolatedScanner(scanner));
 
       while (rows.hasNext()) {
@@ -250,9 +267,22 @@
               k.getTimestamp(), entry.getValue());
         }
 
+        mutations.add(m);
+      }
+    }
+
+    // metadata should be stable with only 7 rows (1 replication + 2 for each table)
+    log.debug("Gathered {} rows to create copy {}", mutations.size(), copy);
+    assertEquals("Metadata should have 7 rows (1 repl + 2 for each table)", 7, mutations.size());
+    client.tableOperations().create(copy);
+
+    try (BatchWriter writer = client.createBatchWriter(copy, new BatchWriterConfig())) {
+      for (Mutation m : mutations) {
         writer.addMutation(m);
       }
     }
+
+    log.info("Finished creating copy " + copy);
   }
 
   private void dropTables(AccumuloClient client, String... tables)
@@ -262,7 +292,7 @@
     }
   }
 
-  private class State implements CurrentState {
+  private static class State implements CurrentState {
 
     final AccumuloClient client;
 
@@ -270,6 +300,9 @@
       this.client = client;
     }
 
+    private Set<TServerInstance> tservers;
+    private Set<TableId> onlineTables;
+
     @Override
     public Set<TServerInstance> onlineTabletServers() {
       HashSet<TServerInstance> tservers = new HashSet<>();
@@ -285,6 +318,7 @@
           throw new RuntimeException(e);
         }
       }
+      this.tservers = Collections.unmodifiableSet(tservers);
       return tservers;
     }
 
@@ -292,8 +326,9 @@
     public Set<TableId> onlineTables() {
       ClientContext context = (ClientContext) client;
       Set<TableId> onlineTables = Tables.getIdToNameMap(context).keySet();
-      return Sets.filter(onlineTables,
+      this.onlineTables = Sets.filter(onlineTables,
           tableId -> Tables.getTableState(context, tableId) == TableState.ONLINE);
+      return this.onlineTables;
     }
 
     @Override
@@ -315,6 +350,11 @@
     public ManagerState getManagerState() {
       return ManagerState.NORMAL;
     }
+
+    @Override
+    public String toString() {
+      return "tservers: " + tservers + " onlineTables: " + onlineTables;
+    }
   }
 
 }