blob: 04d125a6488cb913204ca735dc08db5ef9dfbab6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
* applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
package org.apache.phoenix.monitoring;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_REJECTED_TASK_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
private static final List<String> mutationMetricsToSkip = Lists
.newArrayList(MetricType.MUTATION_COMMIT_TIME.name());
private static final List<String> readMetricsToSkip = Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(),
MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name());
private static final String CUSTOM_URL_STRING = "SESSION";
private static final AtomicInteger numConnections = new AtomicInteger(0);
@BeforeClass
public static void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
// Enable request metric collection at the driver level
props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
// disable renewing leases as this will force spooling to happen.
props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
// need the non-test driver for some tests that check number of hconnections, etc.
DriverManager.registerDriver(PhoenixDriver.INSTANCE);
}
@Test
public void testResetGlobalPhoenixMetrics() {
resetGlobalMetrics();
for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
assertEquals(0, m.getTotalSum());
assertEquals(0, m.getNumberOfSamples());
}
}
@Test
public void testGlobalPhoenixMetricsForQueries() throws Exception {
String tableName = generateUniqueName();
createTableAndInsertValues(tableName, true);
resetGlobalMetrics(); // we want to count metrics related only to the below query
Connection conn = DriverManager.getConnection(getUrl());
String query = "SELECT * FROM " + tableName;
ResultSet rs = conn.createStatement().executeQuery(query);
while (rs.next()) {
rs.getString(1);
rs.getString(2);
}
assertEquals(1, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum());
assertEquals(1, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum());
assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum());
assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
assertEquals(0, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum());
assertEquals(0, GLOBAL_MUTATION_BYTES.getMetric().getTotalSum());
assertEquals(0, GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum());
assertTrue(GLOBAL_SCAN_BYTES.getMetric().getTotalSum() > 0);
assertTrue(GLOBAL_QUERY_TIME.getMetric().getTotalSum() > 0);
assertTrue(GLOBAL_TASK_END_TO_END_TIME.getMetric().getTotalSum() > 0);
assertTrue(GLOBAL_TASK_EXECUTION_TIME.getMetric().getTotalSum() > 0);
}
@Test
public void testGlobalPhoenixMetricsForMutations() throws Exception {
String tableName = generateUniqueName();
createTableAndInsertValues(tableName, true);
assertEquals(10, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum());
assertEquals(10, GLOBAL_MUTATION_SQL_COUNTER.getMetric().getTotalSum());
assertTrue(GLOBAL_MUTATION_BYTES.getMetric().getTotalSum() > 0);
assertTrue(GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
assertEquals(0, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum());
assertEquals(0, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum());
assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum());
assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
}
@Test
public void testGlobalPhoenixMetricsForUpsertSelect() throws Exception {
String tableFrom = generateUniqueName();
String tableTo = generateUniqueName();
createTableAndInsertValues(tableFrom, true);
resetGlobalMetrics();
String ddl = "CREATE TABLE " + tableTo + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
Connection conn = DriverManager.getConnection(getUrl());
conn.createStatement().execute(ddl);
resetGlobalMetrics();
String dml = "UPSERT INTO " + tableTo + " (K, V) SELECT K, V FROM " + tableFrom;
conn.createStatement().executeUpdate(dml);
conn.commit();
assertEquals(10, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum());
assertEquals(1, GLOBAL_MUTATION_SQL_COUNTER.getMetric().getTotalSum());
assertEquals(1, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum());
assertEquals(0, GLOBAL_QUERY_TIME.getMetric().getTotalSum());
assertTrue(GLOBAL_SCAN_BYTES.getMetric().getTotalSum() > 0);
assertTrue(GLOBAL_MUTATION_BYTES.getMetric().getTotalSum() > 0);
assertTrue(GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
assertEquals(0, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum());
assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum());
assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
}
private static void resetGlobalMetrics() {
for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
m.reset();
}
}
private static void createTableAndInsertValues(String tableName, boolean resetGlobalMetricsAfterTableCreate)
throws Exception {
String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
Connection conn = DriverManager.getConnection(getUrl());
conn.createStatement().execute(ddl);
if (resetGlobalMetricsAfterTableCreate) {
resetGlobalMetrics();
}
// executing 10 upserts/mutations.
String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
PreparedStatement stmt = conn.prepareStatement(dml);
for (int i = 1; i <= 10; i++) {
stmt.setString(1, "key" + i);
stmt.setString(2, "value" + i);
stmt.executeUpdate();
}
conn.commit();
}
@Test
public void testOverallQueryMetricsForSelect() throws Exception {
String tableName = generateUniqueName();
String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 6";
Connection conn = DriverManager.getConnection(getUrl());
conn.createStatement().execute(ddl);
}
@Test
public void testReadMetricsForSelect() throws Exception {
String tableName = generateUniqueName();
long numSaltBuckets = 6;
String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ numSaltBuckets;
Connection conn = DriverManager.getConnection(getUrl());
conn.createStatement().execute(ddl);
long numRows = 1000;
long numExpectedTasks = numSaltBuckets;
insertRowsInTable(tableName, numRows);
String query = "SELECT * FROM " + tableName;
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(query);
PhoenixResultSet resultSetBeingTested = rs.unwrap(PhoenixResultSet.class);
changeInternalStateForTesting(resultSetBeingTested);
while (resultSetBeingTested.next()) {}
resultSetBeingTested.close();
Set<String> expectedTableNames = Sets.newHashSet(tableName);
assertReadMetricValuesForSelectSql(Lists.newArrayList(numRows), Lists.newArrayList(numExpectedTasks),
resultSetBeingTested, expectedTableNames);
}
@Test
public void testMetricsForUpsert() throws Exception {
String tableName = generateUniqueName();
String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 6";
Connection ddlConn = DriverManager.getConnection(getUrl());
ddlConn.createStatement().execute(ddl);
ddlConn.close();
int numRows = 10;
Connection conn = insertRowsInTable(tableName, numRows);
PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
for (Entry<String, Map<String, Long>> entry : mutationMetrics.entrySet()) {
String t = entry.getKey();
assertEquals("Table names didn't match!", tableName, t);
Map<String, Long> p = entry.getValue();
assertEquals("There should have been three metrics", 3, p.size());
boolean mutationBatchSizePresent = false;
boolean mutationCommitTimePresent = false;
boolean mutationBytesPresent = false;
for (Entry<String, Long> metric : p.entrySet()) {
String metricName = metric.getKey();
long metricValue = metric.getValue();
if (metricName.equals(MetricType.MUTATION_BATCH_SIZE.name())) {
assertEquals("Mutation batch sizes didn't match!", numRows, metricValue);
mutationBatchSizePresent = true;
} else if (metricName.equals(MetricType.MUTATION_COMMIT_TIME.name())) {
assertTrue("Mutation commit time should be greater than zero", metricValue > 0);
mutationCommitTimePresent = true;
} else if (metricName.equals(MetricType.MUTATION_BYTES.name())) {
assertTrue("Mutation bytes size should be greater than zero", metricValue > 0);
mutationBytesPresent = true;
}
}
assertTrue(mutationBatchSizePresent);
assertTrue(mutationCommitTimePresent);
assertTrue(mutationBytesPresent);
}
Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
assertEquals("Read metrics should be empty", 0, readMetrics.size());
}
@Test
public void testMetricsForUpsertSelect() throws Exception {
String tableName1 = generateUniqueName();
long table1SaltBuckets = 6;
String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ table1SaltBuckets;
Connection ddlConn = DriverManager.getConnection(getUrl());
ddlConn.createStatement().execute(ddl);
ddlConn.close();
int numRows = 10;
insertRowsInTable(tableName1, numRows);
String tableName2 = generateUniqueName();
ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 10";
ddlConn = DriverManager.getConnection(getUrl());
ddlConn.createStatement().execute(ddl);
ddlConn.close();
Connection conn = DriverManager.getConnection(getUrl());
String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
conn.createStatement().executeUpdate(upsertSelect);
conn.commit();
PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
assertMutationMetrics(tableName2, numRows, mutationMetrics);
Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
assertReadMetricsForMutatingSql(tableName1, table1SaltBuckets, readMetrics);
}
@Test
public void testMetricsForDelete() throws Exception {
String tableName = generateUniqueName();
long tableSaltBuckets = 6;
String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ tableSaltBuckets;
Connection ddlConn = DriverManager.getConnection(getUrl());
ddlConn.createStatement().execute(ddl);
ddlConn.close();
int numRows = 10;
insertRowsInTable(tableName, numRows);
Connection conn = DriverManager.getConnection(getUrl());
String delete = "DELETE FROM " + tableName;
conn.createStatement().execute(delete);
conn.commit();
PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
assertMutationMetrics(tableName, numRows, mutationMetrics);
Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
assertReadMetricsForMutatingSql(tableName, tableSaltBuckets, readMetrics);
}
@Test
public void testNoMetricsCollectedForConnection() throws Exception {
String tableName = generateUniqueName();
long tableSaltBuckets = 6;
String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ tableSaltBuckets;
Connection ddlConn = DriverManager.getConnection(getUrl());
ddlConn.createStatement().execute(ddl);
ddlConn.close();
int numRows = 10;
insertRowsInTable(tableName, numRows);
Properties props = new Properties();
props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "false");
Connection conn = DriverManager.getConnection(getUrl(), props);
ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
while (rs.next()) {}
rs.close();
Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getRequestReadMetrics(rs);
assertTrue("No read metrics should have been generated", readMetrics.size() == 0);
conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " VALUES ('KEY', 'VALUE')");
conn.commit();
Map<String, Map<String, Long>> writeMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
assertTrue("No write metrics should have been generated", writeMetrics.size() == 0);
}
@Test
public void testMetricsForUpsertWithAutoCommit() throws Exception {
String tableName = generateUniqueName();
long tableSaltBuckets = 6;
String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ tableSaltBuckets;
try (Connection ddlConn = DriverManager.getConnection(getUrl())) {
ddlConn.createStatement().execute(ddl);
}
String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
int numRows = 10;
Map<String, Map<String, Long>> mutationMetricsForAutoCommitOff = null;
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(false);
upsertRows(upsert, numRows, conn);
conn.commit();
mutationMetricsForAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
}
// Insert rows now with auto-commit on
Map<String, Map<String, Long>> mutationMetricsAutoCommitOn = null;
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(true);
upsertRows(upsert, numRows, conn);
mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
}
// Verify that the mutation metrics are same for both cases
assertMetricsAreSame(mutationMetricsForAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
}
private void upsertRows(String upsert, int numRows, Connection conn) throws SQLException {
PreparedStatement stmt = conn.prepareStatement(upsert);
for (int i = 1; i <= numRows; i++) {
stmt.setString(1, "key" + i);
stmt.setString(2, "value" + i);
stmt.executeUpdate();
}
}
@Test
public void testMetricsForDeleteWithAutoCommit() throws Exception {
String tableName = generateUniqueName();
long tableSaltBuckets = 6;
String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ tableSaltBuckets;
try (Connection ddlConn = DriverManager.getConnection(getUrl())) {
ddlConn.createStatement().execute(ddl);
}
String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
int numRows = 10;
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(false);
upsertRows(upsert, numRows, conn);
conn.commit();
}
String delete = "DELETE FROM " + tableName;
// Delete rows now with auto-commit off
Map<String, Map<String, Long>> deleteMetricsWithAutoCommitOff = null;
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(false);
conn.createStatement().executeUpdate(delete);
deleteMetricsWithAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
}
// Upsert the rows back
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(false);
upsertRows(upsert, numRows, conn);
conn.commit();
}
// Now delete rows with auto-commit on
Map<String, Map<String, Long>> deleteMetricsWithAutoCommitOn = null;
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(true);
conn.createStatement().executeUpdate(delete);
deleteMetricsWithAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
}
// Verify that the mutation metrics are same for both cases.
assertMetricsAreSame(deleteMetricsWithAutoCommitOff, deleteMetricsWithAutoCommitOn, mutationMetricsToSkip);
}
@Test
public void testMetricsForUpsertSelectWithAutoCommit() throws Exception {
String tableName1 = generateUniqueName();
long table1SaltBuckets = 6;
String ddl = "CREATE TABLE " + tableName1 + " (K BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, V VARCHAR)"
+ " SALT_BUCKETS = " + table1SaltBuckets + ", IMMUTABLE_ROWS = true";
Connection ddlConn = DriverManager.getConnection(getUrl());
ddlConn.createStatement().execute(ddl);
ddlConn.close();
int numRows = 10;
String dml = "UPSERT INTO " + tableName1 + " VALUES (?, ?)";
try (Connection conn = DriverManager.getConnection(getUrl())) {
PreparedStatement stmt = conn.prepareStatement(dml);
for (int i = 1; i <= numRows; i++) {
stmt.setLong(1, i);
stmt.setString(2, "value" + i);
stmt.executeUpdate();
}
conn.commit();
}
String tableName2 = generateUniqueName();
ddl = "CREATE TABLE " + tableName2 + " (K BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, V VARCHAR)"
+ " SALT_BUCKETS = 10" + ", IMMUTABLE_ROWS = true";
ddlConn = DriverManager.getConnection(getUrl());
ddlConn.createStatement().execute(ddl);
String indexName = generateUniqueName();
ddl = "CREATE INDEX " + indexName + " ON " + tableName2 + " (V)";
ddlConn.createStatement().execute(ddl);
ddlConn.close();
String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
Map<String, Map<String, Long>> mutationMetricsAutoCommitOff = null;
Map<String, Map<String, Long>> readMetricsAutoCommitOff = null;
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(false);
conn.createStatement().executeUpdate(upsertSelect);
conn.commit();
PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
mutationMetricsAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
readMetricsAutoCommitOff = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
}
Map<String, Map<String, Long>> mutationMetricsAutoCommitOn = null;
Map<String, Map<String, Long>> readMetricsAutoCommitOn = null;
int autoCommitBatchSize = numRows + 1; // batchsize = 11 is less than numRows and is not a divisor of batchsize
Properties props = new Properties();
props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
conn.createStatement().executeUpdate(upsertSelect);
PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
}
assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
autoCommitBatchSize = numRows - 1; // batchsize = 9 is less than numRows and is not a divisor of batchsize
props = new Properties();
props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
conn.createStatement().executeUpdate(upsertSelect);
PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
}
assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
autoCommitBatchSize = numRows;
props = new Properties();
props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
conn.createStatement().executeUpdate(upsertSelect);
PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
}
assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
autoCommitBatchSize = 2; // multiple batches of equal size
props = new Properties();
props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
conn.createStatement().executeUpdate(upsertSelect);
PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
}
assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOff, readMetricsToSkip);
}
@Test
public void testMutationMetricsWhenUpsertingToMultipleTables() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String table1 = generateUniqueName();
createTableAndInsertValues(true, 10, conn, table1);
String table2 = generateUniqueName();
createTableAndInsertValues(true, 10, conn, table2);
String table3 = generateUniqueName();
createTableAndInsertValues(true, 10, conn, table3);
Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
assertTrue("Mutation metrics not present for " + table1, mutationMetrics.get(table1) != null);
assertTrue("Mutation metrics not present for " + table2, mutationMetrics.get(table2) != null);
assertTrue("Mutation metrics not present for " + table3, mutationMetrics.get(table3) != null);
assertMetricsHaveSameValues(mutationMetrics.get(table1), mutationMetrics.get(table2), mutationMetricsToSkip);
assertMetricsHaveSameValues(mutationMetrics.get(table1), mutationMetrics.get(table3), mutationMetricsToSkip);
}
}
@Test
public void testClosingConnectionClearsMetrics() throws Exception {
Connection conn = null;
try {
conn = DriverManager.getConnection(getUrl());
createTableAndInsertValues(true, 10, conn, generateUniqueName());
assertTrue("Mutation metrics not present", PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn).size() > 0);
} finally {
if (conn != null) {
conn.close();
assertTrue("Closing connection didn't clear metrics",
PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn).size() == 0);
}
}
}
@Test
public void testMetricsForUpsertingIntoImmutableTableWithIndices() throws Exception {
String dataTable = generateUniqueName();
String tableDdl = "CREATE TABLE "
+ dataTable
+ " (K1 VARCHAR NOT NULL, K2 VARCHAR NOT NULL, V1 INTEGER, V2 INTEGER, V3 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(K1, K2)) IMMUTABLE_ROWS = true";
String index1 = generateUniqueName() + "_IDX";
String index1Ddl = "CREATE INDEX " + index1 + " ON " + dataTable + " (V1) include (V2)";
String index2 = generateUniqueName() + "_IDX";
String index2Ddl = "CREATE INDEX " + index2 + " ON " + dataTable + " (V2) include (V3)";
String index3 = generateUniqueName() + "_IDX";
String index3Ddl = "CREATE INDEX " + index3 + " ON " + dataTable + " (V3) include (V1)";
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement().execute(tableDdl);
conn.createStatement().execute(index1Ddl);
conn.createStatement().execute(index2Ddl);
conn.createStatement().execute(index3Ddl);
}
String upsert = "UPSERT INTO " + dataTable + " VALUES (?, ?, ?, ?, ?)";
try (Connection conn = DriverManager.getConnection(getUrl())) {
/*
* Upsert data into table. Because the table is immutable, mutations for updating the indices on it are
* handled by the client itself. So mutation metrics should include mutations for the indices as well as the
* data table.
*/
PreparedStatement stmt = conn.prepareStatement(upsert);
for (int i = 1; i < 10; i++) {
stmt.setString(1, "key1" + i);
stmt.setString(2, "key2" + i);
stmt.setInt(3, i);
stmt.setInt(4, i);
stmt.setInt(5, i);
stmt.executeUpdate();
}
conn.commit();
Map<String, Map<String, Long>> metrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
assertTrue(metrics.get(dataTable).size() > 0);
assertTrue(metrics.get(index1).size() > 0);
assertTrue(metrics.get(index2).size() > 0);
assertMetricsHaveSameValues(metrics.get(index1), metrics.get(index2), mutationMetricsToSkip);
assertTrue(metrics.get(index3).size() > 0);
assertMetricsHaveSameValues(metrics.get(index1), metrics.get(index3), mutationMetricsToSkip);
}
}
@Test
public void testOpenConnectionsCounter() throws Exception {
long numOpenConnections = GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue();
try (Connection conn = DriverManager.getConnection(getUrl())) {
assertEquals(numOpenConnections + 1, GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue());
}
assertEquals(numOpenConnections, GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue());
}
private void createTableAndInsertValues(boolean commit, int numRows, Connection conn, String tableName)
throws SQLException {
String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
conn.createStatement().execute(ddl);
// executing 10 upserts/mutations.
String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
PreparedStatement stmt = conn.prepareStatement(dml);
for (int i = 1; i <= numRows; i++) {
stmt.setString(1, "key" + i);
stmt.setString(2, "value" + i);
stmt.executeUpdate();
}
if (commit) {
conn.commit();
}
}
private void assertMetricsAreSame(Map<String, Map<String, Long>> metric1, Map<String, Map<String, Long>> metric2,
List<String> metricsToSkip) {
assertTrue("The two metrics have different or unequal number of table names ",
metric1.keySet().equals(metric2.keySet()));
for (Entry<String, Map<String, Long>> entry : metric1.entrySet()) {
Map<String, Long> metricNameValueMap1 = entry.getValue();
Map<String, Long> metricNameValueMap2 = metric2.get(entry.getKey());
assertMetricsHaveSameValues(metricNameValueMap1, metricNameValueMap2, metricsToSkip);
}
}
private void assertMetricsHaveSameValues(Map<String, Long> metricNameValueMap1,
Map<String, Long> metricNameValueMap2, List<String> metricsToSkip) {
assertTrue("The two metrics have different or unequal number of metric names ", metricNameValueMap1.keySet()
.equals(metricNameValueMap2.keySet()));
for (Entry<String, Long> entry : metricNameValueMap1.entrySet()) {
String metricName = entry.getKey();
if (!metricsToSkip.contains(metricName)) {
assertEquals("Unequal values for metric " + metricName, entry.getValue(),
metricNameValueMap2.get(metricName));
}
}
}
private void changeInternalStateForTesting(PhoenixResultSet rs) {
// get and set the internal state for testing purposes.
ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(true);
StatementContext ctx = (StatementContext)Whitebox.getInternalState(rs, "context");
Whitebox.setInternalState(ctx, "readMetricsQueue", testMetricsQueue);
Whitebox.setInternalState(rs, "readMetricsQueue", testMetricsQueue);
}
private void assertReadMetricValuesForSelectSql(ArrayList<Long> numRows, ArrayList<Long> numExpectedTasks,
PhoenixResultSet resultSetBeingTested, Set<String> expectedTableNames) throws SQLException {
Map<String, Map<String, Long>> metrics = PhoenixRuntime.getRequestReadMetrics(resultSetBeingTested);
int counter = 0;
for (Entry<String, Map<String, Long>> entry : metrics.entrySet()) {
String tableName = entry.getKey();
expectedTableNames.remove(tableName);
Map<String, Long> metricValues = entry.getValue();
boolean scanMetricsPresent = false;
boolean taskCounterMetricsPresent = false;
boolean taskExecutionTimeMetricsPresent = false;
boolean memoryMetricsPresent = false;
for (Entry<String, Long> pair : metricValues.entrySet()) {
String metricName = pair.getKey();
long metricValue = pair.getValue();
long n = numRows.get(counter);
long numTask = numExpectedTasks.get(counter);
if (metricName.equals(SCAN_BYTES.name())) {
// we are using a SCAN_BYTES_DELTA of 1. So number of scan bytes read should be number of rows read
assertEquals(n, metricValue);
scanMetricsPresent = true;
} else if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
assertEquals(numTask, metricValue);
taskCounterMetricsPresent = true;
} else if (metricName.equals(TASK_EXECUTION_TIME.name())) {
assertEquals(numTask * TASK_EXECUTION_TIME_DELTA, metricValue);
taskExecutionTimeMetricsPresent = true;
} else if (metricName.equals(MEMORY_CHUNK_BYTES.name())) {
assertEquals(numTask * MEMORY_CHUNK_BYTES_DELTA, metricValue);
memoryMetricsPresent = true;
}
}
counter++;
assertTrue(scanMetricsPresent);
assertTrue(taskCounterMetricsPresent);
assertTrue(taskExecutionTimeMetricsPresent);
assertTrue(memoryMetricsPresent);
}
PhoenixRuntime.resetMetrics(resultSetBeingTested);
assertTrue("Metrics not found tables " + Joiner.on(",").join(expectedTableNames),
expectedTableNames.size() == 0);
}
private Connection insertRowsInTable(String tableName, long numRows) throws SQLException {
String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
Connection conn = DriverManager.getConnection(getUrl());
PreparedStatement stmt = conn.prepareStatement(dml);
for (int i = 1; i <= numRows; i++) {
stmt.setString(1, "key" + i);
stmt.setString(2, "value" + i);
stmt.executeUpdate();
}
conn.commit();
return conn;
}
// number of records read should be number of bytes at the end
public static final long SCAN_BYTES_DELTA = 1;
// total task execution time should be numTasks * TASK_EXECUTION_TIME_DELTA
public static final long TASK_EXECUTION_TIME_DELTA = 10;
// total task execution time should be numTasks * TASK_EXECUTION_TIME_DELTA
public static final long MEMORY_CHUNK_BYTES_DELTA = 100;
private class TestReadMetricsQueue extends ReadMetricQueue {
public TestReadMetricsQueue(boolean isRequestMetricsEnabled) {
super(isRequestMetricsEnabled);
}
@Override
public CombinableMetric getMetric(MetricType type) {
switch (type) {
case SCAN_BYTES:
return new CombinableMetricImpl(type) {
@Override
public void change(long delta) {
super.change(SCAN_BYTES_DELTA);
}
};
case TASK_EXECUTION_TIME:
return new CombinableMetricImpl(type) {
@Override
public void change(long delta) {
super.change(TASK_EXECUTION_TIME_DELTA);
}
};
case MEMORY_CHUNK_BYTES:
return new CombinableMetricImpl(type) {
@Override
public void change(long delta) {
super.change(MEMORY_CHUNK_BYTES_DELTA);
}
};
}
return super.getMetric(type);
}
}
private void assertReadMetricsForMutatingSql(String tableName, long tableSaltBuckets,
Map<String, Map<String, Long>> readMetrics) {
assertTrue("No read metrics present when there should have been!", readMetrics.size() > 0);
int numTables = 0;
for (Entry<String, Map<String, Long>> entry : readMetrics.entrySet()) {
String t = entry.getKey();
assertEquals("Table name didn't match for read metrics", tableName, t);
numTables++;
Map<String, Long> p = entry.getValue();
assertTrue("No read metrics present when there should have been", p.size() > 0);
for (Entry<String, Long> metric : p.entrySet()) {
String metricName = metric.getKey();
long metricValue = metric.getValue();
if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
assertEquals(tableSaltBuckets, metricValue);
} else if (metricName.equals(SCAN_BYTES.name())) {
assertTrue("Scan bytes read should be greater than zero", metricValue > 0);
}
}
}
assertEquals("There should have been read metrics only for one table: " + tableName, 1, numTables);
}
private void assertMutationMetrics(String tableName, int numRows, Map<String, Map<String, Long>> mutationMetrics) {
assertTrue("No mutation metrics present when there should have been", mutationMetrics.size() > 0);
for (Entry<String, Map<String, Long>> entry : mutationMetrics.entrySet()) {
String t = entry.getKey();
assertEquals("Table name didn't match for mutation metrics", tableName, t);
Map<String, Long> p = entry.getValue();
assertEquals("There should have been three metrics", 3, p.size());
for (Entry<String, Long> metric : p.entrySet()) {
String metricName = metric.getKey();
long metricValue = metric.getValue();
if (metricName.equals(MetricType.MUTATION_BATCH_SIZE.name())) {
assertEquals("Mutation batch sizes didn't match!", numRows, metricValue);
} else if (metricName.equals(MetricType.MUTATION_COMMIT_TIME.name())) {
assertTrue("Mutation commit time should be greater than zero", metricValue > 0);
} else if (metricName.equals(MetricType.MUTATION_BYTES.name())) {
assertTrue("Mutation bytes size should be greater than zero", metricValue > 0);
}
}
}
}
@Test
public void testGetConnectionsForSameUrlConcurrently() throws Exception {
// establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
String zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort();
String url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
ExecutorService exec = Executors.newFixedThreadPool(10);
try {
GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset();
GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset();
assertEquals(0, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
assertEquals(0, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
List<Callable<Connection>> callables = new ArrayList<>(100);
List<Future<Connection>> futures = new ArrayList<>(100);
int expectedHConnections = numConnections.get() > 0 ? 0 : 1;
for (int i = 1; i <= 100; i++) {
Callable<Connection> c = new GetConnectionCallable(url);
callables.add(c);
futures.add(exec.submit(c));
}
for (int i = 0; i < futures.size(); i++) {
Connection c = futures.get(i).get();
try {
c.close();
} catch (Exception ignore) {}
}
assertEquals(expectedHConnections, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
assertEquals(expectedHConnections, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
} finally {
exec.shutdownNow();
}
}
@Test
public void testGetConnectionsThrottledForSameUrl() throws Exception {
int expectedPhoenixConnections = 11;
List<Connection> connections = Lists.newArrayList();
String zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort();
String url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum +
':' + CUSTOM_URL_STRING + '=' + "throttletest";
Properties props = new Properties();
props.setProperty(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, "10");
GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset();
GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset();
GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.getMetric().reset();
GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.getMetric().reset();
boolean wasThrottled = false;
try {
for (int k = 0; k < expectedPhoenixConnections; k++) {
connections.add(DriverManager.getConnection(url, props));
}
} catch (SQLException se) {
wasThrottled = true;
assertEquals(SQLExceptionCode.NEW_CONNECTION_THROTTLED.getErrorCode(), se.getErrorCode());
} finally {
for (Connection c : connections) {
c.close();
}
}
assertEquals(1, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
assertTrue("No connection was throttled!", wasThrottled);
assertEquals(1, GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.getMetric().getValue());
assertEquals(expectedPhoenixConnections, GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.getMetric().getValue());
}
@Test
public void testGetConnectionsForDifferentTenantsConcurrently() throws Exception {
// establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
String zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort();
String url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
ExecutorService exec = Executors.newFixedThreadPool(10);
try {
GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset();
GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset();
assertEquals(0, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
assertEquals(0, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
int expectedHConnections = numConnections.get() > 0 ? 0 : 1;
List<Callable<Connection>> callables = new ArrayList<>(100);
List<Future<Connection>> futures = new ArrayList<>(100);
for (int i = 1; i <= 100; i++) {
String tenantUrl = url + ';' + TENANT_ID_ATTRIB + '=' + i;
Callable<Connection> c = new GetConnectionCallable(tenantUrl + ";");
callables.add(c);
futures.add(exec.submit(c));
}
for (int i = 0; i < futures.size(); i++) {
Connection c = futures.get(i).get();
try {
c.close();
} catch (Exception ignore) {}
}
assertEquals(expectedHConnections, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
assertEquals(expectedHConnections, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
} finally {
exec.shutdownNow();
}
}
@Test
public void testGetConnectionsWithDifferentJDBCParamsConcurrently() throws Exception {
DriverManager.registerDriver(PhoenixDriver.INSTANCE);
ExecutorService exec = Executors.newFixedThreadPool(4);
// establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
String zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort();
String baseUrl = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
int numConnections = 20;
List<Callable<Connection>> callables = new ArrayList<>(numConnections);
List<Future<Connection>> futures = new ArrayList<>(numConnections);
try {
GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset();
GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset();
assertEquals(0, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
assertEquals(0, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
for (int i = 1; i <= numConnections; i++) {
String customUrl = baseUrl + ':' + CUSTOM_URL_STRING + '=' + i;
Callable<Connection> c = new GetConnectionCallable(customUrl + ";");
callables.add(c);
futures.add(exec.submit(c));
}
for (int i = 0; i < futures.size(); i++) {
futures.get(i).get();
}
assertEquals(numConnections, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
assertEquals(numConnections, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
} finally {
exec.shutdownNow();
for (int i = 0; i < futures.size(); i++) {
try {
Connection c = futures.get(i).get();
// close the query services instance because we created a lot of HConnections.
c.unwrap(PhoenixConnection.class).getQueryServices().close();
c.close();
} catch (Exception ignore) {}
}
}
}
private static class GetConnectionCallable implements Callable<Connection> {
private final String url;
GetConnectionCallable(String url) {
this.url = url;
}
@Override
public Connection call() throws Exception {
Connection c = DriverManager.getConnection(url);
if (!url.contains(CUSTOM_URL_STRING)) {
// check to detect whether a connection was established using the PhoenixDriver
// This is used in our tests to figure out whether a new hconnection and query
// services will be created.
numConnections.incrementAndGet();
}
return c;
}
}
}