Use Ample for tablet location and tserver suspension (#1653)

Use Ample for tablet location updates and for placing / clearing tserver suspension information
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index a7039d4..bd3eb09 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -222,6 +222,10 @@
 
     public TabletMutator putChopped();
 
+    public TabletMutator putSuspension(TServer tserver, long suspensionTime);
+
+    public TabletMutator deleteSuspension();
+
     /**
      * This method persist (or queues for persisting) previous put and deletes against this object.
      * Unless this method is called, previous calls will never be persisted. The purpose of this
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index 6061f56..99f8a61 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -21,31 +21,27 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.hadoop.fs.Path;
 
 class MetaDataStateStore implements TabletStateStore {
 
-  private static final int THREADS = 4;
-  private static final int LATENCY = 1000;
-  private static final int MAX_MEMORY = 200 * 1024 * 1024;
-
   protected final ClientContext context;
   protected final CurrentState state;
   private final String targetTableName;
+  private final Ample ample;
 
   protected MetaDataStateStore(ClientContext context, CurrentState state, String targetTableName) {
     this.context = context;
     this.state = state;
+    this.ample = context.getAmple();
     this.targetTableName = targetTableName;
   }
 
@@ -58,57 +54,28 @@
     return new MetaDataTableScanner(context, TabletsSection.getRange(), state, targetTableName);
   }
 
-  @Override
   public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException {
-    BatchWriter writer = createBatchWriter();
-    try {
+    try (var tabletsMutator = ample.mutateTablets()) {
       for (Assignment assignment : assignments) {
-        Mutation m = new Mutation(assignment.tablet.toMetaRow());
-        assignment.server.putLocation(m);
-        assignment.server.clearFutureLocation(m);
-        SuspendingTServer.clearSuspension(m);
-        writer.addMutation(m);
+        tabletsMutator.mutateTablet(assignment.tablet)
+            .putLocation(assignment.server, LocationType.CURRENT)
+            .deleteLocation(assignment.server, LocationType.FUTURE).deleteSuspension().mutate();
       }
-    } catch (Exception ex) {
+    } catch (RuntimeException ex) {
       throw new DistributedStoreException(ex);
-    } finally {
-      try {
-        writer.close();
-      } catch (MutationsRejectedException e) {
-        throw new DistributedStoreException(e);
-      }
-    }
-  }
-
-  BatchWriter createBatchWriter() {
-    try {
-      return context.createBatchWriter(targetTableName,
-          new BatchWriterConfig().setMaxMemory(MAX_MEMORY)
-              .setMaxLatency(LATENCY, TimeUnit.MILLISECONDS).setMaxWriteThreads(THREADS));
-    } catch (Exception e) {
-      throw new RuntimeException(e);
     }
   }
 
   @Override
   public void setFutureLocations(Collection<Assignment> assignments)
       throws DistributedStoreException {
-    BatchWriter writer = createBatchWriter();
-    try {
+    try (var tabletsMutator = ample.mutateTablets()) {
       for (Assignment assignment : assignments) {
-        Mutation m = new Mutation(assignment.tablet.toMetaRow());
-        SuspendingTServer.clearSuspension(m);
-        assignment.server.putFutureLocation(m);
-        writer.addMutation(m);
+        tabletsMutator.mutateTablet(assignment.tablet).deleteSuspension()
+            .putLocation(assignment.server, LocationType.FUTURE).mutate();
       }
-    } catch (Exception ex) {
+    } catch (RuntimeException ex) {
       throw new DistributedStoreException(ex);
-    } finally {
-      try {
-        writer.close();
-      } catch (MutationsRejectedException e) {
-        throw new DistributedStoreException(e);
-      }
     }
   }
 
@@ -128,67 +95,49 @@
   private void unassign(Collection<TabletLocationState> tablets,
       Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp)
       throws DistributedStoreException {
-    BatchWriter writer = createBatchWriter();
-    try {
+    try (var tabletsMutator = ample.mutateTablets()) {
       for (TabletLocationState tls : tablets) {
-        Mutation m = new Mutation(tls.extent.toMetaRow());
+        TabletMutator tabletMutator = tabletsMutator.mutateTablet(tls.extent);
         if (tls.current != null) {
-          tls.current.clearLocation(m);
+          tabletMutator.deleteLocation(tls.current, LocationType.CURRENT);
           if (logsForDeadServers != null) {
             List<Path> logs = logsForDeadServers.get(tls.current);
             if (logs != null) {
               for (Path log : logs) {
                 LogEntry entry =
                     new LogEntry(tls.extent, 0, tls.current.hostPort(), log.toString());
-                m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
+                tabletMutator.putWal(entry);
               }
             }
           }
           if (suspensionTimestamp >= 0) {
-            SuspendingTServer suspender =
-                new SuspendingTServer(tls.current.getLocation(), suspensionTimestamp);
-            suspender.setSuspension(m);
+            tabletMutator.putSuspension(tls.current, suspensionTimestamp);
           }
         }
         if (tls.suspend != null && suspensionTimestamp < 0) {
-          SuspendingTServer.clearSuspension(m);
+          tabletMutator.deleteSuspension();
         }
         if (tls.future != null) {
-          tls.future.clearFutureLocation(m);
+          tabletMutator.deleteLocation(tls.future, LocationType.FUTURE);
         }
-        writer.addMutation(m);
+        tabletMutator.mutate();
       }
-    } catch (Exception ex) {
+    } catch (RuntimeException ex) {
       throw new DistributedStoreException(ex);
-    } finally {
-      try {
-        writer.close();
-      } catch (MutationsRejectedException e) {
-        throw new DistributedStoreException(e);
-      }
     }
   }
 
   @Override
   public void unsuspend(Collection<TabletLocationState> tablets) throws DistributedStoreException {
-    BatchWriter writer = createBatchWriter();
-    try {
+    try (var tabletsMutator = ample.mutateTablets()) {
       for (TabletLocationState tls : tablets) {
         if (tls.suspend != null) {
           continue;
         }
-        Mutation m = new Mutation(tls.extent.toMetaRow());
-        SuspendingTServer.clearSuspension(m);
-        writer.addMutation(m);
+        tabletsMutator.mutateTablet(tls.extent).deleteSuspension().mutate();
       }
-    } catch (Exception ex) {
+    } catch (RuntimeException ex) {
       throw new DistributedStoreException(ex);
-    } finally {
-      try {
-        writer.close();
-      } catch (MutationsRejectedException e) {
-        throw new DistributedStoreException(e);
-      }
     }
   }
 
@@ -196,5 +145,4 @@
   public String name() {
     return "Normal Tablets";
   }
-
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java
index c6bb343..75bbe7a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java
@@ -18,11 +18,8 @@
  */
 package org.apache.accumulo.server.master.state;
 
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN;
-
 import java.util.Objects;
 
-import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.util.HostAndPort;
 
@@ -44,8 +41,8 @@
     return new SuspendingTServer(HostAndPort.fromString(parts[0]), Long.parseLong(parts[1]));
   }
 
-  public Value toValue() {
-    return new Value(server + "|" + suspensionTime);
+  public static Value toValue(HostAndPort tServer, long suspensionTime) {
+    return new Value(tServer + "|" + suspensionTime);
   }
 
   @Override
@@ -57,14 +54,6 @@
     return server.equals(rhs.server) && suspensionTime == rhs.suspensionTime;
   }
 
-  public void setSuspension(Mutation m) {
-    m.put(SUSPEND_COLUMN.getColumnFamily(), SUSPEND_COLUMN.getColumnQualifier(), toValue());
-  }
-
-  public static void clearSuspension(Mutation m) {
-    m.putDelete(SUSPEND_COLUMN.getColumnFamily(), SUSPEND_COLUMN.getColumnQualifier());
-  }
-
   @Override
   public int hashCode() {
     return Objects.hash(server, suspensionTime);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
index a3cd4ab..d5e40c5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
@@ -25,12 +25,8 @@
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
-import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.schema.Ample;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.HostAndPort;
@@ -83,30 +79,6 @@
     this(location.getHostAndPort(), location.getSession());
   }
 
-  public void putLocation(Mutation m) {
-    m.put(CurrentLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
-  }
-
-  public void putFutureLocation(Mutation m) {
-    m.put(FutureLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
-  }
-
-  public void putLastLocation(Mutation m) {
-    m.put(LastLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
-  }
-
-  public void clearLastLocation(Mutation m) {
-    m.putDelete(LastLocationColumnFamily.NAME, asColumnQualifier());
-  }
-
-  public void clearFutureLocation(Mutation m) {
-    m.putDelete(FutureLocationColumnFamily.NAME, asColumnQualifier());
-  }
-
-  public void clearLocation(Mutation m) {
-    m.putDelete(CurrentLocationColumnFamily.NAME, asColumnQualifier());
-  }
-
   @Override
   public int compareTo(TServerInstance other) {
     if (this == other)
@@ -140,14 +112,6 @@
     return getLocation().toString();
   }
 
-  private Text asColumnQualifier() {
-    return new Text(this.getSession());
-  }
-
-  private Value asMutationValue() {
-    return new Value(getLocation().toString());
-  }
-
   @Override
   public HostAndPort getLocation() {
     return location;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
index 3bb3ebb..e518cec 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
@@ -34,6 +34,7 @@
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataTime;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
@@ -41,6 +42,7 @@
 import org.apache.accumulo.fate.FateTxId;
 import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.master.state.SuspendingTServer;
 import org.apache.hadoop.io.Text;
 
 import com.google.common.base.Preconditions;
@@ -202,6 +204,23 @@
     return this;
   }
 
+  @Override
+  public Ample.TabletMutator putSuspension(Ample.TServer tServer, long suspensionTime) {
+    Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
+    mutation.put(SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily(),
+        SuspendLocationColumn.SUSPEND_COLUMN.getColumnQualifier(),
+        SuspendingTServer.toValue(tServer.getLocation(), suspensionTime));
+    return this;
+  }
+
+  @Override
+  public Ample.TabletMutator deleteSuspension() {
+    Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
+    mutation.putDelete(SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily(),
+        SuspendLocationColumn.SUSPEND_COLUMN.getColumnQualifier());
+    return this;
+  }
+
   protected Mutation getMutation() {
     updatesEnabled = false;
     return mutation;
diff --git a/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
index 473295a..26b7a97 100644
--- a/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
@@ -28,20 +28,20 @@
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.master.state.ClosableIterator;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.master.state.TabletLocationState;
@@ -74,6 +74,7 @@
     // make some tablets, spread 'em around
     try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
       ClientContext context = (ClientContext) c;
+      ServerContext serverContext = cluster.getServerContext();
       String table = this.getUniqueNames(1)[0];
       c.securityOperations().grantTablePermission("root", MetadataTable.NAME,
           TablePermission.WRITE);
@@ -134,19 +135,16 @@
       }
       assertNotEquals(null, moved);
       // throw a mutation in as if we were the dying tablet
-      BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-      Mutation assignment = new Mutation(moved.extent.toMetaRow());
-      moved.current.putLocation(assignment);
-      bw.addMutation(assignment);
-      bw.close();
+      TabletMutator tabletMutator = serverContext.getAmple().mutateTablet(moved.extent);
+      tabletMutator.putLocation(moved.current, LocationType.CURRENT);
+      tabletMutator.mutate();
       // wait for the master to fix the problem
       waitForCleanStore(store);
       // now jam up the metadata table
-      bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-      assignment = new Mutation(new KeyExtent(MetadataTable.ID, null, null).toMetaRow());
-      moved.current.putLocation(assignment);
-      bw.addMutation(assignment);
-      bw.close();
+      tabletMutator =
+          serverContext.getAmple().mutateTablet(new KeyExtent(MetadataTable.ID, null, null));
+      tabletMutator.putLocation(moved.current, LocationType.CURRENT);
+      tabletMutator.mutate();
       waitForCleanStore(TabletStateStore.getStoreForLevel(DataLevel.METADATA, context));
     }
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
index 1b81d68..9bb109f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
@@ -37,10 +37,8 @@
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.clientImpl.ScannerImpl;
-import org.apache.accumulo.core.clientImpl.Writer;
 import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -48,6 +46,7 @@
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.TabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
@@ -58,6 +57,7 @@
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataTime;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.fate.zookeeper.ZooLock;
@@ -206,11 +206,11 @@
 
     MetadataTableUtil.splitTablet(high, extent.prevEndRow(), splitRatio, context, zl);
     TServerInstance instance = new TServerInstance(location, zl.getSessionId());
-    Writer writer = MetadataTableUtil.getMetadataTable(context);
     Assignment assignment = new Assignment(high, instance);
-    Mutation m = new Mutation(assignment.tablet.toMetaRow());
-    assignment.server.putFutureLocation(m);
-    writer.update(m);
+
+    TabletMutator tabletMutator = context.getAmple().mutateTablet(extent);
+    tabletMutator.putLocation(assignment.server, LocationType.FUTURE);
+    tabletMutator.mutate();
 
     if (steps >= 1) {
       Map<Long,List<TabletFile>> bulkFiles = getBulkFilesLoaded(context, extent);