PHOENIX-5634: Use 'phoenix.default.update.cache.frequency' from connection properties at query time

Signed-off-by: Xinyi Yan <yanxinyi@apache.org>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheConnectionLevelPropIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheConnectionLevelPropIT.java
new file mode 100644
index 0000000..1bc109e
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheConnectionLevelPropIT.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.rpc;
+
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.util.SchemaUtil;
+import static org.apache.phoenix.util.TestUtil.DEFAULT_SCHEMA_NAME;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.mockito.Mockito;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.isNull;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for connection level 'Update Cache Frequency' property.
+ *
+ * The tests verify that the 'Update Cache Frequency' is honored in the following precedence order
+ * for SELECTs and UPSERTs:
+ * Table-level property > Connection-level property > Default value
+ */
+public class UpdateCacheConnectionLevelPropIT extends ParallelStatsDisabledIT {
+
+    private static Connection conn1;
+    private static Connection conn2;
+    private static ConnectionQueryServices spyForConn2;
+
+    @AfterClass
+    public static synchronized void freeResources() {
+        try {
+            conn1.close();
+            conn2.close();
+        } catch (Exception e) {
+            /* ignored */
+        }
+    }
+
+    /**
+     * Test 'Update Cache Frequency' property when it is set at connection-level only, and not at
+     * table-level.
+     */
+    @Test
+    public void testWithConnLevelUCFNoTableLevelUCF() throws Exception {
+        final long connUpdateCacheFrequency = 1000;
+        String fullTableName = DEFAULT_SCHEMA_NAME + QueryConstants.NAME_SEPARATOR +
+                generateUniqueName();
+
+        setUpTableAndConnections(fullTableName, null,
+                String.valueOf(connUpdateCacheFrequency));
+
+        // There should only be a single call to getTable() for fetching the table's metadata
+        int numExecutions = 2;
+        int numExpectedGetTableCalls = 1;
+        verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+
+        // Wait for a period of 'connUpdateCacheFrequency' and verify that there was one new call to
+        // getTable() for fetching the table's metadata
+        Thread.sleep(connUpdateCacheFrequency);
+        verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+    }
+
+    /**
+     * Test 'Update Cache Frequency' property when it is set at table-level only, and not at
+     * connection-level.
+     */
+    @Test
+    public void testWithTableLevelUCFNoConnLevelUCF() throws Exception {
+        final long tableUpdateCacheFrequency = 1000;
+        String fullTableName = DEFAULT_SCHEMA_NAME + QueryConstants.NAME_SEPARATOR +
+                generateUniqueName();
+
+        // There should only be a single call to getTable() for fetching the table's metadata
+        int numExecutions = 2;
+        int numExpectedGetTableCalls = 1;
+        setUpTableAndConnections(fullTableName, String.valueOf(tableUpdateCacheFrequency), null);
+        verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+
+        // Wait for a period of 'tableUpdateCacheFrequency' and verify that there was one new call
+        // to getTable() for fetching the table's metadata
+        Thread.sleep(tableUpdateCacheFrequency);
+        verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+    }
+
+    /**
+     * Test 'Update Cache Frequency' property when it is not set at both table-level and
+     * connection-level.
+     */
+    @Test
+    public void testWithNoConnAndTableLevelUCF() throws Exception {
+        String fullTableName = DEFAULT_SCHEMA_NAME + QueryConstants.NAME_SEPARATOR +
+                generateUniqueName();
+
+        // This is the default behavior (i.e. always fetch the latest metadata of the table) when
+        // both connection and table level properties are not set
+        int numExecutions = 2;
+        int numExpectedGetTableCalls = 4; // 2 for SELECTs, and 2 for UPSERTs
+        setUpTableAndConnections(fullTableName, null, null);
+        verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+    }
+
+    /**
+     * Test 'Update Cache Frequency' property when it is set at both table-level and
+     * connection-level.
+     */
+    @Test
+    public void testWithBothConnAndTableLevelUCF() throws Exception {
+        // Set table level property to a much higher value than the connection level property
+        final long tableUpdateCacheFrequency = 5000;
+        final long connUpdateCacheFrequency = 1000;
+        String fullTableName = DEFAULT_SCHEMA_NAME + QueryConstants.NAME_SEPARATOR +
+                generateUniqueName();
+
+        // There should only be a single call to getTable() for fetching the table's metadata
+        int numExecutions = 2;
+        int numExpectedGetTableCalls = 1;
+        setUpTableAndConnections(fullTableName, String.valueOf(tableUpdateCacheFrequency),
+                String.valueOf(connUpdateCacheFrequency));
+        verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+
+        // Wait for a period of 'connUpdateCacheFrequency' and verify that there were no new calls
+        // to getTable() as the table level UCF should come in to effect
+        Thread.sleep(connUpdateCacheFrequency);
+        numExpectedGetTableCalls = 0;
+        verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+
+        // Extend the wait to a period of 'tableUpdateCacheFrequency' and verify that there was one
+        // new call to getTable() for fetching the table's metadata
+        Thread.sleep(tableUpdateCacheFrequency - connUpdateCacheFrequency);
+        numExpectedGetTableCalls = 1;
+        verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
+    }
+
+    /**
+     * Helper method that sets up the connections and creates the table to be tested.
+     * @param fullTableName The table's full name
+     * @param tableUpdateCacheFrequency If not null, the table-level value to be set for 'Update
+     *                                  Cache Frequency'
+     * @param connUpdateCacheFrequency If not null, the connection-level value to be set for 'Update
+     *                                 Cache Frequency'
+     */
+    private static void setUpTableAndConnections(String fullTableName,
+            String tableUpdateCacheFrequency, String connUpdateCacheFrequency) throws SQLException {
+        // Create two connections - a connection that we'll use to create the table and the second
+        // one that we will spy on and use to query the table.
+        Properties props = new Properties();
+        conn1 = DriverManager.getConnection(getUrl(), props);
+        conn1.setAutoCommit(true);
+
+        if (connUpdateCacheFrequency != null) {
+            props.put(QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB,
+                    connUpdateCacheFrequency);
+        }
+
+        // use a spied ConnectionQueryServices so we can verify calls to getTable()
+        spyForConn2 = Mockito.spy(driver.getConnectionQueryServices(getUrl(), props));
+        conn2 = spyForConn2.connect(getUrl(), props);
+        conn2.setAutoCommit(true);
+
+        String createTableQuery =
+                "CREATE TABLE " + fullTableName + " (k UNSIGNED_DOUBLE NOT NULL PRIMARY KEY, "
+                        + "v1 UNSIGNED_DOUBLE, v2 UNSIGNED_DOUBLE, v3 UNSIGNED_DOUBLE)";
+
+        if (tableUpdateCacheFrequency != null) {
+            createTableQuery += " UPDATE_CACHE_FREQUENCY = " + tableUpdateCacheFrequency;
+        }
+
+        // Create the table over first connection
+        try (Statement stmt = conn1.createStatement()) {
+            stmt.execute(createTableQuery);
+            stmt.execute("UPSERT INTO " + fullTableName + " VALUES (1, 2, 3, 4)");
+        }
+        conn1.commit();
+    }
+
+    /**
+     * Helper method that executes a select and upsert query on the table for \p numSelectExecutions
+     * times and verifies that \p numExpectedGetTableCalls were made to getTable() for the table.
+     *
+     * Also resets the spy object for conn2 before returning.
+     *
+     * @param fullTableName The table's full name
+     * @param numExecutions Number of times a select+upsert should be executed on the table
+     * @param numExpectedGetTableCalls Number of expected calls to getTable()
+     */
+    private static void verifyExpectedGetTableCalls(String fullTableName, int numExecutions,
+            int numExpectedGetTableCalls) throws SQLException {
+        String tableName = SchemaUtil.getTableNameFromFullName(fullTableName);
+        String schemaName = SchemaUtil.getSchemaNameFromFullName(fullTableName);
+        String selectFromTableQuery = "SELECT k, v1, v2, v3 FROM " + fullTableName;
+
+        for (int i = 0; i < numExecutions; i++) {
+            // Query the table over the spied connection that has update cache frequency set
+            try (Statement stmt = conn2.createStatement();
+                    ResultSet rs = stmt.executeQuery(selectFromTableQuery)) {
+                assertTrue(rs.next());
+                stmt.execute("UPSERT INTO " + fullTableName + " VALUES (1, 2, 3, 4)");
+            }
+        }
+
+        // Verify number of calls to getTable() for our table
+        verify(spyForConn2, times(numExpectedGetTableCalls)).getTable((PName) isNull(),
+                eq(PVarchar.INSTANCE.toBytes(schemaName)), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                anyLong(), anyLong());
+        reset(spyForConn2);
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index aed296d..62a27d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -327,7 +327,6 @@
     private final boolean returnSequenceValues ;
 
     private Connection connection;
-    private ZKClientService txZKClientService;
     private volatile boolean initialized;
     private volatile int nSequenceSaltBuckets;
 
@@ -759,9 +758,13 @@
      * @throws IllegalArgumentException when a property is not set to a valid value.
      */
     private void validateConnectionProperties(Properties info) {
-        if (info.get(QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB) != null) {
+        if (info.get(DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB) != null) {
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("Connection's " + DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB + " set to " +
+                        info.get(DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB));
+            }
             ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue(
-                    info.getProperty(QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB));
+                    info.getProperty(DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB));
         }
     }
 
@@ -772,6 +775,7 @@
         throwConnectionClosedIfNullMetaData();
         validateConnectionProperties(info);
         metadata = metadata.clone();
+
         return new PhoenixConnection(this, url, info, metadata);
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index c6ebb62..9eafd55 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -727,14 +727,46 @@
     // 1. table is a system table that does not have a ROW_TIMESTAMP column OR
     // 2. table was already resolved as of that timestamp OR
     // 3. table does not have a ROW_TIMESTAMP column and age is less then UPDATE_CACHE_FREQUENCY
+    // 3a. Get the effective UPDATE_CACHE_FREQUENCY for checking the age in the following precedence order:
+    // Table-level property > Connection-level property > Default value.
     private boolean avoidRpcToGetTable(boolean alwaysHitServer, Long resolvedTimestamp,
             boolean systemTable, PTable table, PTableRef tableRef, long tableResolvedTimestamp) {
-        return table != null && !alwaysHitServer &&
-                (systemTable && table.getRowTimestampColPos() == -1 ||
-                        resolvedTimestamp == tableResolvedTimestamp ||
-                        (table.getRowTimestampColPos() == -1 &&
-                                connection.getMetaDataCache().getAge(tableRef) <
-                                        table.getUpdateCacheFrequency()));
+        if (table != null && !alwaysHitServer) {
+            if (systemTable && table.getRowTimestampColPos() == -1 ||
+                    resolvedTimestamp == tableResolvedTimestamp) {
+                return true;
+            }
+
+            final long effectiveUpdateCacheFreq;
+            final String ucfInfoForLogging; // Only used for logging purposes
+
+            // What if the table is created with UPDATE_CACHE_FREQUENCY explicitly set to ALWAYS?
+            // i.e. explicitly set to 0. We should ideally be checking for something like
+            // hasUpdateCacheFrequency().
+            if (table.getUpdateCacheFrequency() != 0L) {
+                effectiveUpdateCacheFreq = table.getUpdateCacheFrequency();
+                ucfInfoForLogging = "table-level";
+            } else {
+                effectiveUpdateCacheFreq =
+                        (Long) ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue(
+                                connection.getQueryServices().getProps().get(
+                                        QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB));
+                ucfInfoForLogging = connection.getQueryServices().getProps().get(
+                        QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB) != null ?
+                        "connection-level" : "default";
+            }
+
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Using " + ucfInfoForLogging + " Update Cache Frequency (value = " +
+                        effectiveUpdateCacheFreq + "ms) for " + table.getName() +
+                        (table.getTenantId() != null ? ", Tenant ID: " + table.getTenantId() : ""));
+            }
+
+            return (table.getRowTimestampColPos() == -1 &&
+                    connection.getMetaDataCache().getAge(tableRef) <
+                            effectiveUpdateCacheFreq);
+        }
+        return false;
     }
 
     public MetaDataMutationResult updateCache(String schemaName) throws SQLException {