PHOENIX-6444 Extend Cell Tags to Delete object for Indexer coproc (#1197)

* PHOENIX-6444 Extend Cell Tags to Delete object for Indexer coproc

Co-authored-by: Rushabh <rushabh.shah@salesforce.com>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
index 6327e5d..737106f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
@@ -47,12 +47,15 @@
 import org.apache.hadoop.hbase.RawCell;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.DeleteCompiler;
 import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.end2end.index.IndexTestUtil;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.DeleteStatement;
 import org.apache.phoenix.parse.SQLParser;
@@ -978,7 +981,7 @@
         // Add tag "customer-delete" to delete marker.
         props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
 
-        createAndUpsertTable(tableName, indexName, props);
+        createAndUpsertTable(tableName, indexName, props, false);
         // Make sure that the plan creates is of ClientSelectDeleteMutationPlan
         verifyDeletePlan(delete, DeleteCompiler.ClientSelectDeleteMutationPlan.class, props);
         executeDelete(delete, props, 1);
@@ -1003,7 +1006,7 @@
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
 
-        createAndUpsertTable(tableName, indexName, props);
+        createAndUpsertTable(tableName, indexName, props, false);
         // Make sure that the plan creates is of ServerSelectDeleteMutationPlan
         verifyDeletePlan(delete, DeleteCompiler.ServerSelectDeleteMutationPlan.class, props);
         executeDelete(delete, props, 2);
@@ -1029,7 +1032,7 @@
         props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
         // Don't create index table. We will use MultiRowDeleteMutationPlan
         // if there is no index present for a table.
-        createAndUpsertTable(tableName, null, props);
+        createAndUpsertTable(tableName, null, props, false);
         // Make sure that the plan creates is of MultiRowDeleteMutationPlan
         verifyDeletePlan(delete, DeleteCompiler.MultiRowDeleteMutationPlan.class, props);
         executeDelete(delete, props, 1);
@@ -1054,8 +1057,8 @@
             assertEquals(plan.getClass(), planClass);
         }
     }
-    private void createAndUpsertTable(String tableName, String indexName, Properties props)
-            throws SQLException {
+    private void createAndUpsertTable(String tableName, String indexName, Properties props,
+            boolean useOldCoproc) throws Exception {
         String ddl = "CREATE TABLE " + tableName +
                 " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)";
         try(Connection conn = DriverManager.getConnection(getUrl(), props)) {
@@ -1066,6 +1069,11 @@
                     String indexDdl1 = "CREATE INDEX " + indexName + " ON " + tableName + "(v1,v2)";
                     statement.execute(indexDdl1);
                 }
+                if (useOldCoproc) {
+                    Admin admin = ((PhoenixConnection) conn).getQueryServices().getAdmin();
+                    IndexTestUtil.downgradeCoprocs(tableName, indexName, admin);
+                }
+
                 statement.execute(
                         "upsert into " + tableName + " values (1, 'foo', 'foo1')");
                 statement.execute(
@@ -1122,4 +1130,23 @@
             assertEquals(0, tags.size());
         }
     }
+
+    /*
+        Test whether source of operation tags are added to Delete mutations if we are using
+        old index coproc.
+     */
+    @Test
+    public void testDeleteTagsWithOldIndexCoproc() throws Exception {
+        String tableName = generateUniqueName();
+        String tagValue = "customer-delete";
+        String delete = "DELETE FROM " + tableName + " WHERE k = 1";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
+        // The new table will always have new index coproc. Downgrade it to use older one.
+        createAndUpsertTable(tableName, null, props, true);
+        executeDelete(delete, props, 1);
+        String startRowKeyForBaseTable = "1";
+        // Make sure that Delete Marker has cell tag for base table.
+        checkTagPresentInDeleteMarker(tableName, startRowKeyForBaseTable, true, tagValue);
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
index ba8be11..a7c6c6d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
@@ -17,13 +17,12 @@
  */
 package org.apache.phoenix.end2end;
 
+import org.apache.phoenix.end2end.index.IndexTestUtil;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.index.IndexCoprocIT;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.hbase.index.Indexer;
@@ -400,7 +399,7 @@
                         indexDesc.hasCoprocessor(IndexRegionObserver.class.getName()));
                     Assert.assertFalse("Found Indexer on " + table,
                         indexDesc.hasCoprocessor(Indexer.class.getName()));
-                    IndexCoprocIT.assertCoprocConfig(indexDesc, IndexRegionObserver.class.getName(),
+                    IndexTestUtil.assertCoprocConfig(indexDesc, IndexRegionObserver.class.getName(),
                         IndexCoprocIT.INDEX_REGION_OBSERVER_CONFIG);
                 }
             }
@@ -410,7 +409,7 @@
                 TableDescriptor indexDesc = admin.getDescriptor(TableName.valueOf(index));
                 Assert.assertTrue("Couldn't find GlobalIndexChecker on " + index,
                     indexDesc.hasCoprocessor(GlobalIndexChecker.class.getName()));
-                IndexCoprocIT.assertCoprocConfig(indexDesc, GlobalIndexChecker.class.getName(),
+                IndexTestUtil.assertCoprocConfig(indexDesc, GlobalIndexChecker.class.getName(),
                     IndexCoprocIT.GLOBAL_INDEX_CHECKER_CONFIG);
             }
         }
@@ -440,7 +439,7 @@
                     indexDesc.hasCoprocessor(Indexer.class.getName()));
                 Assert.assertFalse("Found IndexRegionObserver on " + table,
                     indexDesc.hasCoprocessor(IndexRegionObserver.class.getName()));
-                IndexCoprocIT.assertCoprocConfig(indexDesc, Indexer.class.getName(),
+                IndexTestUtil.assertCoprocConfig(indexDesc, Indexer.class.getName(),
                     IndexCoprocIT.INDEXER_CONFIG);
             }
         }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java
index c0a11ff..c71129b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java
@@ -23,20 +23,14 @@
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.index.GlobalIndexChecker;
-import org.apache.phoenix.index.PhoenixIndexBuilder;
-import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.SchemaUtil;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -46,10 +40,13 @@
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.phoenix.end2end.index.IndexTestUtil.assertCoprocConfig;
+import static org.apache.phoenix.end2end.index.IndexTestUtil.assertCoprocsContains;
+import static org.apache.phoenix.end2end.index.IndexTestUtil.assertCoprocsNotContains;
+import static org.apache.phoenix.end2end.index.IndexTestUtil.assertUsingOldCoprocs;
+
 @RunWith(Parameterized.class)
 public class IndexCoprocIT extends ParallelStatsDisabledIT {
     private boolean isNamespaceMapped = false;
@@ -82,35 +79,15 @@
             isNamespaceMapped).getString();
         String physicalIndexName = SchemaUtil.getPhysicalHBaseTableName(schemaName,
             indexName, isNamespaceMapped).getString();
-        Admin admin = ((PhoenixConnection) getConnection()).getQueryServices().getAdmin();
+        Admin admin = getConnection().getQueryServices().getAdmin();
 
         createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
         createIndexTable(schemaName, tableName, indexName);
 
-        TableDescriptor baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
-        TableDescriptorBuilder baseDescBuilder = TableDescriptorBuilder.newBuilder(baseDescriptor);
-        TableDescriptor indexDescriptor = admin.getDescriptor(TableName.valueOf(physicalIndexName));
-        TableDescriptorBuilder indexDescBuilder = TableDescriptorBuilder.newBuilder(indexDescriptor);
-
-        assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
-        assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
-
-        removeCoproc(IndexRegionObserver.class, baseDescBuilder, admin);
-        removeCoproc(IndexRegionObserver.class, indexDescBuilder, admin);
-        removeCoproc(GlobalIndexChecker.class, indexDescBuilder, admin);
-
-        Map<String, String> props = new HashMap<String, String>();
-        props.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
-        Indexer.enableIndexing(baseDescBuilder, PhoenixIndexBuilder.class,
-            props, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
-        admin.modifyTable(baseDescBuilder.build());
-        baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
-        indexDescriptor = admin.getDescriptor(TableName.valueOf(physicalIndexName));
-        assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
-
+        IndexTestUtil.downgradeCoprocs(physicalTableName, physicalIndexName, admin);
         createBaseTable(schemaName, tableName, true, 0, null);
-        baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
-        indexDescriptor = admin.getDescriptor(TableName.valueOf(physicalIndexName));
+        TableDescriptor baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
+        TableDescriptor indexDescriptor = admin.getDescriptor(TableName.valueOf(physicalIndexName));
         assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
     }
 
@@ -150,7 +127,7 @@
             isNamespaceMapped).getString();
         String physicalIndexName = SchemaUtil.getPhysicalHBaseTableName(schemaName,
             indexName, isNamespaceMapped).getString();
-        Admin admin = ((PhoenixConnection) getConnection()).getQueryServices().getAdmin();
+        Admin admin = getConnection().getQueryServices().getAdmin();
 
         createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
         createIndexTable(schemaName, tableName, indexName);
@@ -167,15 +144,6 @@
         assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
         assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
     }
-    private void assertUsingOldCoprocs(TableDescriptor baseDescriptor,
-                                       TableDescriptor indexDescriptor) {
-        assertCoprocsContains(Indexer.class, baseDescriptor);
-        assertCoprocConfig(baseDescriptor, Indexer.class.getName(),
-            INDEXER_CONFIG);
-        assertCoprocsNotContains(IndexRegionObserver.class, baseDescriptor);
-        assertCoprocsNotContains(IndexRegionObserver.class, indexDescriptor);
-        assertCoprocsNotContains(GlobalIndexChecker.class, indexDescriptor);
-    }
 
     private void assertUsingNewCoprocs(TableDescriptor baseDescriptor) {
         assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
@@ -194,58 +162,6 @@
             GLOBAL_INDEX_CHECKER_CONFIG);
     }
 
-    private void assertCoprocsContains(Class clazz, TableDescriptor descriptor) {
-        String expectedCoprocName = clazz.getName();
-        boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
-        Assert.assertTrue("Could not find coproc " + expectedCoprocName +
-            " in descriptor " + descriptor,foundCoproc);
-    }
-
-    private void assertCoprocsNotContains(Class clazz, TableDescriptor descriptor) {
-        String expectedCoprocName = clazz.getName();
-        boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
-        Assert.assertFalse("Could find coproc " + expectedCoprocName +
-            " in descriptor " + descriptor,foundCoproc);
-    }
-
-    private boolean isCoprocPresent(TableDescriptor descriptor, String expectedCoprocName) {
-        boolean foundCoproc = false;
-        for (String coprocName : descriptor.getCoprocessors()){
-            if (coprocName.equals(expectedCoprocName)){
-                foundCoproc = true;
-                break;
-            }
-        }
-        return foundCoproc;
-    }
-
-    private void removeCoproc(Class clazz, TableDescriptorBuilder descBuilder, Admin admin) throws Exception {
-        descBuilder.removeCoprocessor(clazz.getName());
-        admin.modifyTable(descBuilder.build());
-    }
-
-    public static void assertCoprocConfig(TableDescriptor indexDesc,
-                                   String className, String expectedConfigValue){
-        boolean foundConfig = false;
-        for (Map.Entry<Bytes, Bytes> entry :
-            indexDesc.getValues().entrySet()){
-            String propKey = Bytes.toString(entry.getKey().get());
-            String propValue = Bytes.toString(entry.getValue().get());
-            //Unfortunately, a good API to read coproc properties didn't show up until
-            //HBase 2.0. Doing this the painful String-matching way to be compatible with 1.x
-            if (propKey.contains("coprocessor")){
-                if (propValue.contains(className)){
-                    Assert.assertEquals(className + " is configured incorrectly",
-                        expectedConfigValue,
-                        propValue);
-                    foundConfig = true;
-                    break;
-                }
-            }
-        }
-        Assert.assertTrue("Couldn't find config for " + className, foundConfig);
-    }
-
     private void createIndexTable(String schemaName, String tableName, String indexName)
         throws SQLException {
         Connection conn = getConnection();
@@ -300,5 +216,4 @@
         props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
         return (PhoenixConnection) DriverManager.getConnection(getUrl(),props);
     }
-
-}
+}
\ No newline at end of file
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
index c97847d..bfa9ed6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.end2end.index;
 
+import static org.apache.phoenix.end2end.index.IndexCoprocIT.INDEXER_CONFIG;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
@@ -30,16 +31,28 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.PhoenixIndexBuilder;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
@@ -54,6 +67,7 @@
 import org.apache.phoenix.util.SchemaUtil;
 
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.junit.Assert;
 
 public class IndexTestUtil {
 
@@ -165,4 +179,102 @@
         return row.toRowMutations();
     }
 
-}
+    public static void downgradeCoprocs(String physicalTableName,
+            String physicalIndexName, Admin admin) throws Exception {
+        TableDescriptor baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
+        TableDescriptorBuilder baseDescBuilder = TableDescriptorBuilder.newBuilder(baseDescriptor);
+
+        assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+        removeCoproc(IndexRegionObserver.class, baseDescBuilder, admin);
+
+        if (physicalIndexName != null) {
+            TableDescriptor indexDescriptor =
+                    admin.getDescriptor(TableName.valueOf(physicalIndexName));
+            assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
+
+            TableDescriptorBuilder indexDescBuilder =
+                    TableDescriptorBuilder.newBuilder(indexDescriptor);
+            removeCoproc(IndexRegionObserver.class, indexDescBuilder, admin);
+            removeCoproc(GlobalIndexChecker.class, indexDescBuilder, admin);
+        }
+
+        Map<String, String> props = new HashMap<>();
+        props.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+        Indexer.enableIndexing(baseDescBuilder, PhoenixIndexBuilder.class,
+                props, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
+        admin.modifyTable(baseDescBuilder.build());
+        baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
+        TableDescriptor indexDescriptor = null;
+        if (physicalIndexName != null) {
+            indexDescriptor = admin.getDescriptor(TableName.valueOf(physicalIndexName));
+        }
+        assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
+    }
+
+
+    public static void assertCoprocsContains(Class clazz, TableDescriptor descriptor) {
+        String expectedCoprocName = clazz.getName();
+        boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
+        Assert.assertTrue("Could not find coproc " + expectedCoprocName +
+                " in descriptor " + descriptor,foundCoproc);
+    }
+
+
+    public static boolean isCoprocPresent(TableDescriptor descriptor, String expectedCoprocName) {
+        boolean foundCoproc = false;
+        for (String coprocName : descriptor.getCoprocessors()){
+            if (coprocName.equals(expectedCoprocName)){
+                foundCoproc = true;
+                break;
+            }
+        }
+        return foundCoproc;
+    }
+
+    public static void removeCoproc(Class clazz, TableDescriptorBuilder descBuilder, Admin admin)
+            throws Exception {
+        descBuilder.removeCoprocessor(clazz.getName());
+        admin.modifyTable(descBuilder.build());
+    }
+
+    public static void assertUsingOldCoprocs(TableDescriptor baseDescriptor,
+            TableDescriptor indexDescriptor) {
+        assertCoprocsContains(Indexer.class, baseDescriptor);
+        assertCoprocConfig(baseDescriptor, Indexer.class.getName(),
+                INDEXER_CONFIG);
+        assertCoprocsNotContains(IndexRegionObserver.class, baseDescriptor);
+        if (indexDescriptor != null) {
+            assertCoprocsNotContains(IndexRegionObserver.class, indexDescriptor);
+            assertCoprocsNotContains(GlobalIndexChecker.class, indexDescriptor);
+        }
+    }
+
+    public static void assertCoprocConfig(TableDescriptor indexDesc,
+            String className, String expectedConfigValue){
+        boolean foundConfig = false;
+        for (Map.Entry<Bytes, Bytes> entry :
+                indexDesc.getValues().entrySet()){
+            String propKey = Bytes.toString(entry.getKey().get());
+            String propValue = Bytes.toString(entry.getValue().get());
+            //Unfortunately, a good API to read coproc properties didn't show up until
+            //HBase 2.0. Doing this the painful String-matching way to be compatible with 1.x
+            if (propKey.contains("coprocessor")){
+                if (propValue.contains(className)){
+                    Assert.assertEquals(className + " is configured incorrectly",
+                            expectedConfigValue,
+                            propValue);
+                    foundConfig = true;
+                    break;
+                }
+            }
+        }
+        Assert.assertTrue("Couldn't find config for " + className, foundConfig);
+    }
+
+    public static void assertCoprocsNotContains(Class clazz, TableDescriptor descriptor) {
+        String expectedCoprocName = clazz.getName();
+        boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
+        Assert.assertFalse("Could find coproc " + expectedCoprocName +
+                " in descriptor " + descriptor,foundCoproc);
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 98edf41..628d294 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -30,14 +30,6 @@
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.hadoop.hbase.ArrayBackedTag;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.PhoenixTagType;
-import org.apache.hadoop.hbase.PrivateCellUtil;
-import org.apache.hadoop.hbase.RawCell;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagUtil;
-import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
@@ -101,6 +93,7 @@
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
@@ -733,7 +726,7 @@
         scan.setFilter(skipScanFilter);
         try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
             boolean more = true;
-            while(more) {
+            while (more) {
                 List<Cell> cells = new ArrayList<Cell>();
                 more = scanner.next(cells);
                 if (cells.isEmpty()) {
@@ -974,7 +967,7 @@
         context.populateOriginalMutations(miniBatchOp);
         // Need to add cell tags to Delete Marker before we do any index processing
         // since we add tags to tables which doesn't have indexes also.
-        setDeleteAttributes(miniBatchOp);
+        IndexUtil.setDeleteAttributes(miniBatchOp);
 
         /*
          * Exclusively lock all rows so we get a consistent read
@@ -1031,49 +1024,6 @@
         }
     }
 
-    /**
-     * Set Cell Tags to delete markers with source of operation attribute.
-     * @param miniBatchOp
-     * @throws IOException
-     */
-    private void setDeleteAttributes(MiniBatchOperationInProgress<Mutation> miniBatchOp)
-            throws IOException {
-        for (int i = 0; i < miniBatchOp.size(); i++) {
-            Mutation m = miniBatchOp.getOperation(i);
-            if (!(m instanceof  Delete)) {
-                // Ignore if it is not Delete type.
-                continue;
-            }
-            byte[] sourceOpAttr = m.getAttribute(QueryServices.SOURCE_OPERATION_ATTRIB);
-            if (sourceOpAttr == null) {
-                continue;
-            }
-            Tag sourceOpTag = new ArrayBackedTag(PhoenixTagType.SOURCE_OPERATION_TAG_TYPE,
-                    sourceOpAttr);
-            List<Cell> updatedCells = new ArrayList<>();
-            for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
-                Cell cell = cellScanner.current();
-                RawCell rawCell = (RawCell)cell;
-                List<Tag> tags = new ArrayList<>();
-                Iterator<Tag> tagsIterator = rawCell.getTags();
-                while (tagsIterator.hasNext()) {
-                    tags.add(tagsIterator.next());
-                }
-                tags.add(sourceOpTag);
-                // TODO: PrivateCellUtil's IA is Private. HBASE-25328 adds a builder method
-                // TODO: for creating Tag which will be LP with IA.coproc
-                Cell updatedCell = PrivateCellUtil.createCell(cell, tags);
-                updatedCells.add(updatedCell);
-            }
-            m.getFamilyCellMap().clear();
-            // Clear and add new Cells to the Mutation.
-            for (Cell cell : updatedCells) {
-                Delete d = (Delete) m;
-                d.addDeleteMarker(cell);
-            }
-        }
-    }
-
     private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
       this.batchMutateContext.set(context);
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 8b04faf..0501f28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -81,6 +81,7 @@
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
@@ -367,12 +368,15 @@
   public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
           MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
 
+    // Need to add cell tags to Delete Marker before we do any index processing
+    // since we add tags to tables which doesn't have indexes also.
+    IndexUtil.setDeleteAttributes(miniBatchOp);
       // first group all the updates for a single row into a single update to be processed
       Map<ImmutableBytesPtr, MultiMutation> mutationsMap =
               new HashMap<ImmutableBytesPtr, MultiMutation>();
           
       Durability defaultDurability = Durability.SYNC_WAL;
-      if(c.getEnvironment().getRegion() != null) {
+      if (c.getEnvironment().getRegion() != null) {
           defaultDurability = c.getEnvironment().getRegion().getTableDescriptor().getDurability();
           defaultDurability = (defaultDurability == Durability.USE_DEFAULT) ? 
                   Durability.SYNC_WAL : defaultDurability;
@@ -512,7 +516,7 @@
           byte[] tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
           Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
           List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
-          while(indexUpdatesItr.hasNext()) {
+          while (indexUpdatesItr.hasNext()) {
               Pair<Mutation, byte[]> next = indexUpdatesItr.next();
               if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
                   localUpdates.add(next.getFirst());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index c40b5ea..0449c00 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -39,6 +39,15 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hbase.ArrayBackedTag;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.PhoenixTagType;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.RawCell;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
 import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.hbase.Cell;
@@ -818,4 +827,50 @@
         }
     }
 
+    /**
+     * Set Cell Tags to delete markers with source of operation attribute.
+     * @param miniBatchOp miniBatchOp
+     * @throws IOException IOException
+     */
+    public static void setDeleteAttributes(
+            MiniBatchOperationInProgress<Mutation> miniBatchOp)
+            throws IOException {
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            Mutation m = miniBatchOp.getOperation(i);
+            if (!(m instanceof Delete)) {
+                // Ignore if it is not Delete type.
+                continue;
+            }
+            byte[] sourceOpAttr =
+                    m.getAttribute(QueryServices.SOURCE_OPERATION_ATTRIB);
+            if (sourceOpAttr == null) {
+                continue;
+            }
+            Tag sourceOpTag = new ArrayBackedTag(
+                    PhoenixTagType.SOURCE_OPERATION_TAG_TYPE, sourceOpAttr);
+            List<Cell> updatedCells = new ArrayList<>();
+            for (CellScanner cellScanner = m.cellScanner();
+                 cellScanner.advance();) {
+                Cell cell = cellScanner.current();
+                RawCell rawCell = (RawCell) cell;
+                List<Tag> tags = new ArrayList<>();
+                Iterator<Tag> tagsIterator = rawCell.getTags();
+                while (tagsIterator.hasNext()) {
+                    tags.add(tagsIterator.next());
+                }
+                tags.add(sourceOpTag);
+                // TODO: PrivateCellUtil's IA is Private.
+                // HBASE-25328 adds a builder methodfor creating Tag which
+                // will be LP with IA.coproc
+                Cell updatedCell = PrivateCellUtil.createCell(cell, tags);
+                updatedCells.add(updatedCell);
+            }
+            m.getFamilyCellMap().clear();
+            // Clear and add new Cells to the Mutation.
+            for (Cell cell : updatedCells) {
+                Delete d = (Delete) m;
+                d.addDeleteMarker(cell);
+            }
+        }
+    }
 }