PHOENIX-5565 Unify index update structures in IndexRegionObserver and IndexCommitter
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index b058b33..340832f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -32,6 +32,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -67,6 +69,7 @@
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.hbase.index.write.IndexWriter;
@@ -145,16 +148,16 @@
private final int clientVersion;
// The collection of index mutations that will be applied before the data table mutations. The empty column (i.e.,
// the verified column) will have the value false ("unverified") on these mutations
- private Collection<Pair<Mutation, byte[]>> preIndexUpdates = Collections.emptyList();
+ private ListMultimap<HTableInterfaceReference, Mutation> preIndexUpdates;
// The collection of index mutations that will be applied after the data table mutations. The empty column (i.e.,
// the verified column) will have the value true ("verified") on the put mutations
- private Collection<Pair<Mutation, byte[]>> postIndexUpdates = Collections.emptyList();
+ private ListMultimap<HTableInterfaceReference, Mutation> postIndexUpdates;
// The collection of candidate index mutations that will be applied after the data table mutations
- private Collection<Pair<Pair<Mutation, byte[]>, byte[]>> intermediatePostIndexUpdates;
+ private ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> intermediatePostIndexUpdates;
private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
- long dataWriteStartTime;
-
+ private long dataWriteStartTime;
+ private boolean rebuild;
private BatchMutateContext(int clientVersion) {
this.clientVersion = clientVersion;
}
@@ -506,6 +509,27 @@
}
}
+ private void handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates) {
+ byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
+ HTableInterfaceReference hTableInterfaceReference =
+ new HTableInterfaceReference(new ImmutableBytesPtr(tableName));
+ List<Pair<Mutation, byte[]>> localIndexUpdates = indexUpdates.removeAll(hTableInterfaceReference);
+ if (localIndexUpdates == null || localIndexUpdates.isEmpty()) {
+ return;
+ }
+ List<Mutation> localUpdates = new ArrayList<Mutation>();
+ Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = localIndexUpdates.iterator();
+ while (indexUpdatesItr.hasNext()) {
+ Pair<Mutation, byte[]> next = indexUpdatesItr.next();
+ localUpdates.add(next.getFirst());
+ }
+ if (!localUpdates.isEmpty()) {
+ miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()]));
+ }
+ }
+
private void prepareIndexMutations(
ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp,
@@ -513,79 +537,56 @@
Collection<? extends Mutation> mutations,
long now,
PhoenixIndexMetaData indexMetaData) throws Throwable {
-
List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
-
// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
Span current = scope.getSpan();
if (current == null) {
current = NullSpan.INSTANCE;
}
-
// get the index updates for all elements in this batch
- Collection<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdates =
- this.builder.getIndexUpdates(miniBatchOp, mutations, indexMetaData);
-
+ context.intermediatePostIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
+ this.builder.getIndexUpdates(context.intermediatePostIndexUpdates, miniBatchOp, mutations, indexMetaData);
current.addTimelineAnnotation("Built index updates, doing preStep");
- TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
- byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
- Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr = indexUpdates.iterator();
- List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
- context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
- context.intermediatePostIndexUpdates = new ArrayList<>(indexUpdates.size());
- while(indexUpdatesItr.hasNext()) {
- Pair<Pair<Mutation, byte[]>, byte[]> next = indexUpdatesItr.next();
- if (Bytes.compareTo(next.getFirst().getSecond(), tableName) == 0) {
- localUpdates.add(next.getFirst().getFirst());
- indexUpdatesItr.remove();
- }
- else {
- // get index maintainer for this index table
- IndexMaintainer indexMaintainer = getIndexMaintainer(maintainers, next.getFirst().getSecond());
- if (indexMaintainer == null) {
- throw new DoNotRetryIOException(
- "preBatchMutateWithExceptions: indexMaintainer is null " +
- c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
- }
- byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
- byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+ handleLocalIndexUpdates(c, miniBatchOp, context.intermediatePostIndexUpdates);
+ context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+ int updateCount = 0;
+ for (IndexMaintainer indexMaintainer : maintainers) {
+ updateCount++;
+ byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+ byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+ HTableInterfaceReference hTableInterfaceReference =
+ new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+ Iterator<Pair<Mutation, byte[]>> indexUpdatesItr =
+ context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator();
+ while (indexUpdatesItr.hasNext()) {
+ Pair<Mutation, byte[]> next = indexUpdatesItr.next();
// add the VERIFIED cell, which is the empty cell
- Mutation m = next.getFirst().getFirst();
- boolean rebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
- if (rebuild) {
+ Mutation m = next.getFirst();
+ if (context.rebuild) {
+ indexUpdatesItr.remove();
if (m instanceof Put) {
long ts = getMaxTimestamp(m);
// Remove the empty column prepared by Index codec as we need to change its value
removeEmptyColumn(m, emptyCF, emptyCQ);
- ((Put)m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
+ ((Put) m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
}
+ context.preIndexUpdates.put(hTableInterfaceReference, m);
} else {
- indexUpdatesItr.remove();
// For this mutation whether it is put or delete, set the status of the index row "unverified"
// This will be done before the data table row is updated (i.e., in the first write phase)
Put unverifiedPut = new Put(m.getRow());
unverifiedPut.addColumn(emptyCF, emptyCQ, now - 1, UNVERIFIED_BYTES);
- context.preIndexUpdates.add(new Pair <Mutation, byte[]>(unverifiedPut, next.getFirst().getSecond()));
+ context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
if (m instanceof Put) {
// Remove the empty column prepared by Index codec as we need to change its value
removeEmptyColumn(m, emptyCF, emptyCQ);
((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
}
- context.intermediatePostIndexUpdates.add(next);
}
}
}
- if (!localUpdates.isEmpty()) {
- miniBatchOp.addOperationsFromCP(0,
- localUpdates.toArray(new Mutation[localUpdates.size()]));
- }
- if (!indexUpdates.isEmpty() && context.preIndexUpdates.isEmpty()) {
- context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
- }
- for (Pair<Pair<Mutation, byte[]>, byte[]> update : indexUpdates) {
- context.preIndexUpdates.add(update.getFirst());
- }
+ TracingUtils.addAnnotation(current, "index update count", updateCount);
}
}
@@ -610,20 +611,23 @@
setBatchMutateContext(c, context);
Mutation firstMutation = miniBatchOp.getOperation(0);
ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
+ context.rebuild = replayWrite != null;
/*
* Exclusively lock all rows so we get a consistent read
* while determining the index updates
*/
- if (replayWrite == null) {
+ long now;
+ if (!context.rebuild) {
populateRowsToLock(miniBatchOp, context);
lockRows(context);
- }
- long now = EnvironmentEdgeManager.currentTimeMillis();
- // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
- // concurrent updates
- if (replayWrite == null) {
+ now = EnvironmentEdgeManager.currentTimeMillis();
+ // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
+ // concurrent updates
populatePendingRows(context);
}
+ else {
+ now = EnvironmentEdgeManager.currentTimeMillis();
+ }
// First group all the updates for a single row into a single update to be processed
Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite);
// early exit if it turns out we don't have any edits
@@ -646,9 +650,11 @@
for (RowLock rowLock : context.rowLocks) {
rowLock.release();
}
- // Do the index updates
+ // Do the first phase index updates
doPre(c, context, miniBatchOp);
- if (replayWrite == null) {
+ context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+ if (!context.rebuild) {
+ List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
// Acquire the locks again before letting the region proceed with data table updates
List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
for (RowLock rowLock : context.rowLocks) {
@@ -658,29 +664,26 @@
context.rowLocks.clear();
context.rowLocks = rowLocks;
// Check if we need to skip post index update for any of the row
- Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> iterator = context.intermediatePostIndexUpdates.iterator();
- while (iterator.hasNext()) {
- // Check if this row is going through another mutation which has a newer timestamp. If so,
- // ignore the pending updates for this row
- Pair<Pair<Mutation, byte[]>, byte[]> update = iterator.next();
- ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
- PendingRow pendingRow = pendingRows.get(rowKey);
- // Are there concurrent updates on the data table row? if so, skip post index updates
- // and let read repair resolve conflicts
- if (pendingRow.isConcurrent()) {
- iterator.remove();
+ for (IndexMaintainer indexMaintainer : maintainers) {
+ HTableInterfaceReference hTableInterfaceReference =
+ new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+ Iterator<Pair<Mutation, byte[]>> iterator =
+ context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator();
+ while (iterator.hasNext()) {
+ // Are there concurrent updates on the data table row? if so, skip post index updates
+ // and let read repair resolve conflicts
+ Pair<Mutation, byte[]> update = iterator.next();
+ ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
+ PendingRow pendingRow = pendingRows.get(rowKey);
+ if (!pendingRow.isConcurrent()) {
+ context.postIndexUpdates.put(hTableInterfaceReference, update.getFirst());
+ }
}
}
// We are done with handling concurrent mutations. So we can remove the rows of this batch from
// the collection of pending rows
removePendingRows(context);
}
- if (context.postIndexUpdates.isEmpty() && !context.intermediatePostIndexUpdates.isEmpty()) {
- context.postIndexUpdates = new ArrayList<>(context.intermediatePostIndexUpdates.size());
- }
- for (Pair<Pair<Mutation, byte[]>, byte[]> update : context.intermediatePostIndexUpdates) {
- context.postIndexUpdates.add(update.getFirst());
- }
if (failDataTableUpdatesForTesting) {
throw new DoNotRetryIOException("Simulating the data table write failure");
}
@@ -758,7 +761,7 @@
private void doIndexWritesWithExceptions(BatchMutateContext context, boolean post)
throws IOException {
- Collection<Pair<Mutation, byte[]>> indexUpdates = post ? context.postIndexUpdates : context.preIndexUpdates;
+ ListMultimap<HTableInterfaceReference, Mutation> indexUpdates = post ? context.postIndexUpdates : context.preIndexUpdates;
//short circuit, if we don't need to do any work
if (context == null || indexUpdates.isEmpty()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 7639a49..90d28b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.List;
+import com.google.common.collect.ListMultimap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
@@ -34,6 +35,8 @@
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.PhoenixIndexMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,7 +82,7 @@
return this.delegate.getIndexMetaData(miniBatchOp);
}
- public Collection<Pair<Pair<Mutation, byte[]>, byte[]>> getIndexUpdates(
+ public void getIndexUpdates(ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates,
MiniBatchOperationInProgress<Mutation> miniBatchOp,
Collection<? extends Mutation> mutations,
IndexMetaData indexMetaData) throws Throwable {
@@ -87,20 +90,12 @@
this.delegate.batchStarted(miniBatchOp, indexMetaData);
// Avoid the Object overhead of the executor when it's not actually parallelizing anything.
- ArrayList<Pair<Pair<Mutation, byte[]>, byte[]>> results = new ArrayList<>(mutations.size());
for (Mutation m : mutations) {
Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
- if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
- for (Pair<Mutation, byte[]> update : updates) {
- update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
- BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
- }
- }
for (Pair<Mutation, byte[]> update : updates) {
- results.add(new Pair<>(update, m.getRow()));
+ indexUpdates.put(new HTableInterfaceReference(new ImmutableBytesPtr(update.getSecond())), new Pair<>(update.getFirst(), m.getRow()));
}
}
- return results;
}
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(