PHOENIX-7515: Add metric for count of Phoenix client batches used by a commit call (#2064)

PHOENIX-7515: Add metric for count of Phoenix client batches used by a commit call
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index c1bb46f..46a8057 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -1392,6 +1392,7 @@
             List<Mutation> mutationList = pair.getValue();
             List<List<Mutation>> mutationBatchList =
                     getMutationBatchList(batchSize, batchSizeBytes, mutationList);
+            int totalBatchCount = mutationBatchList.size();
 
             // create a span per target table
             // TODO maybe we can be smarter about the table name to string here?
@@ -1530,7 +1531,6 @@
                         // REPLAY_ONLY_INDEX_WRITES for first batch
                         // only in case of 1121 SQLException
                         itrListMutation.remove();
-
                         batchCount++;
                         if (LOGGER.isDebugEnabled())
                             LOGGER.debug("Sent batch of " + mutationBatch.size() + " for "
@@ -1624,7 +1624,7 @@
                                     numMutations,
                                     numFailedMutations,
                                     numFailedPhase3Mutations,
-                                    mutationCommitTime);
+                                    mutationCommitTime, totalBatchCount);
                     // Combine failure mutation metrics with committed ones for the final picture
                     committedMutationsMetric.combineMetric(failureMutationMetrics);
                     mutationMetricQueue.addMetricsForTable(htableNameStr, committedMutationsMetric);
@@ -1724,7 +1724,7 @@
                 numUpsertMutationsInBatch,
                 allUpsertsMutations ? 1 : 0,
                 numDeleteMutationsInBatch,
-                allDeletesMutations ? 1 : 0);
+                allDeletesMutations ? 1 : 0, 0);
     }
 
     /**
@@ -1743,7 +1743,7 @@
     static MutationMetric getCommittedMutationsMetric(
             MutationBytes totalMutationBytesObject, List<List<Mutation>> unsentMutationBatchList,
             long numMutations, long numFailedMutations,
-            long numFailedPhase3Mutations, long mutationCommitTime) {
+            long numFailedPhase3Mutations, long mutationCommitTime, long mutationBatchCounter) {
         long committedUpsertMutationBytes = totalMutationBytesObject == null ? 0 :
                 totalMutationBytesObject.getUpsertMutationBytes();
         long committedAtomicUpsertMutationBytes = totalMutationBytesObject == null ? 0:
@@ -1807,7 +1807,7 @@
                 committedDeleteMutationCounter,
                 committedTotalMutationBytes,
                 numFailedPhase3Mutations,
-                0, 0, 0, 0 );
+                0, 0, 0, 0, mutationBatchCounter);
     }
 
     private void filterIndexCheckerMutations(Map<TableInfo, List<Mutation>> mutationMap,
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index d66fb0e..40b7932 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -88,6 +88,9 @@
     DELETE_BATCH_FAILED_COUNTER("dbfc", "Number of delete mutation batches that failed to be committed",
             LogLevel.OFF, PLong.INSTANCE),
 
+    MUTATION_BATCH_COUNTER("mbc", "Number of mutation batches committed "
+            + "in a commit call", LogLevel.OFF, PLong.INSTANCE),
+
     // select-specific query (read) metrics updated during executeQuery
     SELECT_SUCCESS_SQL_COUNTER("sss", "Counter for number of select sql queries that successfully"
             + " passed the executeQuery phase", LogLevel.OFF, PLong.INSTANCE),
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
index 5a129c0..d42a101 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
@@ -23,6 +23,7 @@
 import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_MUTATION_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
@@ -93,7 +94,8 @@
             publishedMetricsForTable.put(metric.getUpsertBatchFailedCounter().getMetricType(), metric.getUpsertBatchFailedCounter().getValue());
             publishedMetricsForTable.put(metric.getDeleteBatchFailedSize().getMetricType(), metric.getDeleteBatchFailedSize().getValue());
             publishedMetricsForTable.put(metric.getDeleteBatchFailedCounter().getMetricType(), metric.getDeleteBatchFailedCounter().getValue());
-
+            publishedMetricsForTable.put(metric.getMutationBatchCounter().getMetricType(),
+                    metric.getMutationBatchCounter().getValue());
         }
         return publishedMetrics;
     }
@@ -125,8 +127,11 @@
         private final CombinableMetric numOfIndexCommitFailMutations = new CombinableMetricImpl(
                 INDEX_COMMIT_FAILURE_SIZE);
 
+        private final CombinableMetric mutationBatchCounter =
+                new CombinableMetricImpl(MUTATION_BATCH_COUNTER);
+
         public static final MutationMetric EMPTY_METRIC =
-                new MutationMetric(0,0,0,0, 0, 0,0,0,0,0,0,0,0,0,0);
+                new MutationMetric(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
 
         public MutationMetric(long numMutations, long upsertMutationsSizeBytes,
                 long deleteMutationsSizeBytes, long commitTimeForUpserts, long commitTimeForAtomicUpserts,
@@ -134,7 +139,7 @@
                 long deleteMutationSqlCounterSuccess, long totalMutationBytes,
                 long numOfPhase3Failed, long upsertBatchFailedSize,
                 long upsertBatchFailedCounter, long deleteBatchFailedSize,
-                long deleteBatchFailedCounter) {
+                long deleteBatchFailedCounter, long mutationBatchCounter) {
             this.numMutations.change(numMutations);
             this.totalCommitTimeForUpserts.change(commitTimeForUpserts);
             this.totalCommitTimeForAtomicUpserts.change(commitTimeForAtomicUpserts);
@@ -151,6 +156,7 @@
             this.upsertBatchFailedCounter.change(upsertBatchFailedCounter);
             this.deleteBatchFailedSize.change(deleteBatchFailedSize);
             this.deleteBatchFailedCounter.change(deleteBatchFailedCounter);
+            this.mutationBatchCounter.change(mutationBatchCounter);
         }
 
         public CombinableMetric getTotalCommitTimeForUpserts() {
@@ -215,6 +221,10 @@
             return deleteBatchFailedCounter;
         }
 
+        public CombinableMetric getMutationBatchCounter() {
+            return mutationBatchCounter;
+        }
+
         public void combineMetric(MutationMetric other) {
             this.numMutations.combine(other.numMutations);
             this.totalCommitTimeForUpserts.combine(other.totalCommitTimeForUpserts);
@@ -232,6 +242,7 @@
             this.upsertBatchFailedCounter.combine(other.upsertBatchFailedCounter);
             this.deleteBatchFailedSize.combine(other.deleteBatchFailedSize);
             this.deleteBatchFailedCounter.combine(other.deleteBatchFailedCounter);
+            this.mutationBatchCounter.combine(other.mutationBatchCounter);
         }
 
     }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
index 5a1aa3d..6832775 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
@@ -25,6 +25,7 @@
 
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.NUM_METADATA_LOOKUP_FAILURES;
 import static org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_FAILURES;
@@ -74,9 +75,6 @@
 import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_SUCCESS_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_FAILURE_SQL_COUNTER;
-import static org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_SUCCESS;
-import static org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_FAILURES;
-import static org.apache.phoenix.monitoring.MetricType.TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS;
 import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_COMMIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_QUERY_TIME;
@@ -140,7 +138,8 @@
                 TABLE_NUM_SYSTEM_TABLE_RPC_SUCCESS(NUM_SYSTEM_TABLE_RPC_SUCCESS),
                 TABLE_NUM_SYSTEM_TABLE_RPC_FAILURES(NUM_SYSTEM_TABLE_RPC_FAILURES),
                 TABLE_NUM_METADATA_LOOKUP_FAILURES(NUM_METADATA_LOOKUP_FAILURES),
-                TABLE_TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS(TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS);
+                TABLE_TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS(TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS),
+                TABLE_MUTATION_BATCH_SUCCESS_COUNTER(MUTATION_BATCH_COUNTER);
 
         private MetricType metricType;
         private PhoenixTableMetric metric;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
index 6be65af..5a83bfa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
@@ -115,7 +115,7 @@
             assertEquals("Table names didn't match!", tableName, t);
             Map<MetricType, Long> p = entry.getValue();
 
-            assertEquals("There should have been sixteen metrics", 16, p.size());
+            assertEquals("There should have been seventeen metrics", 17, p.size());
 
             boolean mutationBatchSizePresent = false;
             boolean mutationCommitTimePresent = false;
@@ -132,6 +132,7 @@
             boolean upsertMutationSqlCounterPresent = false;
             boolean upsertCommitTimeCounterPresent = false;
             boolean deleteCommitTimeCounterPresent = false;
+            boolean mutationBatchCounterPresent = false;
             for (Map.Entry<MetricType, Long> metric : p.entrySet()) {
                 MetricType metricType = metric.getKey();
                 long metricValue = metric.getValue();
@@ -206,6 +207,11 @@
                     }
                     deleteCommitTimeCounterPresent = true;
                 }
+                else if (metricType.equals(MetricType.MUTATION_BATCH_COUNTER)) {
+                    assertTrue("mutation batch success counter should be greater than zero",
+                            metricValue > 0);
+                    mutationBatchCounterPresent = true;
+                }
             }
             assertTrue(mutationBatchSizePresent);
             assertTrue(mutationCommitTimePresent);
@@ -222,6 +228,7 @@
             assertTrue(deleteBatchFailedCounterPresent);
             assertTrue(upsertCommitTimeCounterPresent);
             assertTrue(deleteCommitTimeCounterPresent);
+            assertTrue(mutationBatchCounterPresent);
         }
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 8c007df..f3495fe 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -41,6 +41,7 @@
 import static org.apache.phoenix.monitoring.MetricType.COUNT_MILLS_BETWEEN_NEXTS;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
@@ -76,7 +77,6 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -89,7 +89,6 @@
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.EnvironmentEdge;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -133,7 +132,8 @@
     private static final String DELETE_ALL_DML = "DELETE FROM %s";
 
     private static final List<MetricType> mutationMetricsToSkip =
-            Lists.newArrayList(MUTATION_COMMIT_TIME, UPSERT_COMMIT_TIME, DELETE_COMMIT_TIME);
+            Lists.newArrayList(MUTATION_COMMIT_TIME, UPSERT_COMMIT_TIME, DELETE_COMMIT_TIME,
+                    MUTATION_BATCH_COUNTER);
     private static final List<MetricType> readMetricsToSkip =
             Lists.newArrayList(TASK_QUEUE_WAIT_TIME, TASK_EXECUTION_TIME, TASK_END_TO_END_TIME,
                     COUNT_MILLS_BETWEEN_NEXTS);
@@ -377,6 +377,29 @@
         }
     }
 
+    static void createTableAndRunUpsertSelect(String destTableName, String sourceTableName,
+                                              boolean resetGlobalMetricsAfterTableCreate,
+                                              boolean resetTableMetricsAfterTableCreate,
+                                              boolean commit, Connection conn) throws SQLException {
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(String.format(DDL, destTableName));
+        }
+        conn.commit();
+        if (resetGlobalMetricsAfterTableCreate) {
+            resetGlobalMetrics();
+        }
+
+        if (resetTableMetricsAfterTableCreate) {
+            PhoenixRuntime.clearTableLevelMetrics();
+        }
+        try (Statement stmt = conn.createStatement()) {
+            stmt.executeUpdate(String.format(UPSERT_SELECT_DML, destTableName, sourceTableName));
+        }
+        if (commit) {
+            conn.commit();
+        }
+    }
+
     static void doPointDeleteFromTable(String tableName, Connection conn) throws SQLException {
         try (PreparedStatement stmt = conn.prepareStatement(
                 String.format(POINT_DELETE_DML, tableName))) {
@@ -487,11 +510,12 @@
             String t = entry.getKey();
             assertEquals("Table names didn't match!", tableName, t);
             Map<MetricType, Long> p = entry.getValue();
-            assertEquals("There should have been sixteen metrics", 16, p.size());
+            assertEquals("There should have been seventeen metrics", 17, p.size());
             boolean mutationBatchSizePresent = false;
             boolean mutationCommitTimePresent = false;
             boolean mutationBytesPresent = false;
             boolean mutationBatchFailedPresent = false;
+            boolean mutationBatchCounterPresent = false;
             for (Entry<MetricType, Long> metric : p.entrySet()) {
                 MetricType metricType = metric.getKey();
                 long metricValue = metric.getValue();
@@ -508,11 +532,16 @@
                     assertEquals("Zero failed mutations expected", 0, metricValue);
                     mutationBatchFailedPresent = true;
                 }
+                else if (metricType.equals(MetricType.MUTATION_BATCH_COUNTER)) {
+                    assertEquals("Mutation batch success count should be greater than zero", 1, metricValue);
+                    mutationBatchCounterPresent = true;
+                }
             }
             assertTrue(mutationBatchSizePresent);
             assertTrue(mutationCommitTimePresent);
             assertTrue(mutationBytesPresent);
             assertTrue(mutationBatchFailedPresent);
+            assertTrue(mutationBatchCounterPresent);
         }
         Map<String, Map<MetricType, Long>> readMetrics = PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(pConn);
         assertEquals("Read metrics should be empty", 0, readMetrics.size());
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
index 0031957..1a1280e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
@@ -38,7 +38,6 @@
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.ConnectionQueryServicesImpl;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.query.QueryServicesTestImpl;
 import org.apache.phoenix.util.EnvironmentEdge;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -67,6 +66,7 @@
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_SUCCESS;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_SQL_COUNTER;
@@ -115,10 +115,13 @@
 import static org.apache.phoenix.monitoring.PhoenixMetricsIT.POINT_LOOKUP_SELECT_QUERY;
 import static org.apache.phoenix.monitoring.PhoenixMetricsIT.RANGE_SCAN_SELECT_QUERY;
 import static org.apache.phoenix.monitoring.PhoenixMetricsIT.createTableAndInsertValues;
+import static org.apache.phoenix.monitoring.PhoenixMetricsIT.createTableAndRunUpsertSelect;
 import static org.apache.phoenix.monitoring.PhoenixMetricsIT.doPointDeleteFromTable;
 import static org.apache.phoenix.monitoring.PhoenixMetricsIT.doDeleteAllFromTable;
+import static org.apache.phoenix.query.QueryServices.ENABLE_SERVER_UPSERT_SELECT;
 import static org.apache.phoenix.util.DelayedOrFailingRegionServer.INJECTED_EXCEPTION_STRING;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.clearTableLevelMetrics;
 import static org.apache.phoenix.util.PhoenixRuntime.getOverAllReadRequestMetricInfo;
 import static org.apache.phoenix.util.PhoenixRuntime.getPhoenixTableClientMetrics;
@@ -186,6 +189,7 @@
         // Add our own driver
         Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
         props.put(BaseTest.DRIVER_CLASS_NAME_ATTRIB, PhoenixMetricsTestingDriver.class.getName());
+        props.put(ENABLE_SERVER_UPSERT_SELECT, "true");
         initAndRegisterTestDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
     }
 
@@ -331,6 +335,8 @@
      * @param writeMutMetrics                       write mutation metrics object
      * @param conn                                  connection object. Note: this method must be called after connection close
      *                                              since that's where we populate table-level write metrics
+     * @param expectedMutationBatchCount            expected number of mutation batches per commit call
+
      */
     private static void assertMutationTableMetrics(final boolean isUpsert, final String tableName,
             final long expectedUpsertOrDeleteSuccessSqlCt,
@@ -342,7 +348,7 @@
             final long expectedUpsertOrDeleteAggregateSuccessCt,
             final long expectedUpsertOrDeleteAggregateFailureCt,
             final Map<MetricType, Long> writeMutMetrics, final Connection conn,
-            final boolean expectedSystemCatalogMetric)
+            final boolean expectedSystemCatalogMetric, final long expectedMutationBatchCount)
             throws SQLException {
         assertTrue(conn != null && conn.isClosed());
         assertFalse(hasMutationBeenExplicitlyCommitted && writeMutMetrics == null);
@@ -432,6 +438,10 @@
                         writeMutMetrics.get(isUpsert ?
                                 UPSERT_BATCH_FAILED_COUNTER :
                                 DELETE_BATCH_FAILED_COUNTER), CompareOp.EQ);
+                assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+                        writeMutMetrics.get(MUTATION_BATCH_COUNTER), CompareOp.EQ);
+                assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+                        expectedMutationBatchCount, CompareOp.EQ);
             }
         }
         if (expectedSystemCatalogMetric) {
@@ -713,7 +723,7 @@
             // Must be asserted after connection close since that's where
             // we populate table-level metrics
             assertMutationTableMetrics(true, tableName, numRows, 0, 0, true, numRows, 0, 0, 1, 0,
-                    writeMutMetrics, conn, true);
+                    writeMutMetrics, conn, true, 100);
         }
     }
 
@@ -739,7 +749,7 @@
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(true, tableName, numRows, 0, 0, true, numRows, 0, 0, 1, 0,
-                    writeMutMetrics, conn, true);
+                    writeMutMetrics, conn, true, 1);
         }
     }
 
@@ -779,7 +789,7 @@
             // mutation commit time since autoCommit was on
             assertMutationTableMetrics(true, tableName, numRows, 0,
                     writeMutMetrics.get(UPSERT_COMMIT_TIME), true, numRows, 0, 0, numRows, 0,
-                    writeMutMetrics, conn,true);
+                    writeMutMetrics, conn,true, 10);
         }
     }
 
@@ -814,7 +824,7 @@
             }
             assertNotNull("Failed to get a connection!", conn);
             conn.close();
-            assertMutationTableMetrics(true, tableName, 0, 1, 0, false, 0, 0, 0, 1, 0, null, conn, true);
+            assertMutationTableMetrics(true, tableName, 0, 1, 0, false, 0, 0, 0, 1, 0, null, conn, true, 0);
         }
     }
 
@@ -854,7 +864,7 @@
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(true, tableName, numRows, 0, delay, true, numRows, 0, 0, 1,
-                    0, writeMutMetrics, conn, true);
+                    0, writeMutMetrics, conn, true, 1);
         }
     }
 
@@ -902,7 +912,7 @@
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(true, tableName, 0, 1, 0, true, 1, 0, 1, 0, 1,
-                    writeMutMetrics, conn, true);
+                    writeMutMetrics, conn, true, 1);
         }
     }
 
@@ -953,7 +963,7 @@
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(true, tableName, numRows, 0, 0, true, numRows, 0, numRows, 0,
-                    1, writeMutMetrics, conn, true);
+                    1, writeMutMetrics, conn, true, 1);
         }
     }
 
@@ -993,7 +1003,70 @@
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(true, tableName, numRows, 0, 0, true, numRows, delayRs, 0, 1,
-                    0, writeMutMetrics, conn, true);
+                    0, writeMutMetrics, conn, true, 1);
+        }
+    }
+
+    @Test public void testUpsertSelectWithRunOnServerAsTrue() throws SQLException {
+        String srcTableName = generateUniqueName();
+        String destTableName = generateUniqueName();
+        int numRows = 10;
+        Map<MetricType, Long> writeMutMetrics;
+        try (Connection conn = getConnFromTestDriver()) {
+            createTableAndInsertValues(srcTableName, true, true,
+                    numRows, true, conn, false);
+        }
+        try (Connection conn = getConnFromTestDriver()) {
+            conn.setAutoCommit(true); // Set auto-commit to make upsert select run on server
+            createTableAndRunUpsertSelect(destTableName, srcTableName, true,
+                    true, true, conn);
+            writeMutMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn).get(destTableName);
+        }
+        assertNull(writeMutMetrics); // No commits were done from client to server so, no metrics recorded
+        for (PhoenixTableMetric metric: getPhoenixTableClientMetrics().get(destTableName)) {
+            assertMetricValue(metric, MUTATION_BATCH_COUNTER, 0, CompareOp.EQ);
+        }
+    }
+
+    @Test public void testUpsertSelectWithRunOnServerAsFalse() throws SQLException {
+        String srcTableName = generateUniqueName();
+        String destTableName = generateUniqueName();
+        int numRows = 10;
+        Map<MetricType, Long> writeMutMetrics;
+        try (Connection conn = getConnFromTestDriver()) {
+            createTableAndInsertValues(srcTableName, true, true,
+                    numRows, true, conn, false);
+        }
+        try (Connection conn = getConnFromTestDriver()) {
+            createTableAndRunUpsertSelect(destTableName, srcTableName, true,
+                    true, true, conn);
+            writeMutMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn).get(destTableName);
+        }
+        // Rows were fetched to client from source table and committed to destination table on server
+        assertNotNull(writeMutMetrics);
+        for (PhoenixTableMetric metric: getPhoenixTableClientMetrics().get(destTableName)) {
+            assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+                    writeMutMetrics.get(MUTATION_BATCH_COUNTER), CompareOp.EQ);
+            assertMetricValue(metric, MUTATION_BATCH_COUNTER, 1, CompareOp.EQ);
+        }
+    }
+
+    @Test public void testUpsertWithOverriddenUpsertBatchSize() throws SQLException {
+        String tableName = generateUniqueName();
+        int numRows = 100;
+        Map<MetricType, Long> writeMutMetrics;
+        Properties props = new Properties();
+        props.put(UPSERT_BATCH_SIZE_ATTRIB, "5");
+        try (Connection conn = DriverManager.getConnection(url, props)) {
+            createTableAndInsertValues(tableName, true, true,
+                    numRows, true, conn, false);
+            writeMutMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
+        }
+        assertNotNull(writeMutMetrics);
+        for (PhoenixTableMetric metric: getPhoenixTableClientMetrics().get(tableName)) {
+            assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+                    writeMutMetrics.get(MUTATION_BATCH_COUNTER), CompareOp.EQ);
+            assertMetricValue(metric, MUTATION_BATCH_COUNTER, 20, CompareOp.EQ);
         }
     }
 
@@ -1024,7 +1097,7 @@
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(false, tableName, 1, 0, 0, true, 1, 0, 0, 1, 0,
-                    writeMutMetrics, conn, false);
+                    writeMutMetrics, conn, false, 1);
         }
     }
 
@@ -1055,7 +1128,7 @@
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(false, tableName, 1, 0, 0, true, numRows, 0, 0, 1, 0,
-                    writeMutMetrics, conn, false);
+                    writeMutMetrics, conn, false, 1);
         }
     }
 
@@ -1092,7 +1165,7 @@
             assertNull(writeMutMetrics);
             conn.close();
             assertMutationTableMetrics(false, tableName, 1, 0, 0, false, 0, 0, 0, 0, 0,
-                    writeMutMetrics, conn, false);
+                    writeMutMetrics, conn, false, 1);
         }
     }
 
@@ -1136,7 +1209,7 @@
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             assertNull(writeMutMetrics);
             conn.close();
-            assertMutationTableMetrics(false, tableName, 0, 1, 0, false, 0, 0, 0, 0, 1, null, conn, false);
+            assertMutationTableMetrics(false, tableName, 0, 1, 0, false, 0, 0, 0, 0, 1, null, conn, false, 0);
         }
     }
 
@@ -1175,7 +1248,7 @@
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(false, tableName, 1, 0, injectDelay, true, 1, 0, 0, 1, 0,
-                    writeMutMetrics, conn, false);
+                    writeMutMetrics, conn, false, 1);
         }
     }
 
@@ -1221,7 +1294,7 @@
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(false, tableName, 1, 0, 0, true, numRows, 0, numRows, 0, 1,
-                    writeMutMetrics, conn, false);
+                    writeMutMetrics, conn, false, 1);
         }
     }
 
@@ -1246,6 +1319,7 @@
 
         // Insert data into the table
         String insertData = "UPSERT INTO " + dataTable + " VALUES (?, ?)";
+        Map<String, Map<MetricType, Long>> writeMutMetrics;
         try (Connection conn = getConnFromTestDriver();
              PreparedStatement stmt = conn.prepareStatement(insertData)) {
             for (int i = 1; i <= 10; i++) {
@@ -1254,6 +1328,21 @@
                 stmt.executeUpdate();
             }
             conn.commit();
+            writeMutMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn);
+        }
+        for(PhoenixTableMetric metric: getPhoenixTableClientMetrics().get(dataTable)) {
+            if(metric.getMetricType().equals(MUTATION_BATCH_COUNTER)) {
+                assertMetricValue(metric, MUTATION_BATCH_COUNTER, 1, CompareOp.EQ);
+                assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+                        writeMutMetrics.get(dataTable).get(MUTATION_BATCH_COUNTER), CompareOp.EQ);
+            }
+        }
+        for(PhoenixTableMetric metric: getPhoenixTableClientMetrics().get(indexName)) {
+            if(metric.getMetricType().equals(MUTATION_BATCH_COUNTER)) {
+                assertMetricValue(metric, MUTATION_BATCH_COUNTER, 2, CompareOp.EQ);
+                assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+                        writeMutMetrics.get(indexName).get(MUTATION_BATCH_COUNTER), CompareOp.EQ);
+            }
         }
 
         // Check if the index is being used
@@ -1332,7 +1421,7 @@
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(false, tableName, 1, 0, 0, true, numRows, delayRs, 0, 1, 0,
-                    writeMutMetrics, conn, false);
+                    writeMutMetrics, conn, false, 1);
         }
     }
 
@@ -1374,7 +1463,7 @@
             // 1 regular upsert + numAtomicUpserts
             // 2 mutations (regular and atomic on the same row in the same batch will be split)
             assertMutationTableMetrics(true, tableName, 1 + numAtomicUpserts, 0, 0, true, 2, 0, 0, 2, 0,
-                writeMutMetrics, conn, false);
+                writeMutMetrics, conn, false, 2);
             assertEquals(numAtomicUpserts, getMetricFromTableMetrics(tableName, ATOMIC_UPSERT_SQL_COUNTER));
             assertTrue(getMetricFromTableMetrics(tableName, ATOMIC_UPSERT_COMMIT_TIME) > 0);
         }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/TableMetricsManagerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/TableMetricsManagerTest.java
index 54f4c6c..bf92444 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/TableMetricsManagerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/TableMetricsManagerTest.java
@@ -236,32 +236,32 @@
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 1, true);
         MutationMetricQueue.MutationMetric metric = new MutationMetricQueue.MutationMetric(
                 0L, 5L, 0L, 0L, 0L,0L,
-                0L, 1L, 0L, 5L, 0L, 0L, 0L, 0L, 0L);
+                0L, 1L, 0L, 5L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 2, true);
         metric = new MutationMetricQueue.MutationMetric(0L, 10L, 0L, 0L, 0L,0L,
-                0L, 1L, 0L, 10L, 0L, 0L, 0L, 0L, 0L);
+                0L, 1L, 0L, 10L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 4, true);
         metric = new MutationMetricQueue.MutationMetric(0L, 50L, 0L, 0L, 0L,0L,
-                0L, 1L, 0L, 50L, 0L, 0L, 0L, 0L, 0L);
+                0L, 1L, 0L, 50L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 5, true);
         metric = new MutationMetricQueue.MutationMetric(0L, 100L, 0L, 0L, 0L,0L,
-                0L, 1L, 0L, 100L, 0L, 0L, 0L, 0L, 0L);
+                0L, 1L, 0L, 100L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 6, true);
         metric = new MutationMetricQueue.MutationMetric(0L, 500L, 0L, 0L, 0L,0L,
-                0L, 1L, 0L, 500L, 0L, 0L, 0L, 0L, 0L);
+                0L, 1L, 0L, 500L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 8, true);
         metric = new MutationMetricQueue.MutationMetric(0L, 1000L, 0L, 0L, 0L,0L,
-                0L, 1L, 0L, 1000L, 0L, 0L, 0L, 0L, 0L);
+                0L, 1L, 0L, 1000L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
 
 
@@ -300,32 +300,32 @@
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 1, false);
         MutationMetricQueue.MutationMetric metric = new MutationMetricQueue.MutationMetric(
                 0L, 0L, 5L, 0L, 0L, 0L,
-                0L, 0L, 1L, 5L, 0L, 0L, 0L, 0L, 0L);
+                0L, 0L, 1L, 5L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 2, false);
         metric = new MutationMetricQueue.MutationMetric(0L, 0L, 10L, 0L, 0L, 0L,
-                0L, 0L, 1L, 10L, 0L, 0L, 0L, 0L, 0L);
+                0L, 0L, 1L, 10L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 4, false);
         metric = new MutationMetricQueue.MutationMetric(0L, 0L, 50L, 0L, 0L, 0L,
-                0L, 0L, 1L, 50L, 0L, 0L, 0L, 0L, 0L);
+                0L, 0L, 1L, 50L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 5,false);
         metric = new MutationMetricQueue.MutationMetric(0L, 0L, 100L, 0L, 0L, 0L,
-                0L, 0L, 1L, 100L, 0L, 0L, 0L, 0L, 0L);
+                0L, 0L, 1L, 100L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 6,false);
         metric = new MutationMetricQueue.MutationMetric(0L, 0L, 500L, 0L, 0L, 0L,
-                0L, 0L, 1L, 500L, 0L, 0L, 0L, 0L, 0L);
+                0L, 0L, 1L, 500L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 8, false);
         metric = new MutationMetricQueue.MutationMetric(0L, 0L, 1000L, 0L, 0L, 0L,
-                0L, 0L, 1L, 1000L, 0L, 0L, 0L, 0L, 0L);
+                0L, 0L, 1L, 1000L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);