PHOENIX-5618 IndexScrutinyTool fix for array type columns

Signed-off-by: s.kadam <s.kadam@apache.org>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index c6f5418..13df56d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
@@ -163,6 +164,51 @@
         assertEquals(numIndexRows, countRows(conn, indexTableFullName));
     }
 
+    @Test public void testScrutinyOnArrayTypes() throws Exception {
+        String dataTableName = generateUniqueName();
+        String indexTableName = generateUniqueName();
+        String dataTableDDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, VB VARBINARY)";
+        String indexTableDDL = "CREATE INDEX %s ON %s (NAME) INCLUDE (VB)";
+        String upsertData = "UPSERT INTO %s VALUES (?, ?, ?)";
+        String upsertIndex = "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:VB\") values (?,?,?)";
+
+        try (Connection conn =
+                DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(String.format(dataTableDDL, dataTableName));
+            conn.createStatement().execute(String.format(indexTableDDL, indexTableName, dataTableName));
+            // insert two rows
+            PreparedStatement upsertDataStmt = conn.prepareStatement(String.format(upsertData, dataTableName));
+            upsertRow(upsertDataStmt, 1, "name-1", new byte[] {127, 0, 0, 1});
+            upsertRow(upsertDataStmt, 2, "name-2", new byte[] {127, 1, 0, 5});
+            conn.commit();
+
+            List<Job> completedJobs = runScrutiny(null, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+
+            // Now insert a different varbinary row
+            upsertRow(upsertDataStmt, 3, "name-3", new byte[] {1, 1, 1, 1});
+            conn.commit();
+
+            PreparedStatement upsertIndexStmt = conn.prepareStatement(String.format(upsertIndex, indexTableName));
+            upsertIndexStmt.setString(1, "name-3");
+            upsertIndexStmt.setInt(2, 3);
+            upsertIndexStmt.setBytes(3, new byte[] {0, 0, 0, 1});
+            upsertIndexStmt.executeUpdate();
+            conn.commit();
+
+            completedJobs = runScrutiny(null, dataTableName, indexTableName);
+            job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            counters = job.getCounters();
+            assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+        }
+    }
+
     /**
      * Tests running a scrutiny while updates and deletes are happening.
      * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
@@ -643,6 +689,15 @@
         stmt.executeUpdate();
     }
 
+    private void upsertRow(PreparedStatement stmt, int id, String name, byte[] val) throws SQLException {
+        int index = 1;
+        // insert row
+        stmt.setInt(index++, id);
+        stmt.setString(index++, name);
+        stmt.setBytes(index++, val);
+        stmt.executeUpdate();
+    }
+
     private int deleteRow(String fullTableName, String whereCondition) throws SQLException {
         String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition;
         PreparedStatement deleteStmt = conn.prepareStatement(deleteSql);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index da6e6e1..eceb658 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -371,7 +371,16 @@
         for (int i = startIndex; i < sourceValues.size(); i++) {
             Object targetValue = targetValues.get(i);
             Object sourceValue = sourceValues.get(i);
-            if (targetValue != null && !targetValue.equals(sourceValue)) {
+            if (targetValue != null) {
+                if (sourceValue.getClass().isArray()) {
+                    if (compareArrayTypes(sourceValue, targetValue)) {
+                        continue;
+                    }
+                } else {
+                    if (targetValue.equals(sourceValue)) {
+                        continue;
+                    }
+                }
                 context.getCounter(PhoenixScrutinyJobCounters.BAD_COVERED_COL_VAL_COUNT)
                         .increment(1);
                 return false;
@@ -380,6 +389,27 @@
         return true;
     }
 
+    private boolean compareArrayTypes(Object sourceValue, Object targetValue) {
+        if (sourceValue.getClass().getComponentType().equals(byte.class)) {
+            return Arrays.equals((byte[]) sourceValue, (byte[]) targetValue);
+        } else if (sourceValue.getClass().getComponentType().equals(char.class)) {
+            return Arrays.equals((char[]) sourceValue, (char[]) targetValue);
+        } else if (sourceValue.getClass().getComponentType().equals(boolean.class)) {
+            return Arrays.equals((boolean[]) sourceValue, (boolean[]) targetValue);
+        } else if (sourceValue.getClass().getComponentType().equals(double.class)) {
+            return Arrays.equals((double[]) sourceValue, (double[]) targetValue);
+        } else if (sourceValue.getClass().getComponentType().equals(int.class)) {
+            return Arrays.equals((int[]) sourceValue, (int[]) targetValue);
+        } else if (sourceValue.getClass().getComponentType().equals(short.class)) {
+            return Arrays.equals((short[]) sourceValue, (short[]) targetValue);
+        } else if (sourceValue.getClass().getComponentType().equals(long.class)) {
+            return Arrays.equals((long[]) sourceValue, (long[]) targetValue);
+        } else if (sourceValue.getClass().getComponentType().equals(float.class)) {
+            return Arrays.equals((float[]) sourceValue, (float[]) targetValue);
+        }
+        return false;
+    }
+
     private String getPkHash(List<Object> pkObjects) {
         try {
             for (int i = 0; i < pkObjects.size(); i++) {