Use Ample for tablet location and tserver suspension (#1653)
Use Ample for tablet location updates and for placing / clearing tserver suspension information
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index a7039d4..bd3eb09 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -222,6 +222,10 @@
public TabletMutator putChopped();
+ public TabletMutator putSuspension(TServer tserver, long suspensionTime);
+
+ public TabletMutator deleteSuspension();
+
/**
* This method persist (or queues for persisting) previous put and deletes against this object.
* Unless this method is called, previous calls will never be persisted. The purpose of this
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index 6061f56..99f8a61 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -21,31 +21,27 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.hadoop.fs.Path;
class MetaDataStateStore implements TabletStateStore {
- private static final int THREADS = 4;
- private static final int LATENCY = 1000;
- private static final int MAX_MEMORY = 200 * 1024 * 1024;
-
protected final ClientContext context;
protected final CurrentState state;
private final String targetTableName;
+ private final Ample ample;
protected MetaDataStateStore(ClientContext context, CurrentState state, String targetTableName) {
this.context = context;
this.state = state;
+ this.ample = context.getAmple();
this.targetTableName = targetTableName;
}
@@ -58,57 +54,28 @@
return new MetaDataTableScanner(context, TabletsSection.getRange(), state, targetTableName);
}
- @Override
public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException {
- BatchWriter writer = createBatchWriter();
- try {
+ try (var tabletsMutator = ample.mutateTablets()) {
for (Assignment assignment : assignments) {
- Mutation m = new Mutation(assignment.tablet.toMetaRow());
- assignment.server.putLocation(m);
- assignment.server.clearFutureLocation(m);
- SuspendingTServer.clearSuspension(m);
- writer.addMutation(m);
+ tabletsMutator.mutateTablet(assignment.tablet)
+ .putLocation(assignment.server, LocationType.CURRENT)
+ .deleteLocation(assignment.server, LocationType.FUTURE).deleteSuspension().mutate();
}
- } catch (Exception ex) {
+ } catch (RuntimeException ex) {
throw new DistributedStoreException(ex);
- } finally {
- try {
- writer.close();
- } catch (MutationsRejectedException e) {
- throw new DistributedStoreException(e);
- }
- }
- }
-
- BatchWriter createBatchWriter() {
- try {
- return context.createBatchWriter(targetTableName,
- new BatchWriterConfig().setMaxMemory(MAX_MEMORY)
- .setMaxLatency(LATENCY, TimeUnit.MILLISECONDS).setMaxWriteThreads(THREADS));
- } catch (Exception e) {
- throw new RuntimeException(e);
}
}
@Override
public void setFutureLocations(Collection<Assignment> assignments)
throws DistributedStoreException {
- BatchWriter writer = createBatchWriter();
- try {
+ try (var tabletsMutator = ample.mutateTablets()) {
for (Assignment assignment : assignments) {
- Mutation m = new Mutation(assignment.tablet.toMetaRow());
- SuspendingTServer.clearSuspension(m);
- assignment.server.putFutureLocation(m);
- writer.addMutation(m);
+ tabletsMutator.mutateTablet(assignment.tablet).deleteSuspension()
+ .putLocation(assignment.server, LocationType.FUTURE).mutate();
}
- } catch (Exception ex) {
+ } catch (RuntimeException ex) {
throw new DistributedStoreException(ex);
- } finally {
- try {
- writer.close();
- } catch (MutationsRejectedException e) {
- throw new DistributedStoreException(e);
- }
}
}
@@ -128,67 +95,49 @@
private void unassign(Collection<TabletLocationState> tablets,
Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp)
throws DistributedStoreException {
- BatchWriter writer = createBatchWriter();
- try {
+ try (var tabletsMutator = ample.mutateTablets()) {
for (TabletLocationState tls : tablets) {
- Mutation m = new Mutation(tls.extent.toMetaRow());
+ TabletMutator tabletMutator = tabletsMutator.mutateTablet(tls.extent);
if (tls.current != null) {
- tls.current.clearLocation(m);
+ tabletMutator.deleteLocation(tls.current, LocationType.CURRENT);
if (logsForDeadServers != null) {
List<Path> logs = logsForDeadServers.get(tls.current);
if (logs != null) {
for (Path log : logs) {
LogEntry entry =
new LogEntry(tls.extent, 0, tls.current.hostPort(), log.toString());
- m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
+ tabletMutator.putWal(entry);
}
}
}
if (suspensionTimestamp >= 0) {
- SuspendingTServer suspender =
- new SuspendingTServer(tls.current.getLocation(), suspensionTimestamp);
- suspender.setSuspension(m);
+ tabletMutator.putSuspension(tls.current, suspensionTimestamp);
}
}
if (tls.suspend != null && suspensionTimestamp < 0) {
- SuspendingTServer.clearSuspension(m);
+ tabletMutator.deleteSuspension();
}
if (tls.future != null) {
- tls.future.clearFutureLocation(m);
+ tabletMutator.deleteLocation(tls.future, LocationType.FUTURE);
}
- writer.addMutation(m);
+ tabletMutator.mutate();
}
- } catch (Exception ex) {
+ } catch (RuntimeException ex) {
throw new DistributedStoreException(ex);
- } finally {
- try {
- writer.close();
- } catch (MutationsRejectedException e) {
- throw new DistributedStoreException(e);
- }
}
}
@Override
public void unsuspend(Collection<TabletLocationState> tablets) throws DistributedStoreException {
- BatchWriter writer = createBatchWriter();
- try {
+ try (var tabletsMutator = ample.mutateTablets()) {
for (TabletLocationState tls : tablets) {
if (tls.suspend != null) {
continue;
}
- Mutation m = new Mutation(tls.extent.toMetaRow());
- SuspendingTServer.clearSuspension(m);
- writer.addMutation(m);
+ tabletsMutator.mutateTablet(tls.extent).deleteSuspension().mutate();
}
- } catch (Exception ex) {
+ } catch (RuntimeException ex) {
throw new DistributedStoreException(ex);
- } finally {
- try {
- writer.close();
- } catch (MutationsRejectedException e) {
- throw new DistributedStoreException(e);
- }
}
}
@@ -196,5 +145,4 @@
public String name() {
return "Normal Tablets";
}
-
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java
index c6bb343..75bbe7a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java
@@ -18,11 +18,8 @@
*/
package org.apache.accumulo.server.master.state;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN;
-
import java.util.Objects;
-import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.HostAndPort;
@@ -44,8 +41,8 @@
return new SuspendingTServer(HostAndPort.fromString(parts[0]), Long.parseLong(parts[1]));
}
- public Value toValue() {
- return new Value(server + "|" + suspensionTime);
+ public static Value toValue(HostAndPort tServer, long suspensionTime) {
+ return new Value(tServer + "|" + suspensionTime);
}
@Override
@@ -57,14 +54,6 @@
return server.equals(rhs.server) && suspensionTime == rhs.suspensionTime;
}
- public void setSuspension(Mutation m) {
- m.put(SUSPEND_COLUMN.getColumnFamily(), SUSPEND_COLUMN.getColumnQualifier(), toValue());
- }
-
- public static void clearSuspension(Mutation m) {
- m.putDelete(SUSPEND_COLUMN.getColumnFamily(), SUSPEND_COLUMN.getColumnQualifier());
- }
-
@Override
public int hashCode() {
return Objects.hash(server, suspensionTime);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
index a3cd4ab..d5e40c5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
@@ -25,12 +25,8 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
-import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.schema.Ample;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.util.HostAndPort;
@@ -83,30 +79,6 @@
this(location.getHostAndPort(), location.getSession());
}
- public void putLocation(Mutation m) {
- m.put(CurrentLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
- }
-
- public void putFutureLocation(Mutation m) {
- m.put(FutureLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
- }
-
- public void putLastLocation(Mutation m) {
- m.put(LastLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
- }
-
- public void clearLastLocation(Mutation m) {
- m.putDelete(LastLocationColumnFamily.NAME, asColumnQualifier());
- }
-
- public void clearFutureLocation(Mutation m) {
- m.putDelete(FutureLocationColumnFamily.NAME, asColumnQualifier());
- }
-
- public void clearLocation(Mutation m) {
- m.putDelete(CurrentLocationColumnFamily.NAME, asColumnQualifier());
- }
-
@Override
public int compareTo(TServerInstance other) {
if (this == other)
@@ -140,14 +112,6 @@
return getLocation().toString();
}
- private Text asColumnQualifier() {
- return new Text(this.getSession());
- }
-
- private Value asMutationValue() {
- return new Value(getLocation().toString());
- }
-
@Override
public HostAndPort getLocation() {
return location;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
index 3bb3ebb..e518cec 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
@@ -34,6 +34,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
@@ -41,6 +42,7 @@
import org.apache.accumulo.fate.FateTxId;
import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.master.state.SuspendingTServer;
import org.apache.hadoop.io.Text;
import com.google.common.base.Preconditions;
@@ -202,6 +204,23 @@
return this;
}
+ @Override
+ public Ample.TabletMutator putSuspension(Ample.TServer tServer, long suspensionTime) {
+ Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
+ mutation.put(SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily(),
+ SuspendLocationColumn.SUSPEND_COLUMN.getColumnQualifier(),
+ SuspendingTServer.toValue(tServer.getLocation(), suspensionTime));
+ return this;
+ }
+
+ @Override
+ public Ample.TabletMutator deleteSuspension() {
+ Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
+ mutation.putDelete(SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily(),
+ SuspendLocationColumn.SUSPEND_COLUMN.getColumnQualifier());
+ return this;
+ }
+
protected Mutation getMutation() {
updatesEnabled = false;
return mutation;
diff --git a/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
index 473295a..26b7a97 100644
--- a/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
@@ -28,20 +28,20 @@
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.master.state.ClosableIterator;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletLocationState;
@@ -74,6 +74,7 @@
// make some tablets, spread 'em around
try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
ClientContext context = (ClientContext) c;
+ ServerContext serverContext = cluster.getServerContext();
String table = this.getUniqueNames(1)[0];
c.securityOperations().grantTablePermission("root", MetadataTable.NAME,
TablePermission.WRITE);
@@ -134,19 +135,16 @@
}
assertNotEquals(null, moved);
// throw a mutation in as if we were the dying tablet
- BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- Mutation assignment = new Mutation(moved.extent.toMetaRow());
- moved.current.putLocation(assignment);
- bw.addMutation(assignment);
- bw.close();
+ TabletMutator tabletMutator = serverContext.getAmple().mutateTablet(moved.extent);
+ tabletMutator.putLocation(moved.current, LocationType.CURRENT);
+ tabletMutator.mutate();
// wait for the master to fix the problem
waitForCleanStore(store);
// now jam up the metadata table
- bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- assignment = new Mutation(new KeyExtent(MetadataTable.ID, null, null).toMetaRow());
- moved.current.putLocation(assignment);
- bw.addMutation(assignment);
- bw.close();
+ tabletMutator =
+ serverContext.getAmple().mutateTablet(new KeyExtent(MetadataTable.ID, null, null));
+ tabletMutator.putLocation(moved.current, LocationType.CURRENT);
+ tabletMutator.mutate();
waitForCleanStore(TabletStateStore.getStoreForLevel(DataLevel.METADATA, context));
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
index 1b81d68..9bb109f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
@@ -37,10 +37,8 @@
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.clientImpl.ScannerImpl;
-import org.apache.accumulo.core.clientImpl.Writer;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -48,6 +46,7 @@
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
@@ -58,6 +57,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.fate.zookeeper.ZooLock;
@@ -206,11 +206,11 @@
MetadataTableUtil.splitTablet(high, extent.prevEndRow(), splitRatio, context, zl);
TServerInstance instance = new TServerInstance(location, zl.getSessionId());
- Writer writer = MetadataTableUtil.getMetadataTable(context);
Assignment assignment = new Assignment(high, instance);
- Mutation m = new Mutation(assignment.tablet.toMetaRow());
- assignment.server.putFutureLocation(m);
- writer.update(m);
+
+ TabletMutator tabletMutator = context.getAmple().mutateTablet(extent);
+ tabletMutator.putLocation(assignment.server, LocationType.FUTURE);
+ tabletMutator.mutate();
if (steps >= 1) {
Map<Long,List<TabletFile>> bulkFiles = getBulkFilesLoaded(context, extent);