PHOENIX-7515: Add metric for count of Phoenix client batches used by a commit call (#2064)
PHOENIX-7515: Add metric for count of Phoenix client batches used by a commit call
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index c1bb46f..46a8057 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -1392,6 +1392,7 @@
List<Mutation> mutationList = pair.getValue();
List<List<Mutation>> mutationBatchList =
getMutationBatchList(batchSize, batchSizeBytes, mutationList);
+ int totalBatchCount = mutationBatchList.size();
// create a span per target table
// TODO maybe we can be smarter about the table name to string here?
@@ -1530,7 +1531,6 @@
// REPLAY_ONLY_INDEX_WRITES for first batch
// only in case of 1121 SQLException
itrListMutation.remove();
-
batchCount++;
if (LOGGER.isDebugEnabled())
LOGGER.debug("Sent batch of " + mutationBatch.size() + " for "
@@ -1624,7 +1624,7 @@
numMutations,
numFailedMutations,
numFailedPhase3Mutations,
- mutationCommitTime);
+ mutationCommitTime, totalBatchCount);
// Combine failure mutation metrics with committed ones for the final picture
committedMutationsMetric.combineMetric(failureMutationMetrics);
mutationMetricQueue.addMetricsForTable(htableNameStr, committedMutationsMetric);
@@ -1724,7 +1724,7 @@
numUpsertMutationsInBatch,
allUpsertsMutations ? 1 : 0,
numDeleteMutationsInBatch,
- allDeletesMutations ? 1 : 0);
+ allDeletesMutations ? 1 : 0, 0);
}
/**
@@ -1743,7 +1743,7 @@
static MutationMetric getCommittedMutationsMetric(
MutationBytes totalMutationBytesObject, List<List<Mutation>> unsentMutationBatchList,
long numMutations, long numFailedMutations,
- long numFailedPhase3Mutations, long mutationCommitTime) {
+ long numFailedPhase3Mutations, long mutationCommitTime, long mutationBatchCounter) {
long committedUpsertMutationBytes = totalMutationBytesObject == null ? 0 :
totalMutationBytesObject.getUpsertMutationBytes();
long committedAtomicUpsertMutationBytes = totalMutationBytesObject == null ? 0:
@@ -1807,7 +1807,7 @@
committedDeleteMutationCounter,
committedTotalMutationBytes,
numFailedPhase3Mutations,
- 0, 0, 0, 0 );
+ 0, 0, 0, 0, mutationBatchCounter);
}
private void filterIndexCheckerMutations(Map<TableInfo, List<Mutation>> mutationMap,
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index d66fb0e..40b7932 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -88,6 +88,9 @@
DELETE_BATCH_FAILED_COUNTER("dbfc", "Number of delete mutation batches that failed to be committed",
LogLevel.OFF, PLong.INSTANCE),
+ MUTATION_BATCH_COUNTER("mbc", "Number of mutation batches committed "
+ + "in a commit call", LogLevel.OFF, PLong.INSTANCE),
+
// select-specific query (read) metrics updated during executeQuery
SELECT_SUCCESS_SQL_COUNTER("sss", "Counter for number of select sql queries that successfully"
+ " passed the executeQuery phase", LogLevel.OFF, PLong.INSTANCE),
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
index 5a129c0..d42a101 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
@@ -23,6 +23,7 @@
import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.DELETE_MUTATION_BYTES;
import static org.apache.phoenix.monitoring.MetricType.DELETE_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
@@ -93,7 +94,8 @@
publishedMetricsForTable.put(metric.getUpsertBatchFailedCounter().getMetricType(), metric.getUpsertBatchFailedCounter().getValue());
publishedMetricsForTable.put(metric.getDeleteBatchFailedSize().getMetricType(), metric.getDeleteBatchFailedSize().getValue());
publishedMetricsForTable.put(metric.getDeleteBatchFailedCounter().getMetricType(), metric.getDeleteBatchFailedCounter().getValue());
-
+ publishedMetricsForTable.put(metric.getMutationBatchCounter().getMetricType(),
+ metric.getMutationBatchCounter().getValue());
}
return publishedMetrics;
}
@@ -125,8 +127,11 @@
private final CombinableMetric numOfIndexCommitFailMutations = new CombinableMetricImpl(
INDEX_COMMIT_FAILURE_SIZE);
+ private final CombinableMetric mutationBatchCounter =
+ new CombinableMetricImpl(MUTATION_BATCH_COUNTER);
+
public static final MutationMetric EMPTY_METRIC =
- new MutationMetric(0,0,0,0, 0, 0,0,0,0,0,0,0,0,0,0);
+ new MutationMetric(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
public MutationMetric(long numMutations, long upsertMutationsSizeBytes,
long deleteMutationsSizeBytes, long commitTimeForUpserts, long commitTimeForAtomicUpserts,
@@ -134,7 +139,7 @@
long deleteMutationSqlCounterSuccess, long totalMutationBytes,
long numOfPhase3Failed, long upsertBatchFailedSize,
long upsertBatchFailedCounter, long deleteBatchFailedSize,
- long deleteBatchFailedCounter) {
+ long deleteBatchFailedCounter, long mutationBatchCounter) {
this.numMutations.change(numMutations);
this.totalCommitTimeForUpserts.change(commitTimeForUpserts);
this.totalCommitTimeForAtomicUpserts.change(commitTimeForAtomicUpserts);
@@ -151,6 +156,7 @@
this.upsertBatchFailedCounter.change(upsertBatchFailedCounter);
this.deleteBatchFailedSize.change(deleteBatchFailedSize);
this.deleteBatchFailedCounter.change(deleteBatchFailedCounter);
+ this.mutationBatchCounter.change(mutationBatchCounter);
}
public CombinableMetric getTotalCommitTimeForUpserts() {
@@ -215,6 +221,10 @@
return deleteBatchFailedCounter;
}
+ public CombinableMetric getMutationBatchCounter() {
+ return mutationBatchCounter;
+ }
+
public void combineMetric(MutationMetric other) {
this.numMutations.combine(other.numMutations);
this.totalCommitTimeForUpserts.combine(other.totalCommitTimeForUpserts);
@@ -232,6 +242,7 @@
this.upsertBatchFailedCounter.combine(other.upsertBatchFailedCounter);
this.deleteBatchFailedSize.combine(other.deleteBatchFailedSize);
this.deleteBatchFailedCounter.combine(other.deleteBatchFailedCounter);
+ this.mutationBatchCounter.combine(other.mutationBatchCounter);
}
}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
index 5a1aa3d..6832775 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
@@ -25,6 +25,7 @@
import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
import static org.apache.phoenix.monitoring.MetricType.NUM_METADATA_LOOKUP_FAILURES;
import static org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_FAILURES;
@@ -74,9 +75,6 @@
import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_SUCCESS_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_FAILURE_SQL_COUNTER;
-import static org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_SUCCESS;
-import static org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_FAILURES;
-import static org.apache.phoenix.monitoring.MetricType.TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS;
import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_COMMIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_QUERY_TIME;
@@ -140,7 +138,8 @@
TABLE_NUM_SYSTEM_TABLE_RPC_SUCCESS(NUM_SYSTEM_TABLE_RPC_SUCCESS),
TABLE_NUM_SYSTEM_TABLE_RPC_FAILURES(NUM_SYSTEM_TABLE_RPC_FAILURES),
TABLE_NUM_METADATA_LOOKUP_FAILURES(NUM_METADATA_LOOKUP_FAILURES),
- TABLE_TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS(TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS);
+ TABLE_TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS(TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS),
+ TABLE_MUTATION_BATCH_SUCCESS_COUNTER(MUTATION_BATCH_COUNTER);
private MetricType metricType;
private PhoenixTableMetric metric;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
index 6be65af..5a83bfa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
@@ -115,7 +115,7 @@
assertEquals("Table names didn't match!", tableName, t);
Map<MetricType, Long> p = entry.getValue();
- assertEquals("There should have been sixteen metrics", 16, p.size());
+ assertEquals("There should have been seventeen metrics", 17, p.size());
boolean mutationBatchSizePresent = false;
boolean mutationCommitTimePresent = false;
@@ -132,6 +132,7 @@
boolean upsertMutationSqlCounterPresent = false;
boolean upsertCommitTimeCounterPresent = false;
boolean deleteCommitTimeCounterPresent = false;
+ boolean mutationBatchCounterPresent = false;
for (Map.Entry<MetricType, Long> metric : p.entrySet()) {
MetricType metricType = metric.getKey();
long metricValue = metric.getValue();
@@ -206,6 +207,11 @@
}
deleteCommitTimeCounterPresent = true;
}
+ else if (metricType.equals(MetricType.MUTATION_BATCH_COUNTER)) {
+ assertTrue("mutation batch success counter should be greater than zero",
+ metricValue > 0);
+ mutationBatchCounterPresent = true;
+ }
}
assertTrue(mutationBatchSizePresent);
assertTrue(mutationCommitTimePresent);
@@ -222,6 +228,7 @@
assertTrue(deleteBatchFailedCounterPresent);
assertTrue(upsertCommitTimeCounterPresent);
assertTrue(deleteCommitTimeCounterPresent);
+ assertTrue(mutationBatchCounterPresent);
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 8c007df..f3495fe 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -41,6 +41,7 @@
import static org.apache.phoenix.monitoring.MetricType.COUNT_MILLS_BETWEEN_NEXTS;
import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
@@ -76,7 +77,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -89,7 +89,6 @@
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.log.LogLevel;
import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.EnvironmentEdge;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -133,7 +132,8 @@
private static final String DELETE_ALL_DML = "DELETE FROM %s";
private static final List<MetricType> mutationMetricsToSkip =
- Lists.newArrayList(MUTATION_COMMIT_TIME, UPSERT_COMMIT_TIME, DELETE_COMMIT_TIME);
+ Lists.newArrayList(MUTATION_COMMIT_TIME, UPSERT_COMMIT_TIME, DELETE_COMMIT_TIME,
+ MUTATION_BATCH_COUNTER);
private static final List<MetricType> readMetricsToSkip =
Lists.newArrayList(TASK_QUEUE_WAIT_TIME, TASK_EXECUTION_TIME, TASK_END_TO_END_TIME,
COUNT_MILLS_BETWEEN_NEXTS);
@@ -377,6 +377,29 @@
}
}
+ static void createTableAndRunUpsertSelect(String destTableName, String sourceTableName,
+ boolean resetGlobalMetricsAfterTableCreate,
+ boolean resetTableMetricsAfterTableCreate,
+ boolean commit, Connection conn) throws SQLException {
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute(String.format(DDL, destTableName));
+ }
+ conn.commit();
+ if (resetGlobalMetricsAfterTableCreate) {
+ resetGlobalMetrics();
+ }
+
+ if (resetTableMetricsAfterTableCreate) {
+ PhoenixRuntime.clearTableLevelMetrics();
+ }
+ try (Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(String.format(UPSERT_SELECT_DML, destTableName, sourceTableName));
+ }
+ if (commit) {
+ conn.commit();
+ }
+ }
+
static void doPointDeleteFromTable(String tableName, Connection conn) throws SQLException {
try (PreparedStatement stmt = conn.prepareStatement(
String.format(POINT_DELETE_DML, tableName))) {
@@ -487,11 +510,12 @@
String t = entry.getKey();
assertEquals("Table names didn't match!", tableName, t);
Map<MetricType, Long> p = entry.getValue();
- assertEquals("There should have been sixteen metrics", 16, p.size());
+ assertEquals("There should have been seventeen metrics", 17, p.size());
boolean mutationBatchSizePresent = false;
boolean mutationCommitTimePresent = false;
boolean mutationBytesPresent = false;
boolean mutationBatchFailedPresent = false;
+ boolean mutationBatchCounterPresent = false;
for (Entry<MetricType, Long> metric : p.entrySet()) {
MetricType metricType = metric.getKey();
long metricValue = metric.getValue();
@@ -508,11 +532,16 @@
assertEquals("Zero failed mutations expected", 0, metricValue);
mutationBatchFailedPresent = true;
}
+ else if (metricType.equals(MetricType.MUTATION_BATCH_COUNTER)) {
+ assertEquals("Mutation batch success count should be greater than zero", 1, metricValue);
+ mutationBatchCounterPresent = true;
+ }
}
assertTrue(mutationBatchSizePresent);
assertTrue(mutationCommitTimePresent);
assertTrue(mutationBytesPresent);
assertTrue(mutationBatchFailedPresent);
+ assertTrue(mutationBatchCounterPresent);
}
Map<String, Map<MetricType, Long>> readMetrics = PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(pConn);
assertEquals("Read metrics should be empty", 0, readMetrics.size());
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
index 0031957..1a1280e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
@@ -38,7 +38,6 @@
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.ConnectionQueryServicesImpl;
import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.query.QueryServicesTestImpl;
import org.apache.phoenix.util.EnvironmentEdge;
import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -67,6 +66,7 @@
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_SUCCESS;
import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_SQL_COUNTER;
@@ -115,10 +115,13 @@
import static org.apache.phoenix.monitoring.PhoenixMetricsIT.POINT_LOOKUP_SELECT_QUERY;
import static org.apache.phoenix.monitoring.PhoenixMetricsIT.RANGE_SCAN_SELECT_QUERY;
import static org.apache.phoenix.monitoring.PhoenixMetricsIT.createTableAndInsertValues;
+import static org.apache.phoenix.monitoring.PhoenixMetricsIT.createTableAndRunUpsertSelect;
import static org.apache.phoenix.monitoring.PhoenixMetricsIT.doPointDeleteFromTable;
import static org.apache.phoenix.monitoring.PhoenixMetricsIT.doDeleteAllFromTable;
+import static org.apache.phoenix.query.QueryServices.ENABLE_SERVER_UPSERT_SELECT;
import static org.apache.phoenix.util.DelayedOrFailingRegionServer.INJECTED_EXCEPTION_STRING;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.clearTableLevelMetrics;
import static org.apache.phoenix.util.PhoenixRuntime.getOverAllReadRequestMetricInfo;
import static org.apache.phoenix.util.PhoenixRuntime.getPhoenixTableClientMetrics;
@@ -186,6 +189,7 @@
// Add our own driver
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
props.put(BaseTest.DRIVER_CLASS_NAME_ATTRIB, PhoenixMetricsTestingDriver.class.getName());
+ props.put(ENABLE_SERVER_UPSERT_SELECT, "true");
initAndRegisterTestDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
}
@@ -331,6 +335,8 @@
* @param writeMutMetrics write mutation metrics object
* @param conn connection object. Note: this method must be called after connection close
* since that's where we populate table-level write metrics
+ * @param expectedMutationBatchCount expected number of mutation batches per commit call
+
*/
private static void assertMutationTableMetrics(final boolean isUpsert, final String tableName,
final long expectedUpsertOrDeleteSuccessSqlCt,
@@ -342,7 +348,7 @@
final long expectedUpsertOrDeleteAggregateSuccessCt,
final long expectedUpsertOrDeleteAggregateFailureCt,
final Map<MetricType, Long> writeMutMetrics, final Connection conn,
- final boolean expectedSystemCatalogMetric)
+ final boolean expectedSystemCatalogMetric, final long expectedMutationBatchCount)
throws SQLException {
assertTrue(conn != null && conn.isClosed());
assertFalse(hasMutationBeenExplicitlyCommitted && writeMutMetrics == null);
@@ -432,6 +438,10 @@
writeMutMetrics.get(isUpsert ?
UPSERT_BATCH_FAILED_COUNTER :
DELETE_BATCH_FAILED_COUNTER), CompareOp.EQ);
+ assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+ writeMutMetrics.get(MUTATION_BATCH_COUNTER), CompareOp.EQ);
+ assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+ expectedMutationBatchCount, CompareOp.EQ);
}
}
if (expectedSystemCatalogMetric) {
@@ -713,7 +723,7 @@
// Must be asserted after connection close since that's where
// we populate table-level metrics
assertMutationTableMetrics(true, tableName, numRows, 0, 0, true, numRows, 0, 0, 1, 0,
- writeMutMetrics, conn, true);
+ writeMutMetrics, conn, true, 100);
}
}
@@ -739,7 +749,7 @@
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
conn.close();
assertMutationTableMetrics(true, tableName, numRows, 0, 0, true, numRows, 0, 0, 1, 0,
- writeMutMetrics, conn, true);
+ writeMutMetrics, conn, true, 1);
}
}
@@ -779,7 +789,7 @@
// mutation commit time since autoCommit was on
assertMutationTableMetrics(true, tableName, numRows, 0,
writeMutMetrics.get(UPSERT_COMMIT_TIME), true, numRows, 0, 0, numRows, 0,
- writeMutMetrics, conn,true);
+ writeMutMetrics, conn,true, 10);
}
}
@@ -814,7 +824,7 @@
}
assertNotNull("Failed to get a connection!", conn);
conn.close();
- assertMutationTableMetrics(true, tableName, 0, 1, 0, false, 0, 0, 0, 1, 0, null, conn, true);
+ assertMutationTableMetrics(true, tableName, 0, 1, 0, false, 0, 0, 0, 1, 0, null, conn, true, 0);
}
}
@@ -854,7 +864,7 @@
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
conn.close();
assertMutationTableMetrics(true, tableName, numRows, 0, delay, true, numRows, 0, 0, 1,
- 0, writeMutMetrics, conn, true);
+ 0, writeMutMetrics, conn, true, 1);
}
}
@@ -902,7 +912,7 @@
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
conn.close();
assertMutationTableMetrics(true, tableName, 0, 1, 0, true, 1, 0, 1, 0, 1,
- writeMutMetrics, conn, true);
+ writeMutMetrics, conn, true, 1);
}
}
@@ -953,7 +963,7 @@
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
conn.close();
assertMutationTableMetrics(true, tableName, numRows, 0, 0, true, numRows, 0, numRows, 0,
- 1, writeMutMetrics, conn, true);
+ 1, writeMutMetrics, conn, true, 1);
}
}
@@ -993,7 +1003,70 @@
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
conn.close();
assertMutationTableMetrics(true, tableName, numRows, 0, 0, true, numRows, delayRs, 0, 1,
- 0, writeMutMetrics, conn, true);
+ 0, writeMutMetrics, conn, true, 1);
+ }
+ }
+
+ @Test public void testUpsertSelectWithRunOnServerAsTrue() throws SQLException {
+ String srcTableName = generateUniqueName();
+ String destTableName = generateUniqueName();
+ int numRows = 10;
+ Map<MetricType, Long> writeMutMetrics;
+ try (Connection conn = getConnFromTestDriver()) {
+ createTableAndInsertValues(srcTableName, true, true,
+ numRows, true, conn, false);
+ }
+ try (Connection conn = getConnFromTestDriver()) {
+ conn.setAutoCommit(true); // Set auto-commit to make upsert select run on server
+ createTableAndRunUpsertSelect(destTableName, srcTableName, true,
+ true, true, conn);
+ writeMutMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn).get(destTableName);
+ }
+ assertNull(writeMutMetrics); // No commits were done from client to server so, no metrics recorded
+ for (PhoenixTableMetric metric: getPhoenixTableClientMetrics().get(destTableName)) {
+ assertMetricValue(metric, MUTATION_BATCH_COUNTER, 0, CompareOp.EQ);
+ }
+ }
+
+ @Test public void testUpsertSelectWithRunOnServerAsFalse() throws SQLException {
+ String srcTableName = generateUniqueName();
+ String destTableName = generateUniqueName();
+ int numRows = 10;
+ Map<MetricType, Long> writeMutMetrics;
+ try (Connection conn = getConnFromTestDriver()) {
+ createTableAndInsertValues(srcTableName, true, true,
+ numRows, true, conn, false);
+ }
+ try (Connection conn = getConnFromTestDriver()) {
+ createTableAndRunUpsertSelect(destTableName, srcTableName, true,
+ true, true, conn);
+ writeMutMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn).get(destTableName);
+ }
+ // Rows were fetched to client from source table and committed to destination table on server
+ assertNotNull(writeMutMetrics);
+ for (PhoenixTableMetric metric: getPhoenixTableClientMetrics().get(destTableName)) {
+ assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+ writeMutMetrics.get(MUTATION_BATCH_COUNTER), CompareOp.EQ);
+ assertMetricValue(metric, MUTATION_BATCH_COUNTER, 1, CompareOp.EQ);
+ }
+ }
+
+ @Test public void testUpsertWithOverriddenUpsertBatchSize() throws SQLException {
+ String tableName = generateUniqueName();
+ int numRows = 100;
+ Map<MetricType, Long> writeMutMetrics;
+ Properties props = new Properties();
+ props.put(UPSERT_BATCH_SIZE_ATTRIB, "5");
+ try (Connection conn = DriverManager.getConnection(url, props)) {
+ createTableAndInsertValues(tableName, true, true,
+ numRows, true, conn, false);
+ writeMutMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
+ }
+ assertNotNull(writeMutMetrics);
+ for (PhoenixTableMetric metric: getPhoenixTableClientMetrics().get(tableName)) {
+ assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+ writeMutMetrics.get(MUTATION_BATCH_COUNTER), CompareOp.EQ);
+ assertMetricValue(metric, MUTATION_BATCH_COUNTER, 20, CompareOp.EQ);
}
}
@@ -1024,7 +1097,7 @@
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
conn.close();
assertMutationTableMetrics(false, tableName, 1, 0, 0, true, 1, 0, 0, 1, 0,
- writeMutMetrics, conn, false);
+ writeMutMetrics, conn, false, 1);
}
}
@@ -1055,7 +1128,7 @@
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
conn.close();
assertMutationTableMetrics(false, tableName, 1, 0, 0, true, numRows, 0, 0, 1, 0,
- writeMutMetrics, conn, false);
+ writeMutMetrics, conn, false, 1);
}
}
@@ -1092,7 +1165,7 @@
assertNull(writeMutMetrics);
conn.close();
assertMutationTableMetrics(false, tableName, 1, 0, 0, false, 0, 0, 0, 0, 0,
- writeMutMetrics, conn, false);
+ writeMutMetrics, conn, false, 1);
}
}
@@ -1136,7 +1209,7 @@
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
assertNull(writeMutMetrics);
conn.close();
- assertMutationTableMetrics(false, tableName, 0, 1, 0, false, 0, 0, 0, 0, 1, null, conn, false);
+ assertMutationTableMetrics(false, tableName, 0, 1, 0, false, 0, 0, 0, 0, 1, null, conn, false, 0);
}
}
@@ -1175,7 +1248,7 @@
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
conn.close();
assertMutationTableMetrics(false, tableName, 1, 0, injectDelay, true, 1, 0, 0, 1, 0,
- writeMutMetrics, conn, false);
+ writeMutMetrics, conn, false, 1);
}
}
@@ -1221,7 +1294,7 @@
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
conn.close();
assertMutationTableMetrics(false, tableName, 1, 0, 0, true, numRows, 0, numRows, 0, 1,
- writeMutMetrics, conn, false);
+ writeMutMetrics, conn, false, 1);
}
}
@@ -1246,6 +1319,7 @@
// Insert data into the table
String insertData = "UPSERT INTO " + dataTable + " VALUES (?, ?)";
+ Map<String, Map<MetricType, Long>> writeMutMetrics;
try (Connection conn = getConnFromTestDriver();
PreparedStatement stmt = conn.prepareStatement(insertData)) {
for (int i = 1; i <= 10; i++) {
@@ -1254,6 +1328,21 @@
stmt.executeUpdate();
}
conn.commit();
+ writeMutMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn);
+ }
+ for(PhoenixTableMetric metric: getPhoenixTableClientMetrics().get(dataTable)) {
+ if(metric.getMetricType().equals(MUTATION_BATCH_COUNTER)) {
+ assertMetricValue(metric, MUTATION_BATCH_COUNTER, 1, CompareOp.EQ);
+ assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+ writeMutMetrics.get(dataTable).get(MUTATION_BATCH_COUNTER), CompareOp.EQ);
+ }
+ }
+ for(PhoenixTableMetric metric: getPhoenixTableClientMetrics().get(indexName)) {
+ if(metric.getMetricType().equals(MUTATION_BATCH_COUNTER)) {
+ assertMetricValue(metric, MUTATION_BATCH_COUNTER, 2, CompareOp.EQ);
+ assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+ writeMutMetrics.get(indexName).get(MUTATION_BATCH_COUNTER), CompareOp.EQ);
+ }
}
// Check if the index is being used
@@ -1332,7 +1421,7 @@
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
conn.close();
assertMutationTableMetrics(false, tableName, 1, 0, 0, true, numRows, delayRs, 0, 1, 0,
- writeMutMetrics, conn, false);
+ writeMutMetrics, conn, false, 1);
}
}
@@ -1374,7 +1463,7 @@
// 1 regular upsert + numAtomicUpserts
// 2 mutations (regular and atomic on the same row in the same batch will be split)
assertMutationTableMetrics(true, tableName, 1 + numAtomicUpserts, 0, 0, true, 2, 0, 0, 2, 0,
- writeMutMetrics, conn, false);
+ writeMutMetrics, conn, false, 2);
assertEquals(numAtomicUpserts, getMetricFromTableMetrics(tableName, ATOMIC_UPSERT_SQL_COUNTER));
assertTrue(getMetricFromTableMetrics(tableName, ATOMIC_UPSERT_COMMIT_TIME) > 0);
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/TableMetricsManagerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/TableMetricsManagerTest.java
index 54f4c6c..bf92444 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/TableMetricsManagerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/TableMetricsManagerTest.java
@@ -236,32 +236,32 @@
TableMetricsManager.updateLatencyHistogramForMutations(tableName, 1, true);
MutationMetricQueue.MutationMetric metric = new MutationMetricQueue.MutationMetric(
0L, 5L, 0L, 0L, 0L,0L,
- 0L, 1L, 0L, 5L, 0L, 0L, 0L, 0L, 0L);
+ 0L, 1L, 0L, 5L, 0L, 0L, 0L, 0L, 0L, 0L);
TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
TableMetricsManager.updateLatencyHistogramForMutations(tableName, 2, true);
metric = new MutationMetricQueue.MutationMetric(0L, 10L, 0L, 0L, 0L,0L,
- 0L, 1L, 0L, 10L, 0L, 0L, 0L, 0L, 0L);
+ 0L, 1L, 0L, 10L, 0L, 0L, 0L, 0L, 0L, 0L);
TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
TableMetricsManager.updateLatencyHistogramForMutations(tableName, 4, true);
metric = new MutationMetricQueue.MutationMetric(0L, 50L, 0L, 0L, 0L,0L,
- 0L, 1L, 0L, 50L, 0L, 0L, 0L, 0L, 0L);
+ 0L, 1L, 0L, 50L, 0L, 0L, 0L, 0L, 0L, 0L);
TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
TableMetricsManager.updateLatencyHistogramForMutations(tableName, 5, true);
metric = new MutationMetricQueue.MutationMetric(0L, 100L, 0L, 0L, 0L,0L,
- 0L, 1L, 0L, 100L, 0L, 0L, 0L, 0L, 0L);
+ 0L, 1L, 0L, 100L, 0L, 0L, 0L, 0L, 0L, 0L);
TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
TableMetricsManager.updateLatencyHistogramForMutations(tableName, 6, true);
metric = new MutationMetricQueue.MutationMetric(0L, 500L, 0L, 0L, 0L,0L,
- 0L, 1L, 0L, 500L, 0L, 0L, 0L, 0L, 0L);
+ 0L, 1L, 0L, 500L, 0L, 0L, 0L, 0L, 0L, 0L);
TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
TableMetricsManager.updateLatencyHistogramForMutations(tableName, 8, true);
metric = new MutationMetricQueue.MutationMetric(0L, 1000L, 0L, 0L, 0L,0L,
- 0L, 1L, 0L, 1000L, 0L, 0L, 0L, 0L, 0L);
+ 0L, 1L, 0L, 1000L, 0L, 0L, 0L, 0L, 0L, 0L);
TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
@@ -300,32 +300,32 @@
TableMetricsManager.updateLatencyHistogramForMutations(tableName, 1, false);
MutationMetricQueue.MutationMetric metric = new MutationMetricQueue.MutationMetric(
0L, 0L, 5L, 0L, 0L, 0L,
- 0L, 0L, 1L, 5L, 0L, 0L, 0L, 0L, 0L);
+ 0L, 0L, 1L, 5L, 0L, 0L, 0L, 0L, 0L, 0L);
TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
TableMetricsManager.updateLatencyHistogramForMutations(tableName, 2, false);
metric = new MutationMetricQueue.MutationMetric(0L, 0L, 10L, 0L, 0L, 0L,
- 0L, 0L, 1L, 10L, 0L, 0L, 0L, 0L, 0L);
+ 0L, 0L, 1L, 10L, 0L, 0L, 0L, 0L, 0L, 0L);
TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
TableMetricsManager.updateLatencyHistogramForMutations(tableName, 4, false);
metric = new MutationMetricQueue.MutationMetric(0L, 0L, 50L, 0L, 0L, 0L,
- 0L, 0L, 1L, 50L, 0L, 0L, 0L, 0L, 0L);
+ 0L, 0L, 1L, 50L, 0L, 0L, 0L, 0L, 0L, 0L);
TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
TableMetricsManager.updateLatencyHistogramForMutations(tableName, 5,false);
metric = new MutationMetricQueue.MutationMetric(0L, 0L, 100L, 0L, 0L, 0L,
- 0L, 0L, 1L, 100L, 0L, 0L, 0L, 0L, 0L);
+ 0L, 0L, 1L, 100L, 0L, 0L, 0L, 0L, 0L, 0L);
TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
TableMetricsManager.updateLatencyHistogramForMutations(tableName, 6,false);
metric = new MutationMetricQueue.MutationMetric(0L, 0L, 500L, 0L, 0L, 0L,
- 0L, 0L, 1L, 500L, 0L, 0L, 0L, 0L, 0L);
+ 0L, 0L, 1L, 500L, 0L, 0L, 0L, 0L, 0L, 0L);
TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
TableMetricsManager.updateLatencyHistogramForMutations(tableName, 8, false);
metric = new MutationMetricQueue.MutationMetric(0L, 0L, 1000L, 0L, 0L, 0L,
- 0L, 0L, 1L, 1000L, 0L, 0L, 0L, 0L, 0L);
+ 0L, 0L, 1L, 1000L, 0L, 0L, 0L, 0L, 0L, 0L);
TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);