PHOENIX-5743 addendum for multi-column family indexes
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 65262ec..c26930c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -308,13 +308,13 @@
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String dataTableName = generateUniqueName();
             conn.createStatement().execute("create table " + dataTableName +
-                    " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))" + tableDDLOptions);
+                    " (id varchar(10) not null primary key, a.val1 varchar(10), b.val2 varchar(10), c.val3 varchar(10))" + tableDDLOptions);
             String indexTableName = generateUniqueName();
             conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
                     dataTableName + " (val1) include (val2, val3)");
             // Configure IndexRegionObserver to fail the data write phase
             IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
-            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('a', 'ab','abc')");
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val3) values ('a', 'ab','abcde')");
             commitWithException(conn);
             // The above upsert will create an unverified index row
             // Configure IndexRegionObserver to allow the data write phase
@@ -339,7 +339,7 @@
             assertFalse(rs.next());
             // Configure IndexRegionObserver to fail the data write phase
             IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
-            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('a', 'ab','abc')");
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val3) values ('a', 'ab','abcde')");
             commitWithException(conn);
             // The above upsert will create an unverified index row
             // Configure IndexRegionObserver to allow the data write phase
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index a65d63c..9e3b9a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -245,11 +245,13 @@
 
         private void deleteRowIfAgedEnough(byte[] indexRowKey, List<Cell> row, long ts, boolean specific) throws IOException {
             if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) {
-                Delete del = new Delete(indexRowKey, ts);
+                Delete del;
                 if (specific) {
-                    del.addFamilyVersion(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), ts);
+                    del = indexMaintainer.buildRowDeleteMutation(indexRowKey,
+                            IndexMaintainer.DeleteType.SINGLE_VERSION, ts);
                 } else {
-                    del.addFamily(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), ts);
+                    del = indexMaintainer.buildRowDeleteMutation(indexRowKey,
+                            IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
                 }
                 Mutation[] mutations = new Mutation[]{del};
                 region.batchMutate(mutations);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index a924889..2d8bfc9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -1071,7 +1071,7 @@
         return put;
     }
 
-    private enum DeleteType {SINGLE_VERSION, ALL_VERSIONS};
+    public enum DeleteType {SINGLE_VERSION, ALL_VERSIONS};
     private DeleteType getDeleteTypeOrNull(Collection<? extends Cell> pendingUpdates) {
         return getDeleteTypeOrNull(pendingUpdates, this.nDataCFs);
     }
@@ -1150,7 +1150,29 @@
         }
         return false;
     }
-    
+
+    public Delete buildRowDeleteMutation(byte[] indexRowKey, DeleteType deleteType, long ts) {
+        byte[] emptyCF = emptyKeyValueCFPtr.copyBytesIfNecessary();
+        Delete delete = new Delete(indexRowKey);
+
+        for (ColumnReference ref : getCoveredColumns()) {
+            ColumnReference indexColumn = coveredColumnsMap.get(ref);
+            // If table delete was single version, then index delete should be as well
+            if (deleteType == DeleteType.SINGLE_VERSION) {
+                delete.addFamilyVersion(indexColumn.getFamily(), ts);
+            } else {
+                delete.addFamily(indexColumn.getFamily(), ts);
+            }
+        }
+        if (deleteType == DeleteType.SINGLE_VERSION) {
+            delete.addFamilyVersion(emptyCF, ts);
+        } else {
+            delete.addFamily(emptyCF, ts);
+        }
+        delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+        return delete;
+    }
+
    /**
      * Used for immutable indexes that only index PK column values. In that case, we can handle a data row deletion,
      * since we can build the corresponding index row key.
@@ -1164,25 +1186,7 @@
         // Delete the entire row if any of the indexed columns changed
         DeleteType deleteType = null;
         if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates, ts)) { // Deleting the entire row
-            byte[] emptyCF = emptyKeyValueCFPtr.copyBytesIfNecessary();
-            Delete delete = new Delete(indexRowKey);
-            
-            for (ColumnReference ref : getCoveredColumns()) {
-                ColumnReference indexColumn = coveredColumnsMap.get(ref);
-                // If table delete was single version, then index delete should be as well
-                if (deleteType == DeleteType.SINGLE_VERSION) {
-                    delete.addFamilyVersion(indexColumn.getFamily(), ts);
-                } else {
-                    delete.addFamily(indexColumn.getFamily(), ts);
-                }
-            }
-            if (deleteType == DeleteType.SINGLE_VERSION) {
-                delete.addFamilyVersion(emptyCF, ts);
-            } else {
-                delete.addFamily(emptyCF, ts);
-            }
-            delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
-            return delete;
+            return buildRowDeleteMutation(indexRowKey, deleteType, ts);
         }
         Delete delete = null;
         Set<ColumnReference> dataTableColRefs = coveredColumnsMap.keySet();