PHOENIX-6444 Extend Cell Tags to Delete object for Indexer coproc (#1197)
* PHOENIX-6444 Extend Cell Tags to Delete object for Indexer coproc
Co-authored-by: Rushabh <rushabh.shah@salesforce.com>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
index 6327e5d..737106f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
@@ -47,12 +47,15 @@
import org.apache.hadoop.hbase.RawCell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.DeleteCompiler;
import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.end2end.index.IndexTestUtil;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.DeleteStatement;
import org.apache.phoenix.parse.SQLParser;
@@ -978,7 +981,7 @@
// Add tag "customer-delete" to delete marker.
props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
- createAndUpsertTable(tableName, indexName, props);
+ createAndUpsertTable(tableName, indexName, props, false);
// Make sure that the plan creates is of ClientSelectDeleteMutationPlan
verifyDeletePlan(delete, DeleteCompiler.ClientSelectDeleteMutationPlan.class, props);
executeDelete(delete, props, 1);
@@ -1003,7 +1006,7 @@
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
- createAndUpsertTable(tableName, indexName, props);
+ createAndUpsertTable(tableName, indexName, props, false);
// Make sure that the plan creates is of ServerSelectDeleteMutationPlan
verifyDeletePlan(delete, DeleteCompiler.ServerSelectDeleteMutationPlan.class, props);
executeDelete(delete, props, 2);
@@ -1029,7 +1032,7 @@
props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
// Don't create index table. We will use MultiRowDeleteMutationPlan
// if there is no index present for a table.
- createAndUpsertTable(tableName, null, props);
+ createAndUpsertTable(tableName, null, props, false);
// Make sure that the plan creates is of MultiRowDeleteMutationPlan
verifyDeletePlan(delete, DeleteCompiler.MultiRowDeleteMutationPlan.class, props);
executeDelete(delete, props, 1);
@@ -1054,8 +1057,8 @@
assertEquals(plan.getClass(), planClass);
}
}
- private void createAndUpsertTable(String tableName, String indexName, Properties props)
- throws SQLException {
+ private void createAndUpsertTable(String tableName, String indexName, Properties props,
+ boolean useOldCoproc) throws Exception {
String ddl = "CREATE TABLE " + tableName +
" (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)";
try(Connection conn = DriverManager.getConnection(getUrl(), props)) {
@@ -1066,6 +1069,11 @@
String indexDdl1 = "CREATE INDEX " + indexName + " ON " + tableName + "(v1,v2)";
statement.execute(indexDdl1);
}
+ if (useOldCoproc) {
+ Admin admin = ((PhoenixConnection) conn).getQueryServices().getAdmin();
+ IndexTestUtil.downgradeCoprocs(tableName, indexName, admin);
+ }
+
statement.execute(
"upsert into " + tableName + " values (1, 'foo', 'foo1')");
statement.execute(
@@ -1122,4 +1130,23 @@
assertEquals(0, tags.size());
}
}
+
+ /*
+ Test whether source of operation tags are added to Delete mutations if we are using
+ old index coproc.
+ */
+ @Test
+ public void testDeleteTagsWithOldIndexCoproc() throws Exception {
+ String tableName = generateUniqueName();
+ String tagValue = "customer-delete";
+ String delete = "DELETE FROM " + tableName + " WHERE k = 1";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
+        // A newly created table always gets the new index coproc. Downgrade it to use the older one.
+ createAndUpsertTable(tableName, null, props, true);
+ executeDelete(delete, props, 1);
+ String startRowKeyForBaseTable = "1";
+ // Make sure that Delete Marker has cell tag for base table.
+ checkTagPresentInDeleteMarker(tableName, startRowKeyForBaseTable, true, tagValue);
+ }
}
\ No newline at end of file
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
index ba8be11..a7c6c6d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
@@ -17,13 +17,12 @@
*/
package org.apache.phoenix.end2end;
+import org.apache.phoenix.end2end.index.IndexTestUtil;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.index.IndexCoprocIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.hbase.index.Indexer;
@@ -400,7 +399,7 @@
indexDesc.hasCoprocessor(IndexRegionObserver.class.getName()));
Assert.assertFalse("Found Indexer on " + table,
indexDesc.hasCoprocessor(Indexer.class.getName()));
- IndexCoprocIT.assertCoprocConfig(indexDesc, IndexRegionObserver.class.getName(),
+ IndexTestUtil.assertCoprocConfig(indexDesc, IndexRegionObserver.class.getName(),
IndexCoprocIT.INDEX_REGION_OBSERVER_CONFIG);
}
}
@@ -410,7 +409,7 @@
TableDescriptor indexDesc = admin.getDescriptor(TableName.valueOf(index));
Assert.assertTrue("Couldn't find GlobalIndexChecker on " + index,
indexDesc.hasCoprocessor(GlobalIndexChecker.class.getName()));
- IndexCoprocIT.assertCoprocConfig(indexDesc, GlobalIndexChecker.class.getName(),
+ IndexTestUtil.assertCoprocConfig(indexDesc, GlobalIndexChecker.class.getName(),
IndexCoprocIT.GLOBAL_INDEX_CHECKER_CONFIG);
}
}
@@ -440,7 +439,7 @@
indexDesc.hasCoprocessor(Indexer.class.getName()));
Assert.assertFalse("Found IndexRegionObserver on " + table,
indexDesc.hasCoprocessor(IndexRegionObserver.class.getName()));
- IndexCoprocIT.assertCoprocConfig(indexDesc, Indexer.class.getName(),
+ IndexTestUtil.assertCoprocConfig(indexDesc, Indexer.class.getName(),
IndexCoprocIT.INDEXER_CONFIG);
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java
index c0a11ff..c71129b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java
@@ -23,20 +23,14 @@
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
import org.apache.phoenix.index.GlobalIndexChecker;
-import org.apache.phoenix.index.PhoenixIndexBuilder;
-import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.SchemaUtil;
-import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -46,10 +40,13 @@
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Properties;
+import static org.apache.phoenix.end2end.index.IndexTestUtil.assertCoprocConfig;
+import static org.apache.phoenix.end2end.index.IndexTestUtil.assertCoprocsContains;
+import static org.apache.phoenix.end2end.index.IndexTestUtil.assertCoprocsNotContains;
+import static org.apache.phoenix.end2end.index.IndexTestUtil.assertUsingOldCoprocs;
+
@RunWith(Parameterized.class)
public class IndexCoprocIT extends ParallelStatsDisabledIT {
private boolean isNamespaceMapped = false;
@@ -82,35 +79,15 @@
isNamespaceMapped).getString();
String physicalIndexName = SchemaUtil.getPhysicalHBaseTableName(schemaName,
indexName, isNamespaceMapped).getString();
- Admin admin = ((PhoenixConnection) getConnection()).getQueryServices().getAdmin();
+ Admin admin = getConnection().getQueryServices().getAdmin();
createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
createIndexTable(schemaName, tableName, indexName);
- TableDescriptor baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
- TableDescriptorBuilder baseDescBuilder = TableDescriptorBuilder.newBuilder(baseDescriptor);
- TableDescriptor indexDescriptor = admin.getDescriptor(TableName.valueOf(physicalIndexName));
- TableDescriptorBuilder indexDescBuilder = TableDescriptorBuilder.newBuilder(indexDescriptor);
-
- assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
- assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
-
- removeCoproc(IndexRegionObserver.class, baseDescBuilder, admin);
- removeCoproc(IndexRegionObserver.class, indexDescBuilder, admin);
- removeCoproc(GlobalIndexChecker.class, indexDescBuilder, admin);
-
- Map<String, String> props = new HashMap<String, String>();
- props.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
- Indexer.enableIndexing(baseDescBuilder, PhoenixIndexBuilder.class,
- props, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
- admin.modifyTable(baseDescBuilder.build());
- baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
- indexDescriptor = admin.getDescriptor(TableName.valueOf(physicalIndexName));
- assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
-
+ IndexTestUtil.downgradeCoprocs(physicalTableName, physicalIndexName, admin);
createBaseTable(schemaName, tableName, true, 0, null);
- baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
- indexDescriptor = admin.getDescriptor(TableName.valueOf(physicalIndexName));
+ TableDescriptor baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
+ TableDescriptor indexDescriptor = admin.getDescriptor(TableName.valueOf(physicalIndexName));
assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
}
@@ -150,7 +127,7 @@
isNamespaceMapped).getString();
String physicalIndexName = SchemaUtil.getPhysicalHBaseTableName(schemaName,
indexName, isNamespaceMapped).getString();
- Admin admin = ((PhoenixConnection) getConnection()).getQueryServices().getAdmin();
+ Admin admin = getConnection().getQueryServices().getAdmin();
createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
createIndexTable(schemaName, tableName, indexName);
@@ -167,15 +144,6 @@
assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
}
- private void assertUsingOldCoprocs(TableDescriptor baseDescriptor,
- TableDescriptor indexDescriptor) {
- assertCoprocsContains(Indexer.class, baseDescriptor);
- assertCoprocConfig(baseDescriptor, Indexer.class.getName(),
- INDEXER_CONFIG);
- assertCoprocsNotContains(IndexRegionObserver.class, baseDescriptor);
- assertCoprocsNotContains(IndexRegionObserver.class, indexDescriptor);
- assertCoprocsNotContains(GlobalIndexChecker.class, indexDescriptor);
- }
private void assertUsingNewCoprocs(TableDescriptor baseDescriptor) {
assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
@@ -194,58 +162,6 @@
GLOBAL_INDEX_CHECKER_CONFIG);
}
- private void assertCoprocsContains(Class clazz, TableDescriptor descriptor) {
- String expectedCoprocName = clazz.getName();
- boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
- Assert.assertTrue("Could not find coproc " + expectedCoprocName +
- " in descriptor " + descriptor,foundCoproc);
- }
-
- private void assertCoprocsNotContains(Class clazz, TableDescriptor descriptor) {
- String expectedCoprocName = clazz.getName();
- boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
- Assert.assertFalse("Could find coproc " + expectedCoprocName +
- " in descriptor " + descriptor,foundCoproc);
- }
-
- private boolean isCoprocPresent(TableDescriptor descriptor, String expectedCoprocName) {
- boolean foundCoproc = false;
- for (String coprocName : descriptor.getCoprocessors()){
- if (coprocName.equals(expectedCoprocName)){
- foundCoproc = true;
- break;
- }
- }
- return foundCoproc;
- }
-
- private void removeCoproc(Class clazz, TableDescriptorBuilder descBuilder, Admin admin) throws Exception {
- descBuilder.removeCoprocessor(clazz.getName());
- admin.modifyTable(descBuilder.build());
- }
-
- public static void assertCoprocConfig(TableDescriptor indexDesc,
- String className, String expectedConfigValue){
- boolean foundConfig = false;
- for (Map.Entry<Bytes, Bytes> entry :
- indexDesc.getValues().entrySet()){
- String propKey = Bytes.toString(entry.getKey().get());
- String propValue = Bytes.toString(entry.getValue().get());
- //Unfortunately, a good API to read coproc properties didn't show up until
- //HBase 2.0. Doing this the painful String-matching way to be compatible with 1.x
- if (propKey.contains("coprocessor")){
- if (propValue.contains(className)){
- Assert.assertEquals(className + " is configured incorrectly",
- expectedConfigValue,
- propValue);
- foundConfig = true;
- break;
- }
- }
- }
- Assert.assertTrue("Couldn't find config for " + className, foundConfig);
- }
-
private void createIndexTable(String schemaName, String tableName, String indexName)
throws SQLException {
Connection conn = getConnection();
@@ -300,5 +216,4 @@
props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
return (PhoenixConnection) DriverManager.getConnection(getUrl(),props);
}
-
-}
+}
\ No newline at end of file
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
index c97847d..bfa9ed6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.end2end.index;
+import static org.apache.phoenix.end2end.index.IndexCoprocIT.INDEXER_CONFIG;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
@@ -30,16 +31,28 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.BitSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.PhoenixIndexBuilder;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
@@ -54,6 +67,7 @@
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.junit.Assert;
public class IndexTestUtil {
@@ -165,4 +179,102 @@
return row.toRowMutations();
}
-}
+ public static void downgradeCoprocs(String physicalTableName,
+ String physicalIndexName, Admin admin) throws Exception {
+ TableDescriptor baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
+ TableDescriptorBuilder baseDescBuilder = TableDescriptorBuilder.newBuilder(baseDescriptor);
+
+ assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+ removeCoproc(IndexRegionObserver.class, baseDescBuilder, admin);
+
+ if (physicalIndexName != null) {
+ TableDescriptor indexDescriptor =
+ admin.getDescriptor(TableName.valueOf(physicalIndexName));
+ assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
+
+ TableDescriptorBuilder indexDescBuilder =
+ TableDescriptorBuilder.newBuilder(indexDescriptor);
+ removeCoproc(IndexRegionObserver.class, indexDescBuilder, admin);
+ removeCoproc(GlobalIndexChecker.class, indexDescBuilder, admin);
+ }
+
+ Map<String, String> props = new HashMap<>();
+ props.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+ Indexer.enableIndexing(baseDescBuilder, PhoenixIndexBuilder.class,
+ props, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
+ admin.modifyTable(baseDescBuilder.build());
+ baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
+ TableDescriptor indexDescriptor = null;
+ if (physicalIndexName != null) {
+ indexDescriptor = admin.getDescriptor(TableName.valueOf(physicalIndexName));
+ }
+ assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
+ }
+
+
+ public static void assertCoprocsContains(Class clazz, TableDescriptor descriptor) {
+ String expectedCoprocName = clazz.getName();
+ boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
+ Assert.assertTrue("Could not find coproc " + expectedCoprocName +
+ " in descriptor " + descriptor,foundCoproc);
+ }
+
+
+ public static boolean isCoprocPresent(TableDescriptor descriptor, String expectedCoprocName) {
+ boolean foundCoproc = false;
+ for (String coprocName : descriptor.getCoprocessors()){
+ if (coprocName.equals(expectedCoprocName)){
+ foundCoproc = true;
+ break;
+ }
+ }
+ return foundCoproc;
+ }
+
+ public static void removeCoproc(Class clazz, TableDescriptorBuilder descBuilder, Admin admin)
+ throws Exception {
+ descBuilder.removeCoprocessor(clazz.getName());
+ admin.modifyTable(descBuilder.build());
+ }
+
+ public static void assertUsingOldCoprocs(TableDescriptor baseDescriptor,
+ TableDescriptor indexDescriptor) {
+ assertCoprocsContains(Indexer.class, baseDescriptor);
+ assertCoprocConfig(baseDescriptor, Indexer.class.getName(),
+ INDEXER_CONFIG);
+ assertCoprocsNotContains(IndexRegionObserver.class, baseDescriptor);
+ if (indexDescriptor != null) {
+ assertCoprocsNotContains(IndexRegionObserver.class, indexDescriptor);
+ assertCoprocsNotContains(GlobalIndexChecker.class, indexDescriptor);
+ }
+ }
+
+ public static void assertCoprocConfig(TableDescriptor indexDesc,
+ String className, String expectedConfigValue){
+ boolean foundConfig = false;
+ for (Map.Entry<Bytes, Bytes> entry :
+ indexDesc.getValues().entrySet()){
+ String propKey = Bytes.toString(entry.getKey().get());
+ String propValue = Bytes.toString(entry.getValue().get());
+ //Unfortunately, a good API to read coproc properties didn't show up until
+ //HBase 2.0. Doing this the painful String-matching way to be compatible with 1.x
+ if (propKey.contains("coprocessor")){
+ if (propValue.contains(className)){
+ Assert.assertEquals(className + " is configured incorrectly",
+ expectedConfigValue,
+ propValue);
+ foundConfig = true;
+ break;
+ }
+ }
+ }
+ Assert.assertTrue("Couldn't find config for " + className, foundConfig);
+ }
+
+ public static void assertCoprocsNotContains(Class clazz, TableDescriptor descriptor) {
+ String expectedCoprocName = clazz.getName();
+ boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
+ Assert.assertFalse("Could find coproc " + expectedCoprocName +
+ " in descriptor " + descriptor,foundCoproc);
+ }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 98edf41..628d294 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -30,14 +30,6 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.hadoop.hbase.ArrayBackedTag;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.PhoenixTagType;
-import org.apache.hadoop.hbase.PrivateCellUtil;
-import org.apache.hadoop.hbase.RawCell;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagUtil;
-import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
@@ -101,6 +93,7 @@
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.ServerUtil.ConnectionType;
@@ -733,7 +726,7 @@
scan.setFilter(skipScanFilter);
try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
boolean more = true;
- while(more) {
+ while (more) {
List<Cell> cells = new ArrayList<Cell>();
more = scanner.next(cells);
if (cells.isEmpty()) {
@@ -974,7 +967,7 @@
context.populateOriginalMutations(miniBatchOp);
// Need to add cell tags to Delete Marker before we do any index processing
// since we add tags to tables which doesn't have indexes also.
- setDeleteAttributes(miniBatchOp);
+ IndexUtil.setDeleteAttributes(miniBatchOp);
/*
* Exclusively lock all rows so we get a consistent read
@@ -1031,49 +1024,6 @@
}
}
- /**
- * Set Cell Tags to delete markers with source of operation attribute.
- * @param miniBatchOp
- * @throws IOException
- */
- private void setDeleteAttributes(MiniBatchOperationInProgress<Mutation> miniBatchOp)
- throws IOException {
- for (int i = 0; i < miniBatchOp.size(); i++) {
- Mutation m = miniBatchOp.getOperation(i);
- if (!(m instanceof Delete)) {
- // Ignore if it is not Delete type.
- continue;
- }
- byte[] sourceOpAttr = m.getAttribute(QueryServices.SOURCE_OPERATION_ATTRIB);
- if (sourceOpAttr == null) {
- continue;
- }
- Tag sourceOpTag = new ArrayBackedTag(PhoenixTagType.SOURCE_OPERATION_TAG_TYPE,
- sourceOpAttr);
- List<Cell> updatedCells = new ArrayList<>();
- for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
- Cell cell = cellScanner.current();
- RawCell rawCell = (RawCell)cell;
- List<Tag> tags = new ArrayList<>();
- Iterator<Tag> tagsIterator = rawCell.getTags();
- while (tagsIterator.hasNext()) {
- tags.add(tagsIterator.next());
- }
- tags.add(sourceOpTag);
- // TODO: PrivateCellUtil's IA is Private. HBASE-25328 adds a builder method
- // TODO: for creating Tag which will be LP with IA.coproc
- Cell updatedCell = PrivateCellUtil.createCell(cell, tags);
- updatedCells.add(updatedCell);
- }
- m.getFamilyCellMap().clear();
- // Clear and add new Cells to the Mutation.
- for (Cell cell : updatedCells) {
- Delete d = (Delete) m;
- d.addDeleteMarker(cell);
- }
- }
- }
-
private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
this.batchMutateContext.set(context);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 8b04faf..0501f28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -81,6 +81,7 @@
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.ServerUtil.ConnectionType;
@@ -367,12 +368,15 @@
public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
+ // Need to add cell tags to Delete Marker before we do any index processing
+        // since we also add tags to tables that don't have any indexes.
+ IndexUtil.setDeleteAttributes(miniBatchOp);
// first group all the updates for a single row into a single update to be processed
Map<ImmutableBytesPtr, MultiMutation> mutationsMap =
new HashMap<ImmutableBytesPtr, MultiMutation>();
Durability defaultDurability = Durability.SYNC_WAL;
- if(c.getEnvironment().getRegion() != null) {
+ if (c.getEnvironment().getRegion() != null) {
defaultDurability = c.getEnvironment().getRegion().getTableDescriptor().getDurability();
defaultDurability = (defaultDurability == Durability.USE_DEFAULT) ?
Durability.SYNC_WAL : defaultDurability;
@@ -512,7 +516,7 @@
byte[] tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
- while(indexUpdatesItr.hasNext()) {
+ while (indexUpdatesItr.hasNext()) {
Pair<Mutation, byte[]> next = indexUpdatesItr.next();
if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
localUpdates.add(next.getFirst());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index c40b5ea..0449c00 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -39,6 +39,15 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.ArrayBackedTag;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.PhoenixTagType;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.RawCell;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.hbase.Cell;
@@ -818,4 +827,50 @@
}
}
+ /**
+ * Set Cell Tags to delete markers with source of operation attribute.
+     * @param miniBatchOp batch of mutations to scan for Delete markers
+     * @throws IOException if iterating over the mutation's cells fails
+ */
+ public static void setDeleteAttributes(
+ MiniBatchOperationInProgress<Mutation> miniBatchOp)
+ throws IOException {
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ Mutation m = miniBatchOp.getOperation(i);
+ if (!(m instanceof Delete)) {
+ // Ignore if it is not Delete type.
+ continue;
+ }
+ byte[] sourceOpAttr =
+ m.getAttribute(QueryServices.SOURCE_OPERATION_ATTRIB);
+ if (sourceOpAttr == null) {
+ continue;
+ }
+ Tag sourceOpTag = new ArrayBackedTag(
+ PhoenixTagType.SOURCE_OPERATION_TAG_TYPE, sourceOpAttr);
+ List<Cell> updatedCells = new ArrayList<>();
+ for (CellScanner cellScanner = m.cellScanner();
+ cellScanner.advance();) {
+ Cell cell = cellScanner.current();
+ RawCell rawCell = (RawCell) cell;
+ List<Tag> tags = new ArrayList<>();
+ Iterator<Tag> tagsIterator = rawCell.getTags();
+ while (tagsIterator.hasNext()) {
+ tags.add(tagsIterator.next());
+ }
+ tags.add(sourceOpTag);
+ // TODO: PrivateCellUtil's IA is Private.
+            // HBASE-25328 adds a builder method for creating Tag which
+ // will be LP with IA.coproc
+ Cell updatedCell = PrivateCellUtil.createCell(cell, tags);
+ updatedCells.add(updatedCell);
+ }
+ m.getFamilyCellMap().clear();
+ // Clear and add new Cells to the Mutation.
+ for (Cell cell : updatedCells) {
+ Delete d = (Delete) m;
+ d.addDeleteMarker(cell);
+ }
+ }
+ }
}