Add a merge column marker to make merge idempotent (#3957)
The new MERGE marker allows correctly resuming metadata updates for
no-chop merge if there was a failure and resume and makes merge
idempotent
Co-authored-by: Keith Turner <kturner@apache.org>
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index 0cb57e5..2bc0a6e 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -325,6 +325,18 @@
public static final String STR_NAME = "ecomp";
public static final Text NAME = new Text(STR_NAME);
}
+
+ /**
+ * Column family for indicating that the files in a tablet contain fenced files that have been
+ * merged from other tablets during a merge operation. This is used to support resuming a failed
+ * merge operation.
+ */
+ public static class MergedColumnFamily {
+ public static final String STR_NAME = "merged";
+ public static final Text NAME = new Text(STR_NAME);
+ public static final ColumnFQ MERGED_COLUMN = new ColumnFQ(NAME, new Text(STR_NAME));
+ public static final Value MERGED_VALUE = new Value("merged");
+ }
}
/**
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
index fccc3e6..af31605 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
@@ -62,6 +62,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
@@ -104,6 +105,7 @@
private OptionalLong compact = OptionalLong.empty();
private Double splitRatio = null;
private Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions;
+ private boolean merged;
public enum LocationType {
CURRENT, FUTURE, LAST
@@ -125,7 +127,8 @@
COMPACT_ID,
SPLIT_RATIO,
SUSPEND,
- ECOMP
+ ECOMP,
+ MERGED
}
public static class Location {
@@ -345,6 +348,11 @@
return splitRatio;
}
+ public boolean hasMerged() {
+ ensureFetched(ColumnType.MERGED);
+ return merged;
+ }
+
public SortedMap<Key,Value> getKeyValues() {
Preconditions.checkState(keyValues != null, "Requested key values when it was not saved");
return keyValues;
@@ -479,6 +487,9 @@
extCompBuilder.put(ExternalCompactionId.of(qual),
ExternalCompactionMetadata.fromJson(val));
break;
+ case MergedColumnFamily.STR_NAME:
+ te.merged = true;
+ break;
default:
throw new IllegalStateException("Unexpected family " + fam);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index d881123..515e6a0 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -76,6 +76,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
@@ -336,6 +337,9 @@
case ECOMP:
families.add(ExternalCompactionColumnFamily.NAME);
break;
+ case MERGED:
+ families.add(MergedColumnFamily.NAME);
+ break;
default:
throw new IllegalArgumentException("Unknown col type " + colToFetch);
diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
index a3a3d33..d547b85 100644
--- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
@@ -20,6 +20,8 @@
import static java.util.stream.Collectors.toSet;
import static org.apache.accumulo.core.metadata.StoredTabletFile.serialize;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily.MERGED_COLUMN;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily.MERGED_VALUE;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN;
@@ -113,6 +115,8 @@
mutation.at().family(ScanFileColumnFamily.NAME).qualifier(sf1.getMetadata()).put("");
mutation.at().family(ScanFileColumnFamily.NAME).qualifier(sf2.getMetadata()).put("");
+ MERGED_COLUMN.put(mutation, new Value());
+
SortedMap<Key,Value> rowMap = toRowMap(mutation);
TabletMetadata tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(),
@@ -143,6 +147,7 @@
assertTrue(tm.sawPrevEndRow());
assertEquals("M123456789", tm.getTime().encode());
assertEquals(Set.of(sf1, sf2), Set.copyOf(tm.getScans()));
+ assertTrue(tm.hasMerged());
}
@Test
@@ -258,6 +263,30 @@
assertFalse(tm.hasCurrent());
}
+ @Test
+ public void testMergedColumn() {
+ KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da"));
+
+ // Test merged column set
+ Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent);
+ MERGED_COLUMN.put(mutation, MERGED_VALUE);
+ TabletMetadata tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(),
+ EnumSet.of(ColumnType.MERGED), true);
+ assertTrue(tm.hasMerged());
+
+ // Column not set
+ mutation = TabletColumnFamily.createPrevRowMutation(extent);
+ tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(),
+ EnumSet.of(ColumnType.MERGED), true);
+ assertFalse(tm.hasMerged());
+
+ // MERGED Column not fetched
+ mutation = TabletColumnFamily.createPrevRowMutation(extent);
+ tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(),
+ EnumSet.of(ColumnType.PREV_ROW), true);
+ assertThrows(IllegalStateException.class, tm::hasMerged);
+ }
+
private SortedMap<Key,Value> toRowMap(Mutation mutation) {
SortedMap<Key,Value> rowMap = new TreeMap<>();
mutation.getUpdates().forEach(cu -> {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
index 936d7f1..ca45a05 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
@@ -47,6 +47,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
@@ -98,7 +99,8 @@
FutureLocationColumnFamily.NAME,
ClonedColumnFamily.NAME,
ExternalCompactionColumnFamily.NAME,
- UpgraderDeprecatedConstants.ChoppedColumnFamily.NAME
+ UpgraderDeprecatedConstants.ChoppedColumnFamily.NAME,
+ MergedColumnFamily.NAME
);
// @formatter:on
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java
index fc15a0b..1ab0c6d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java
@@ -33,11 +33,16 @@
*/
WAITING_FOR_OFFLINE,
/**
- * when the number of chopped, offline tablets equals the number of merge tablets, begin the
- * metadata updates
+ * when the number of offline tablets equals the number of merge tablets, begin the metadata
+ * updates
*/
MERGING,
/**
+ * when the operation has finished metadata updates for merge. We can now remove the merged
+ * tablets and clear the MERGED marker. Not used for delete
+ */
+ MERGED,
+ /**
* merge is complete, the resulting tablet can be brought online, remove the marker in zookeeper
*/
COMPLETE
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 8142af7..b75c860 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -657,6 +657,7 @@
return TabletGoalState.UNASSIGNED;
}
case MERGING:
+ case MERGED:
return TabletGoalState.UNASSIGNED;
}
} else {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index b99bf34..c4e29ea 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -77,6 +77,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
@@ -585,15 +586,28 @@
try {
if (stats.getMergeInfo().isDelete()) {
deleteTablets(stats.getMergeInfo());
+ // For delete we are done and can skip to COMPLETE
+ update = MergeState.COMPLETE;
} else {
mergeMetadataRecords(stats.getMergeInfo());
+ // For merge we need another state to delete the tablets
+ // and clear the marker
+ update = MergeState.MERGED;
}
- update = MergeState.COMPLETE;
manager.setMergeState(stats.getMergeInfo(), update);
} catch (Exception ex) {
Manager.log.error("Unable merge metadata table records", ex);
}
}
+
+ // If the state is MERGED then we are finished with metadata updates
+ if (update == MergeState.MERGED) {
+ // Finish the merge operatoin by deleting the merged tablets and
+ // cleaning up the marker that was used for merge
+ deleteMergedTablets(stats.getMergeInfo());
+ update = MergeState.COMPLETE;
+ manager.setMergeState(stats.getMergeInfo(), update);
+ }
} catch (Exception ex) {
Manager.log.error(
"Unable to update merge state for merge " + stats.getMergeInfo().getExtent(), ex);
@@ -601,6 +615,16 @@
}
}
+ // Remove the merged marker from the last tablet in the merge range
+ private void clearMerged(MergeInfo mergeInfo, BatchWriter bw, HighTablet highTablet)
+ throws AccumuloException {
+ Manager.log.debug("Clearing MERGED marker for {}", mergeInfo.getExtent());
+ var m = new Mutation(highTablet.getExtent().toMetaRow());
+ MergedColumnFamily.MERGED_COLUMN.putDelete(m);
+ bw.addMutation(m);
+ bw.flush();
+ }
+
// This method finds returns the deletion starting row (exclusive) for tablets that
// need to be actually deleted. If the startTablet is null then
// the deletion start row will just be null as all tablets are being deleted
@@ -720,7 +744,8 @@
if (extent.endRow() != null) {
Key nextExtent = new Key(extent.endRow()).followingKey(PartialKey.ROW);
followingTablet =
- getHighTablet(new KeyExtent(extent.tableId(), nextExtent.getRow(), extent.endRow()));
+ getHighTablet(new KeyExtent(extent.tableId(), nextExtent.getRow(), extent.endRow()))
+ .getExtent();
Manager.log.debug("Found following tablet {}", followingTablet);
}
try {
@@ -805,7 +830,8 @@
private void mergeMetadataRecords(MergeInfo info) throws AccumuloException {
KeyExtent range = info.getExtent();
Manager.log.debug("Merging metadata for {}", range);
- KeyExtent stop = getHighTablet(range);
+ HighTablet highTablet = getHighTablet(range);
+ KeyExtent stop = highTablet.getExtent();
Manager.log.debug("Highest tablet is {}", stop);
Value firstPrevRowValue = null;
Text stopRow = stop.toMetaRow();
@@ -813,8 +839,7 @@
if (start == null) {
start = new Text();
}
- Range scanRange =
- new Range(TabletsSection.encodeRow(range.tableId(), start), false, stopRow, false);
+
String targetSystemTable = MetadataTable.NAME;
if (range.isMeta()) {
targetSystemTable = RootTable.NAME;
@@ -826,6 +851,13 @@
KeyExtent previousKeyExtent = null;
KeyExtent lastExtent = null;
+ // Check if we have already previously fenced the tablets
+ if (highTablet.isMerged()) {
+ Manager.log.debug("tablet metadata already fenced for merge {}", range);
+ // Return as we already fenced the files
+ return;
+ }
+
try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) {
long fileCount = 0;
// Make file entries in highest tablet
@@ -954,28 +986,62 @@
// delete any entries for external compactions
extCompIds.forEach(ecid -> m.putDelete(ExternalCompactionColumnFamily.STR_NAME, ecid));
- if (!m.getUpdates().isEmpty()) {
- bw.addMutation(m);
- }
+ // Add a marker so we know the tablets have been fenced in case the merge operation
+ // needs to be recovered and restarted to finish later.
+ MergedColumnFamily.MERGED_COLUMN.put(m, MergedColumnFamily.MERGED_VALUE);
+ // Add the prev row column update to the same mutation as the
+ // file updates so it will be atomic and only update the prev row
+ // if the tablets were fenced
+ Preconditions.checkState(firstPrevRowValue != null,
+ "Previous row entry for lowest tablet was not found.");
+ stop = new KeyExtent(stop.tableId(), stop.endRow(),
+ TabletColumnFamily.decodePrevEndRow(firstPrevRowValue));
+ TabletColumnFamily.PREV_ROW_COLUMN.put(m,
+ TabletColumnFamily.encodePrevEndRow(stop.prevEndRow()));
+ Manager.log.debug("Setting the prevRow for last tablet: {}", stop);
+ bw.addMutation(m);
bw.flush();
Manager.log.debug("Moved {} files to {}", fileCount, stop);
+ } catch (Exception ex) {
+ throw new AccumuloException(ex);
+ }
+ }
- if (firstPrevRowValue == null) {
- Manager.log.debug("tablet already merged");
- return;
- }
+ private void deleteMergedTablets(MergeInfo info) throws AccumuloException {
+ KeyExtent range = info.getExtent();
+ Manager.log.debug("Deleting merged tablets for {}", range);
+ HighTablet highTablet = getHighTablet(range);
+ if (!highTablet.isMerged()) {
+ Manager.log.debug("Tablets have already been deleted for merge with range {}, returning",
+ range);
+ return;
+ }
- stop = new KeyExtent(stop.tableId(), stop.endRow(),
- TabletColumnFamily.decodePrevEndRow(firstPrevRowValue));
- Mutation updatePrevRow = TabletColumnFamily.createPrevRowMutation(stop);
- Manager.log.debug("Setting the prevRow for last tablet: {}", stop);
- bw.addMutation(updatePrevRow);
- bw.flush();
+ KeyExtent stop = highTablet.getExtent();
+ Manager.log.debug("Highest tablet is {}", stop);
+ Text stopRow = stop.toMetaRow();
+ Text start = range.prevEndRow();
+ if (start == null) {
+ start = new Text();
+ }
+ Range scanRange =
+ new Range(TabletsSection.encodeRow(range.tableId(), start), false, stopRow, false);
+ String targetSystemTable = MetadataTable.NAME;
+ if (range.isMeta()) {
+ targetSystemTable = RootTable.NAME;
+ }
+
+ AccumuloClient client = manager.getContext();
+
+ try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) {
+ // Continue and delete the tablets that were merged
deleteTablets(info, scanRange, bw, client);
+ // Clear the merged marker after we finish deleting tablets
+ clearMerged(info, bw, highTablet);
} catch (Exception ex) {
throw new AccumuloException(ex);
}
@@ -1206,6 +1272,7 @@
// group all deletes into tablet into one mutation, this makes tablets
// either disappear entirely or not all.. this is important for the case
// where the process terminates in the loop below...
+ Manager.log.debug("Inside delete tablets");
scanner = client.createScanner(info.getExtent().isMeta() ? RootTable.NAME : MetadataTable.NAME,
Authorizations.EMPTY);
Manager.log.debug("Deleting range {}", scanRange);
@@ -1236,24 +1303,47 @@
|| key.getColumnFamily().equals(FutureLocationColumnFamily.NAME);
}
- private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
+ private HighTablet getHighTablet(KeyExtent range) throws AccumuloException {
try {
AccumuloClient client = manager.getContext();
Scanner scanner = client.createScanner(range.isMeta() ? RootTable.NAME : MetadataTable.NAME,
Authorizations.EMPTY);
TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+ MergedColumnFamily.MERGED_COLUMN.fetch(scanner);
KeyExtent start = new KeyExtent(range.tableId(), range.endRow(), null);
scanner.setRange(new Range(start.toMetaRow(), null));
Iterator<Entry<Key,Value>> iterator = scanner.iterator();
if (!iterator.hasNext()) {
throw new AccumuloException("No last tablet for a merge " + range);
}
- Entry<Key,Value> entry = iterator.next();
- KeyExtent highTablet = KeyExtent.fromMetaPrevRow(entry);
- if (!highTablet.tableId().equals(range.tableId())) {
+
+ KeyExtent highTablet = null;
+ boolean merged = false;
+ Text firstRow = null;
+
+ while (iterator.hasNext()) {
+ Entry<Key,Value> entry = iterator.next();
+ if (firstRow == null) {
+ firstRow = entry.getKey().getRow();
+ }
+ if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(entry.getKey())) {
+ Preconditions.checkState(entry.getKey().getRow().equals(firstRow),
+ "Row " + entry.getKey().getRow() + " does not match first row seen " + firstRow);
+ highTablet = KeyExtent.fromMetaPrevRow(entry);
+ Manager.log.debug("found high tablet: {}", entry.getKey());
+ break;
+ } else if (MergedColumnFamily.MERGED_COLUMN.hasColumns(entry.getKey())) {
+ Preconditions.checkState(entry.getKey().getRow().equals(firstRow),
+ "Row " + entry.getKey().getRow() + " does not match first row seen " + firstRow);
+ Manager.log.debug("is merged true: {}", entry.getKey());
+ merged = true;
+ }
+ }
+
+ if (highTablet == null || !highTablet.tableId().equals(range.tableId())) {
throw new AccumuloException("No last tablet for merge " + range + " " + highTablet);
}
- return highTablet;
+ return new HighTablet(highTablet, merged);
} catch (Exception ex) {
throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range,
ex);
@@ -1353,4 +1443,22 @@
}
}
+ @VisibleForTesting
+ protected static class HighTablet {
+ private final KeyExtent extent;
+ private final boolean merged;
+
+ public HighTablet(KeyExtent extent, boolean merged) {
+ this.extent = Objects.requireNonNull(extent);
+ this.merged = merged;
+ }
+
+ public boolean isMerged() {
+ return merged;
+ }
+
+ public KeyExtent getExtent() {
+ return extent;
+ }
+ }
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
index 0f1bcbe..3f13662 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
@@ -124,7 +124,7 @@
info.getExtent());
}
}
- if (state == MergeState.MERGING) {
+ if (state == MergeState.MERGING || state == MergeState.MERGED) {
if (hosted != 0) {
// Shouldn't happen
log.error("Unexpected state: hosted tablets should be zero {} merge {}", hosted,
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java
index 35026ef..0f693ad 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java
@@ -19,9 +19,16 @@
package org.apache.accumulo.manager;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.manager.TabletGroupWatcher.HighTablet;
+import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;
public class TabletGroupWatcherTest {
@@ -64,4 +71,17 @@
assertEquals(1, newValues.getSecond().getNumEntries());
assertEquals(original.getTime(), newValues.getSecond().getTime());
}
+
+ @Test
+ public void testHighTablet() {
+ HighTablet mergedTruePrevRowFalse =
+ new HighTablet(new KeyExtent(MetadataTable.ID, new Text("end"), null), true);
+ assertNotNull(mergedTruePrevRowFalse.getExtent());
+ assertTrue(mergedTruePrevRowFalse.isMerged());
+
+ HighTablet mergedFalsePrevRowFalse =
+ new HighTablet(new KeyExtent(MetadataTable.ID, new Text("end"), null), false);
+ assertNotNull(mergedFalsePrevRowFalse.getExtent());
+ assertFalse(mergedFalsePrevRowFalse.isMerged());
+ }
}
diff --git a/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java b/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java
index aeee11e..013a1e7 100644
--- a/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.test;
import static org.apache.accumulo.test.util.FileMetadataUtil.countFencedFiles;
+import static org.apache.accumulo.test.util.FileMetadataUtil.verifyMergedMarkerCleared;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -145,21 +146,29 @@
// Merging tablets should produce fenced files because of no-chop merge
assertTrue(countFencedFiles(getServerContext(), MetadataTable.NAME) > 0);
verifyMetadataTableScan(client);
+ // Verify that the MERGED marker was cleared and doesn't exist on any tablet
+ verifyMergedMarkerCleared(getServerContext(), MetadataTable.ID);
addSplits(opts, "44 55 66 77 88".split(" "));
checkMetadataSplits(9, opts);
assertTrue(countFencedFiles(getServerContext(), MetadataTable.NAME) > 0);
verifyMetadataTableScan(client);
+ // Verify that the MERGED marker was cleared and doesn't exist on any tablet
+ verifyMergedMarkerCleared(getServerContext(), MetadataTable.ID);
opts.merge(MetadataTable.NAME, new Text("5"), new Text("7"));
checkMetadataSplits(6, opts);
assertTrue(countFencedFiles(getServerContext(), MetadataTable.NAME) > 0);
verifyMetadataTableScan(client);
+ // Verify that the MERGED marker was cleared and doesn't exist on any tablet
+ verifyMergedMarkerCleared(getServerContext(), MetadataTable.ID);
opts.merge(MetadataTable.NAME, null, null);
checkMetadataSplits(0, opts);
assertTrue(countFencedFiles(getServerContext(), MetadataTable.NAME) > 0);
verifyMetadataTableScan(client);
+ // Verify that the MERGED marker was cleared and doesn't exist on any tablet
+ verifyMergedMarkerCleared(getServerContext(), MetadataTable.ID);
opts.compact(MetadataTable.NAME, new CompactionConfig());
// Should be no more fenced files after compaction
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
index c8e8f93..4d265a1 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
@@ -20,6 +20,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.test.util.FileMetadataUtil.printAndVerifyFileMetadata;
+import static org.apache.accumulo.test.util.FileMetadataUtil.verifyMergedMarkerCleared;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -97,6 +98,9 @@
c.tableOperations().flush(tableName, null, null, true);
c.tableOperations().merge(tableName, new Text("c1"), new Text("f1"));
assertEquals(8, c.tableOperations().listSplits(tableName).size());
+ // Verify that the MERGED marker was cleared
+ verifyMergedMarkerCleared(getServerContext(),
+ TableId.of(c.tableOperations().tableIdMap().get(tableName)));
}
}
@@ -189,6 +193,9 @@
verify(c, 100, 201, tableName);
verifyNoRows(c, 100, 301, tableName);
verify(c, 600, 401, tableName);
+
+ // Verify that the MERGED marker was cleared
+ verifyMergedMarkerCleared(getServerContext(), tableId);
}
}
@@ -251,6 +258,8 @@
c.tableOperations().merge(tableName, null, null);
log.debug("Metadata after Merge");
printAndVerifyFileMetadata(getServerContext(), tableId, 12);
+ // Verify that the MERGED marker was cleared
+ verifyMergedMarkerCleared(getServerContext(), tableId);
// Verify that the deleted rows can't be read after merge
verify(c, 150, 1, tableName);
@@ -332,6 +341,8 @@
c.tableOperations().merge(tableName, null, null);
log.debug("Metadata after second Merge");
printAndVerifyFileMetadata(getServerContext(), tableId, -1);
+ // Verify that the MERGED marker was cleared
+ verifyMergedMarkerCleared(getServerContext(), tableId);
// Verify that the deleted rows can't be read after merge
verify(c, 150, 1, tableName);
@@ -382,6 +393,8 @@
c.tableOperations().merge(tableName, null, null);
log.debug("Metadata after Merge");
printAndVerifyFileMetadata(getServerContext(), tableId, -1);
+ // Verify that the MERGED marker was cleared
+ verifyMergedMarkerCleared(getServerContext(), tableId);
// Verify that the deleted rows can't be read after merge
verify(c, 100, 1, tableName);
@@ -518,16 +531,17 @@
log.debug("Before Merge");
client.tableOperations().flush(table, null, null, true);
- printAndVerifyFileMetadata(getServerContext(),
- TableId.of(client.tableOperations().tableIdMap().get(table)));
+ TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(table));
+ printAndVerifyFileMetadata(getServerContext(), tableId);
client.tableOperations().merge(table, start == null ? null : new Text(start),
end == null ? null : new Text(end));
client.tableOperations().flush(table, null, null, true);
log.debug("After Merge");
- printAndVerifyFileMetadata(getServerContext(),
- TableId.of(client.tableOperations().tableIdMap().get(table)));
+ printAndVerifyFileMetadata(getServerContext(), tableId);
+ // Verify that the MERGED marker was cleared
+ verifyMergedMarkerCleared(getServerContext(), tableId);
try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
diff --git a/test/src/main/java/org/apache/accumulo/test/util/FileMetadataUtil.java b/test/src/main/java/org/apache/accumulo/test/util/FileMetadataUtil.java
index 8d94ff3..bfc39c9 100644
--- a/test/src/main/java/org/apache/accumulo/test/util/FileMetadataUtil.java
+++ b/test/src/main/java/org/apache/accumulo/test/util/FileMetadataUtil.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.test.util;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.HashMap;
import java.util.Map;
@@ -169,6 +170,14 @@
ctx.tableOperations().online(tableName, true);
}
+ // Verifies that the MERGED marker was cleared and doesn't exist on any tablet
+ public static void verifyMergedMarkerCleared(final ServerContext ctx, TableId tableId) {
+ try (var tabletsMetadata =
+ ctx.getAmple().readTablets().forTable(tableId).fetch(ColumnType.MERGED).build()) {
+ assertTrue(tabletsMetadata.stream().noneMatch(TabletMetadata::hasMerged));
+ }
+ }
+
public interface FileMutator {
void mutate(TabletMetadata tm, TabletMutator mutator, StoredTabletFile file,
DataFileValue value);