PHOENIX-5634: Use 'phoenix.default.update.cache.frequency' from connection properties at query time
Signed-off-by: Xinyi Yan <yanxinyi@apache.org>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheConnectionLevelPropIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheConnectionLevelPropIT.java
new file mode 100644
index 0000000..1bc109e
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheConnectionLevelPropIT.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.rpc;
+
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.util.SchemaUtil;
+import static org.apache.phoenix.util.TestUtil.DEFAULT_SCHEMA_NAME;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.mockito.Mockito;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.isNull;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for connection level 'Update Cache Frequency' property.
+ *
+ * The tests verify that the 'Update Cache Frequency' is honored in the following precedence order
+ * for SELECTs and UPSERTs:
+ * Table-level property > Connection-level property > Default value
+ */
+public class UpdateCacheConnectionLevelPropIT extends ParallelStatsDisabledIT {
+
+ private static Connection conn1;
+ private static Connection conn2;
+ private static ConnectionQueryServices spyForConn2;
+
+ @AfterClass
+ public static synchronized void freeResources() {
+ try {
+ conn1.close();
+ conn2.close();
+ } catch (Exception e) {
+ /* ignored */
+ }
+ }
+
+ /**
+ * Test 'Update Cache Frequency' property when it is set at connection-level only, and not at
+ * table-level.
+ */
+ @Test
+ public void testWithConnLevelUCFNoTableLevelUCF() throws Exception {
+ final long connUpdateCacheFrequency = 1000;
+ String fullTableName = DEFAULT_SCHEMA_NAME + QueryConstants.NAME_SEPARATOR +
+ generateUniqueName();
+
+ setUpTableAndConnections(fullTableName, null,
+ String.valueOf(connUpdateCacheFrequency));
+
+ // There should only be a single call to getTable() for fetching the table's metadata
+ int numExecutions = 2;
+ int numExpectedGetTableCalls = 1;
+ verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+
+ // Wait for a period of 'connUpdateCacheFrequency' and verify that there was one new call to
+ // getTable() for fetching the table's metadata
+ Thread.sleep(connUpdateCacheFrequency);
+ verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+ }
+
+ /**
+ * Test 'Update Cache Frequency' property when it is set at table-level only, and not at
+ * connection-level.
+ */
+ @Test
+ public void testWithTableLevelUCFNoConnLevelUCF() throws Exception {
+ final long tableUpdateCacheFrequency = 1000;
+ String fullTableName = DEFAULT_SCHEMA_NAME + QueryConstants.NAME_SEPARATOR +
+ generateUniqueName();
+
+ // There should only be a single call to getTable() for fetching the table's metadata
+ int numExecutions = 2;
+ int numExpectedGetTableCalls = 1;
+ setUpTableAndConnections(fullTableName, String.valueOf(tableUpdateCacheFrequency), null);
+ verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+
+ // Wait for a period of 'tableUpdateCacheFrequency' and verify that there was one new call
+ // to getTable() for fetching the table's metadata
+ Thread.sleep(tableUpdateCacheFrequency);
+ verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+ }
+
+ /**
+ * Test 'Update Cache Frequency' property when it is not set at both table-level and
+ * connection-level.
+ */
+ @Test
+ public void testWithNoConnAndTableLevelUCF() throws Exception {
+ String fullTableName = DEFAULT_SCHEMA_NAME + QueryConstants.NAME_SEPARATOR +
+ generateUniqueName();
+
+ // This is the default behavior (i.e. always fetch the latest metadata of the table) when
+ // both connection and table level properties are not set
+ int numExecutions = 2;
+ int numExpectedGetTableCalls = 4; // 2 for SELECTs, and 2 for UPSERTs
+ setUpTableAndConnections(fullTableName, null, null);
+ verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+ }
+
+ /**
+ * Test 'Update Cache Frequency' property when it is set at both table-level and
+ * connection-level.
+ */
+ @Test
+ public void testWithBothConnAndTableLevelUCF() throws Exception {
+ // Set table level property to a much higher value than the connection level property
+ final long tableUpdateCacheFrequency = 5000;
+ final long connUpdateCacheFrequency = 1000;
+ String fullTableName = DEFAULT_SCHEMA_NAME + QueryConstants.NAME_SEPARATOR +
+ generateUniqueName();
+
+ // There should only be a single call to getTable() for fetching the table's metadata
+ int numExecutions = 2;
+ int numExpectedGetTableCalls = 1;
+ setUpTableAndConnections(fullTableName, String.valueOf(tableUpdateCacheFrequency),
+ String.valueOf(connUpdateCacheFrequency));
+ verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+
+ // Wait for a period of 'connUpdateCacheFrequency' and verify that there were no new calls
+ // to getTable() as the table level UCF should come in to effect
+ Thread.sleep(connUpdateCacheFrequency);
+ numExpectedGetTableCalls = 0;
+ verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+
+ // Extend the wait to a period of 'tableUpdateCacheFrequency' and verify that there was one
+ // new call to getTable() for fetching the table's metadata
+ Thread.sleep(tableUpdateCacheFrequency - connUpdateCacheFrequency);
+ numExpectedGetTableCalls = 1;
+ verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+ }
+
+ /**
+ * Helper method that sets up the connections and creates the table to be tested.
+ * @param fullTableName The table's full name
+ * @param tableUpdateCacheFrequency If not null, the table-level value to be set for 'Update
+ * Cache Frequency'
+ * @param connUpdateCacheFrequency If not null, the connection-level value to be set for 'Update
+ * Cache Frequency'
+ */
+ private static void setUpTableAndConnections(String fullTableName,
+ String tableUpdateCacheFrequency, String connUpdateCacheFrequency) throws SQLException {
+ // Create two connections - a connection that we'll use to create the table and the second
+ // one that we will spy on and use to query the table.
+ Properties props = new Properties();
+ conn1 = DriverManager.getConnection(getUrl(), props);
+ conn1.setAutoCommit(true);
+
+ if (connUpdateCacheFrequency != null) {
+ props.put(QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB,
+ connUpdateCacheFrequency);
+ }
+
+ // use a spied ConnectionQueryServices so we can verify calls to getTable()
+ spyForConn2 = Mockito.spy(driver.getConnectionQueryServices(getUrl(), props));
+ conn2 = spyForConn2.connect(getUrl(), props);
+ conn2.setAutoCommit(true);
+
+ String createTableQuery =
+ "CREATE TABLE " + fullTableName + " (k UNSIGNED_DOUBLE NOT NULL PRIMARY KEY, "
+ + "v1 UNSIGNED_DOUBLE, v2 UNSIGNED_DOUBLE, v3 UNSIGNED_DOUBLE)";
+
+ if (tableUpdateCacheFrequency != null) {
+ createTableQuery += " UPDATE_CACHE_FREQUENCY = " + tableUpdateCacheFrequency;
+ }
+
+ // Create the table over first connection
+ try (Statement stmt = conn1.createStatement()) {
+ stmt.execute(createTableQuery);
+ stmt.execute("UPSERT INTO " + fullTableName + " VALUES (1, 2, 3, 4)");
+ }
+ conn1.commit();
+ }
+
+ /**
+ * Helper method that executes a select and upsert query on the table for \p numSelectExecutions
+ * times and verifies that \p numExpectedGetTableCalls were made to getTable() for the table.
+ *
+ * Also resets the spy object for conn2 before returning.
+ *
+ * @param fullTableName The table's full name
+ * @param numExecutions Number of times a select+upsert should be executed on the table
+ * @param numExpectedGetTableCalls Number of expected calls to getTable()
+ */
+ private static void verifyExpectedGetTableCalls(String fullTableName, int numExecutions,
+ int numExpectedGetTableCalls) throws SQLException {
+ String tableName = SchemaUtil.getTableNameFromFullName(fullTableName);
+ String schemaName = SchemaUtil.getSchemaNameFromFullName(fullTableName);
+ String selectFromTableQuery = "SELECT k, v1, v2, v3 FROM " + fullTableName;
+
+ for (int i = 0; i < numExecutions; i++) {
+ // Query the table over the spied connection that has update cache frequency set
+ try (Statement stmt = conn2.createStatement();
+ ResultSet rs = stmt.executeQuery(selectFromTableQuery)) {
+ assertTrue(rs.next());
+ stmt.execute("UPSERT INTO " + fullTableName + " VALUES (1, 2, 3, 4)");
+ }
+ }
+
+ // Verify number of calls to getTable() for our table
+ verify(spyForConn2, times(numExpectedGetTableCalls)).getTable((PName) isNull(),
+ eq(PVarchar.INSTANCE.toBytes(schemaName)), eq(PVarchar.INSTANCE.toBytes(tableName)),
+ anyLong(), anyLong());
+ reset(spyForConn2);
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index aed296d..62a27d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -327,7 +327,6 @@
private final boolean returnSequenceValues ;
private Connection connection;
- private ZKClientService txZKClientService;
private volatile boolean initialized;
private volatile int nSequenceSaltBuckets;
@@ -759,9 +758,13 @@
* @throws IllegalArgumentException when a property is not set to a valid value.
*/
private void validateConnectionProperties(Properties info) {
- if (info.get(QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB) != null) {
+ if (info.get(DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB) != null) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Connection's " + DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB + " set to " +
+ info.get(DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB));
+ }
ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue(
- info.getProperty(QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB));
+ info.getProperty(DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB));
}
}
@@ -772,6 +775,7 @@
throwConnectionClosedIfNullMetaData();
validateConnectionProperties(info);
metadata = metadata.clone();
+
return new PhoenixConnection(this, url, info, metadata);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index c6ebb62..9eafd55 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -727,14 +727,46 @@
// 1. table is a system table that does not have a ROW_TIMESTAMP column OR
// 2. table was already resolved as of that timestamp OR
// 3. table does not have a ROW_TIMESTAMP column and age is less then UPDATE_CACHE_FREQUENCY
+ // 3a. Get the effective UPDATE_CACHE_FREQUENCY for checking the age in the following precedence order:
+ // Table-level property > Connection-level property > Default value.
private boolean avoidRpcToGetTable(boolean alwaysHitServer, Long resolvedTimestamp,
boolean systemTable, PTable table, PTableRef tableRef, long tableResolvedTimestamp) {
- return table != null && !alwaysHitServer &&
- (systemTable && table.getRowTimestampColPos() == -1 ||
- resolvedTimestamp == tableResolvedTimestamp ||
- (table.getRowTimestampColPos() == -1 &&
- connection.getMetaDataCache().getAge(tableRef) <
- table.getUpdateCacheFrequency()));
+ if (table != null && !alwaysHitServer) {
+ if (systemTable && table.getRowTimestampColPos() == -1 ||
+ resolvedTimestamp == tableResolvedTimestamp) {
+ return true;
+ }
+
+ final long effectiveUpdateCacheFreq;
+ final String ucfInfoForLogging; // Only used for logging purposes
+
+ // What if the table is created with UPDATE_CACHE_FREQUENCY explicitly set to ALWAYS?
+ // i.e. explicitly set to 0. We should ideally be checking for something like
+ // hasUpdateCacheFrequency().
+ if (table.getUpdateCacheFrequency() != 0L) {
+ effectiveUpdateCacheFreq = table.getUpdateCacheFrequency();
+ ucfInfoForLogging = "table-level";
+ } else {
+ effectiveUpdateCacheFreq =
+ (Long) ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue(
+ connection.getQueryServices().getProps().get(
+ QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB));
+ ucfInfoForLogging = connection.getQueryServices().getProps().get(
+ QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB) != null ?
+ "connection-level" : "default";
+ }
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Using " + ucfInfoForLogging + " Update Cache Frequency (value = " +
+ effectiveUpdateCacheFreq + "ms) for " + table.getName() +
+ (table.getTenantId() != null ? ", Tenant ID: " + table.getTenantId() : ""));
+ }
+
+ return (table.getRowTimestampColPos() == -1 &&
+ connection.getMetaDataCache().getAge(tableRef) <
+ effectiveUpdateCacheFreq);
+ }
+ return false;
}
public MetaDataMutationResult updateCache(String schemaName) throws SQLException {