PHOENIX-7166 : Enable feature flags introduced by metadata caching redesign (#1859)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
index 35d193d..534ce6e 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -224,7 +224,9 @@
     }
     public MutationPlan compile(final CreateIndexStatement create) throws SQLException {
         final PhoenixConnection connection = statement.getConnection();
-        final ColumnResolver resolver = FromCompiler.getResolver(create, connection, create.getUdfParseNodes());
+        final ColumnResolver resolver
+                = FromCompiler.getResolverForCreateIndex(
+                        create, connection, create.getUdfParseNodes());
         Scan scan = new Scan();
         final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
         verifyIndexWhere(create.getWhere(), context, create.getTable().getName());
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 15b2343..9d3a6b8 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -295,9 +295,14 @@
         }
     }
 
-    public static ColumnResolver getResolver(SingleTableStatement statement, PhoenixConnection connection, Map<String, UDFParseNode> udfParseNodes)
+    public static ColumnResolver getResolverForCreateIndex(SingleTableStatement statement,
+                           PhoenixConnection connection, Map<String, UDFParseNode> udfParseNodes)
             throws SQLException {
-        SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), true, 0, udfParseNodes);
+        // use alwaysHitServer=true to ensure the client's cache is up-to-date even when the
+        // client is validating last_ddl_timestamps and UCF = never.
+        SingleTableColumnResolver visitor
+                = new SingleTableColumnResolver(connection, statement.getTable(), true, 0,
+                                                    udfParseNodes, true, null);
         return visitor;
     }
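
A sketch of what the new resolver entails; the seven-argument constructor call is the one added above, but the parameter-name comments are assumptions about its signature, not taken from the source:

    // CREATE INDEX resolution now always goes to the server, regardless of
    // UPDATE_CACHE_FREQUENCY or last_ddl_timestamp validation settings.
    SingleTableColumnResolver visitor = new SingleTableColumnResolver(
            connection,
            statement.getTable(),
            true,             // updateCacheImmediately (assumed parameter name)
            0,                // tsAddition (assumed parameter name)
            udfParseNodes,
            true,             // alwaysHitServer, per the comment above
            null);            // mutating table name (assumed meaning)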
 
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
index a7abbd5..58b7684 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -285,7 +285,7 @@
     }
 
     public long getCurrentTime() throws SQLException {
-        long ts = this.getCurrentTable().getCurrentTime();
+        long ts = this.getCurrentTable().getTimeStamp();
         // if the table is transactional then it is only resolved once per query, so we can't use the table timestamp
         if (this.getCurrentTable().getTable().getType() != PTableType.SUBQUERY
                 && this.getCurrentTable().getTable().getType() != PTableType.PROJECTED
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 890b99a..c996516 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -636,7 +636,7 @@
                // as max TS, so that the query can be safely restarted and still work off a snapshot
                 // (so it won't see its own data in case of concurrent splits)
                 // see PHOENIX-4849
-                long serverTime = selectResolver.getTables().get(0).getCurrentTime();
+                long serverTime = selectResolver.getTables().get(0).getTimeStamp();
                 if (serverTime == QueryConstants.UNSET_TIMESTAMP) {
                    // if this is the first time this table is resolved, the ref's current time might not be defined yet
                     // in that case force an RPC to get the server time
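
Both renames above rely on TableRef exposing two distinct times. A hedged summary, inferred from this patch rather than the full source: getTimeStamp() returns the scan's upper-bound timestamp (UNSET_TIMESTAMP when UPDATE_CACHE_FREQUENCY is set, so the server assigns it), while getCurrentTime() returns the server time captured when the table was resolved.

    // Illustration only (assumed semantics, inferred from this patch):
    TableRef ref = selectResolver.getTables().get(0);
    long upperBound = ref.getTimeStamp();    // scan upper bound; may be UNSET_TIMESTAMP
    long resolvedAt = ref.getCurrentTime();  // server time at resolution (e.g. for ASYNC_CREATED_DATE)
    if (upperBound == QueryConstants.UNSET_TIMESTAMP) {
        // force an RPC to get the server time, as UpsertCompiler does above
    }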
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 897de93..405fa80 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -358,6 +358,7 @@
                             String tableName = null;
                             clearResultSet();
                             PhoenixResultSet rs = null;
+                            QueryPlan plan = null;
                             try {
                                 PhoenixConnection conn = getConnection();
                                 conn.checkOpen();
@@ -367,9 +368,7 @@
                                         && stmt.getOperation() != Operation.UPGRADE) {
                                     throw new UpgradeRequiredException();
                                 }
-                                QueryPlan
-                                        plan =
-                                        stmt.compilePlan(PhoenixStatement.this,
+                                plan = stmt.compilePlan(PhoenixStatement.this,
                                                 Sequence.ValueOp.VALIDATE_SEQUENCE);
                                 // Send mutations to hbase, so they are visible to subsequent reads.
                                 // Use original plan for data table so that data and immutable indexes will be sent
@@ -437,13 +436,26 @@
                             //Force update cache and retry if meta not found error occurs
                             catch (MetaDataEntityNotFoundException e) {
                                 if (doRetryOnMetaNotFoundError && e.getTableName() != null) {
+                                    String sName = e.getSchemaName();
+                                    String tName = e.getTableName();
+                                    // when the query plan uses the local index PTable,
+                                    // the TNFE can still be for the base table
+                                    if (plan != null && plan.getTableRef() != null) {
+                                        PTable queryPlanTable = plan.getTableRef().getTable();
+                                        if (queryPlanTable != null
+                                                && queryPlanTable.getIndexType()
+                                                    == IndexType.LOCAL) {
+                                            sName = queryPlanTable.getSchemaName().getString();
+                                            tName = queryPlanTable.getTableName().getString();
+                                        }
+                                    }
                                     if (LOGGER.isDebugEnabled()) {
                                         LOGGER.debug("Reloading table {} data from server",
-                                                e.getTableName());
+                                                tName);
                                     }
                                     if (new MetaDataClient(connection)
                                             .updateCache(connection.getTenantId(),
-                                                    e.getSchemaName(), e.getTableName(), true)
+                                                    sName, tName, true)
                                             .wasUpdated()) {
                                         updateMetrics = false;
                                         //TODO we can log retry count and error for debugging in LOG table
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 0ff3740..384cda9 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -75,6 +75,7 @@
 import static org.apache.phoenix.monitoring.MetricType.TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS;
 import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS;
@@ -6328,7 +6329,8 @@
     public void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests)
             throws Throwable {
         boolean invalidateCacheEnabled =
-                config.getBoolean(PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED, false);
+                config.getBoolean(PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED,
+                        DEFAULT_PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED);
         if (!invalidateCacheEnabled) {
             LOGGER.info("Skip invalidating server metadata cache since conf property"
                     + " phoenix.metadata.invalidate.cache.enabled is set to false");
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 1f55135..5be43c3 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -119,6 +119,7 @@
 import java.util.Map.Entry;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.schema.ConnectionProperty;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
@@ -370,10 +371,23 @@
     //Security defaults
     public static final boolean DEFAULT_PHOENIX_ACLS_ENABLED = false;
 
-    //default update cache frequency
-    public static final long DEFAULT_UPDATE_CACHE_FREQUENCY = 0;
     public static final int DEFAULT_SMALL_SCAN_THRESHOLD = 100;
+
+    /**
+     * Metadata caching configs, see https://issues.apache.org/jira/browse/PHOENIX-6883.
+     * To disable the caching re-design, disable the boolean flags and set UCF=always.
+     *
+     * Disable the caching re-design if you use Online Data Format Change, since the cutover
+     * logic is currently incompatible and clients may not learn about the physical table
+     * change. See https://issues.apache.org/jira/browse/PHOENIX-7284.
+     *
+     * Disable the caching re-design if your clients will not have ADMIN permissions to call
+     * the region server RPC. See https://issues.apache.org/jira/browse/HBASE-28508.
+     */
+    public static final long DEFAULT_UPDATE_CACHE_FREQUENCY
+                = (long) ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue("ALWAYS");
     public static final boolean DEFAULT_LAST_DDL_TIMESTAMP_VALIDATION_ENABLED = false;
+    public static final boolean DEFAULT_PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED = false;
     public static final String DEFAULT_UPDATE_CACHE_FREQUENCY_FOR_PENDING_DISABLED_INDEX
                                                                             = Long.toString(0L);
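
A hedged sketch of opting out of the caching re-design, using the three properties this patch touches elsewhere (they are the same settings removed from BasePermissionsIT below):

    // Revert to pre-redesign behavior (sketch; constants as referenced in this patch):
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB, 0L);        // UCF = ALWAYS
    conf.setBoolean(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, false);
    conf.setBoolean(QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED, false);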
 
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index d4e2fd7..0e58a3a 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -200,6 +200,7 @@
 import org.apache.phoenix.schema.transform.TransformClient;
 import org.apache.phoenix.util.ClientUtil;
 import org.apache.phoenix.util.TaskMetaDataServiceCallBack;
+import org.apache.phoenix.util.ValidateLastDDLTimestampUtil;
 import org.apache.phoenix.util.ViewUtil;
 import org.apache.phoenix.util.JacksonUtil;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -771,6 +772,7 @@
             // What if the table is created with UPDATE_CACHE_FREQUENCY explicitly set to ALWAYS?
             // i.e. explicitly set to 0. We should ideally be checking for something like
             // hasUpdateCacheFrequency().
+
             //always fetch an Index in PENDING_DISABLE state to retrieve server timestamp
             //QueryOptimizer needs that to decide whether the index can be used
             if (PIndexState.PENDING_DISABLE.equals(table.getIndexState())) {
@@ -1516,14 +1518,17 @@
         Set<String> acquiredColumnMutexSet = Sets.newHashSetWithExpectedSize(3);
         String physicalSchemaName = null;
         String physicalTableName = null;
+        PTable dataTable = null;
         try {
-            ColumnResolver resolver = FromCompiler.getResolver(statement, connection, statement.getUdfParseNodes());
+            ColumnResolver resolver
+                    = FromCompiler.getResolverForCreateIndex(
+                            statement, connection, statement.getUdfParseNodes());
             tableRef = resolver.getTables().get(0);
             Date asyncCreatedDate = null;
             if (statement.isAsync()) {
-                asyncCreatedDate = new Date(tableRef.getTimeStamp());
+                asyncCreatedDate = new Date(tableRef.getCurrentTime());
             }
-            PTable dataTable = tableRef.getTable();
+            dataTable = tableRef.getTable();
             boolean isTenantConnection = connection.getTenantId() != null;
             if (isTenantConnection) {
                 if (dataTable.getType() != PTableType.VIEW) {
@@ -1776,7 +1781,14 @@
             return buildIndexAtTimeStamp(table, statement.getTable());
         }
 
-        return buildIndex(table, tableRef);
+        MutationState state = buildIndex(table, tableRef);
+        // If the client is validating LAST_DDL_TIMESTAMPS, the parent's last_ddl_timestamp changed,
+        // so remove it from the client's cache; it will be refreshed the next time the table is accessed.
+        if (ValidateLastDDLTimestampUtil.getValidateLastDdlTimestampEnabled(connection)) {
+            connection.removeTable(connection.getTenantId(), dataTable.getName().getString(),
+                    null, dataTable.getTimeStamp());
+        }
+        return state;
     }
 
     /**
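
The effect of the eviction added above, from the client's perspective; the table, column, and URL below are hypothetical:

    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
        conn.createStatement().execute("CREATE INDEX IDX ON T(V1)");
        // T was evicted from this client's cache above, so the next statement
        // re-resolves T from the server and picks up the new LAST_DDL_TIMESTAMP.
        conn.createStatement().executeQuery("SELECT V1 FROM T WHERE V1 = 'x'");
    }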
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableRef.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableRef.java
index 71a839e..64f13ac 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableRef.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableRef.java
@@ -91,7 +91,7 @@
         this.table = table;
         // if UPDATE_CACHE_FREQUENCY is set, always let the server set timestamps
         this.upperBoundTimeStamp = table.getUpdateCacheFrequency()!=0 ? QueryConstants.UNSET_TIMESTAMP : upperBoundTimeStamp;
-        this.currentTime = this.upperBoundTimeStamp;
+        this.currentTime = upperBoundTimeStamp;
         this.lowerBoundTimeStamp = lowerBoundTimeStamp;
         this.hasDynamicCols = hasDynamicCols;
         this.hinted = hinted;
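
Why the small change above matters, restated as a hedged sketch: the preceding statement may already have overwritten the field.

    // Before: this.currentTime = this.upperBoundTimeStamp;
    //   when UPDATE_CACHE_FREQUENCY != 0, upperBoundTimeStamp was just reset to
    //   UNSET_TIMESTAMP, so currentTime lost the caller-supplied time as well.
    // After:  this.currentTime = upperBoundTimeStamp;
    //   currentTime keeps the constructor argument, so getCurrentTime() still
    //   reflects when the table was resolved even when UCF is set.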
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java
index bc1ae34..8e87fbc 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java
@@ -24,6 +24,7 @@
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -77,6 +78,17 @@
     }
 
     /**
+     * Get whether last ddl timestamp validation is enabled in the Configuration.
+     * @param config the Configuration to read the property from
+     * @return true if it is enabled, false otherwise
+     */
+    public static boolean getValidateLastDdlTimestampEnabled(Configuration config) {
+        return config.getBoolean(
+                QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED,
+                QueryServicesOptions.DEFAULT_LAST_DDL_TIMESTAMP_VALIDATION_ENABLED);
+    }
+
+    /**
      * Verifies that table metadata for given tables is up-to-date in client cache with server.
      * A random live region server is picked for invoking the RPC to validate LastDDLTimestamp.
      * Retry once if there was an error performing the RPC, otherwise throw the Exception.
@@ -196,7 +208,7 @@
         byte[] tenantIDBytes = key.getTenantId() == null
                 ? HConstants.EMPTY_BYTE_ARRAY
                 : key.getTenantId().getBytes();
-        byte[] schemaBytes = schemaName == null
+        byte[] schemaBytes = (schemaName == null || schemaName.isEmpty())
                 ?   HConstants.EMPTY_BYTE_ARRAY
                 : key.getSchemaName().getBytes();
         builder.setTenantId(ByteStringer.wrap(tenantIDBytes));
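
A sketch of where the new Configuration overload fits; the coprocessor-environment caller is an assumption, not shown in this patch:

    Configuration conf = env.getConfiguration();  // e.g. from a coprocessor environment (hypothetical)
    if (ValidateLastDDLTimestampUtil.getValidateLastDdlTimestampEnabled(conf)) {
        // only then do the extra work needed to validate last_ddl_timestamps
    }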
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/schema/transform/Transform.java b/phoenix-core-server/src/main/java/org/apache/phoenix/schema/transform/Transform.java
index 53f9553..b216c9d 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/schema/transform/Transform.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/schema/transform/Transform.java
@@ -121,6 +121,12 @@
         ).execute();
     }
 
+    /**
+     * Disable caching re-design if you use Online Data Format Change since the cutover logic
+     * is currently incompatible and clients may not learn about the physical table change.
+     * See https://issues.apache.org/jira/browse/PHOENIX-6883 and
+     * https://issues.apache.org/jira/browse/PHOENIX-7284.
+     */
     public static void doCutover(PhoenixConnection connection, SystemTransformRecord systemTransformRecord) throws Exception{
         String tenantId = systemTransformRecord.getTenantId();
         String schema = systemTransformRecord.getSchemaName();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
similarity index 96%
rename from phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
rename to phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
index 80e86eb..2135cc2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
@@ -852,6 +852,30 @@
     }
 
     /**
+     * Test that a client does not see TableNotFoundException when trying to validate
+     * LAST_DDL_TIMESTAMP for a view and its parent after the table was altered and removed from
+     * the client's cache.
+     */
+    @Test
+    public void testQueryViewAfterParentRemovedFromCache() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url = QueryUtil.getConnectionUrl(props, config);
+        ConnectionQueryServices cqs = driver.getConnectionQueryServices(url, props);
+        String tableName = generateUniqueName();
+        String viewName = generateUniqueName();
+        try (Connection conn = cqs.connect(url, props)) {
+            createTable(conn, tableName);
+            createView(conn, tableName, viewName);
+            query(conn, viewName);
+            // this removes the parent table from the client cache
+            alterTableDropColumn(conn, tableName, "v2");
+            query(conn, viewName);
+        } catch (TableNotFoundException e) {
+            fail("TableNotFoundException should not be encountered by client.");
+        }
+    }
+
+    /**
      * Test query on index with stale last ddl timestamp.
      * Client-1 creates a table and an index on it. Client-2 queries table to populate its cache.
      * Client-1 alters a property on the index. Client-2 queries the table again.
@@ -1324,6 +1348,30 @@
     }
 
     /**
+     * Test that a client cannot create an index on a column after another client dropped the column.
+     */
+    @Test
+    public void testClientCannotCreateIndexOnDroppedColumn() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+            createTable(conn1, tableName);
+            alterTableDropColumn(conn2, tableName, "v2");
+            createIndex(conn1, tableName, indexName, "v2");
+            fail("Client should not be able to create index on dropped column.");
+        }
+        catch (ColumnNotFoundException expected) {
+        }
+    }
+
+    /**
     * Test that an upsert into a view whose parent was dropped throws a TableNotFoundException.
      */
     @Test
@@ -1606,14 +1654,14 @@
             map = viewPTable2.getAncestorLastDDLTimestampMap();
             assertEquals(basePTable2.getLastDDLTimestamp(), map.get(basePTable2.getKey()));
             assertEquals(2, viewPTable2.getIndexes().size());
-            for (PTable indexT : viewPTable2.getIndexes()) {
+            for (PTable indexOfView : viewPTable2.getIndexes()) {
                 // inherited index
-                if (indexT.getTableName().getString().equals(index2)) {
-                    map = indexT.getAncestorLastDDLTimestampMap();
+                if (indexOfView.getTableName().getString().equals(index2)) {
+                    map = indexOfView.getAncestorLastDDLTimestampMap();
                     assertEquals(basePTable2.getLastDDLTimestamp(), map.get(basePTable2.getKey()));
                 } else {
                     // view index
-                    map = indexT.getAncestorLastDDLTimestampMap();
+                    map = indexOfView.getAncestorLastDDLTimestampMap();
                     assertEquals(basePTable2.getLastDDLTimestamp(), map.get(basePTable2.getKey()));
                     assertEquals(viewPTable2.getLastDDLTimestamp(), map.get(viewPTable2.getKey()));
                 }
@@ -1724,7 +1772,10 @@
 
             // create index, getTable RPCs for base table
             createIndex(conn1, tableName, indexName, "v2");
-            numTableRPCs += 3; // one rpc each for getting current time, create index, alter index state after building
+            // 1 getTable RPC for getting current time,
+            // 2 RPCs for create index (compile + execute),
+            // 2 RPCs for alter index state after building (compile + execute)
+            numTableRPCs += 5;
             assertNumGetTableRPC(spyCqs1, tableName, numTableRPCs);
 
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
index 12106b2..f02f358 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
@@ -21,6 +21,7 @@
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -1108,7 +1109,8 @@
             String viewIndex2 = generateUniqueName();
             String fullNameViewIndex1 = SchemaUtil.getTableName(viewSchemaName, viewIndex1);
             String fullNameViewIndex2 = SchemaUtil.getTableName(viewSchemaName, viewIndex2);
-            
+            List<String> fullViewIndexNames = Arrays.asList(fullNameViewIndex1, fullNameViewIndex2);
+
             conn.setAutoCommit(false);
             viewConn.setAutoCommit(false);
             String ddlFormat =
@@ -1149,10 +1151,8 @@
             byte[] viewIndexPhysicalTable = viewIndex.getPhysicalName().getBytes();
             assertNotNull("Can't find view index", viewIndex);
             assertEquals("Unexpected number of indexes ", 2, view.getIndexes().size());
-            assertEquals("Unexpected index ",  fullNameViewIndex1 , view.getIndexes().get(0).getName()
-                    .getString());
-            assertEquals("Unexpected index ",  fullNameViewIndex2 , view.getIndexes().get(1).getName()
-                .getString());
+            assertTrue("Expected index not found ",  fullViewIndexNames.contains(view.getIndexes().get(0).getName().getString()));
+            assertTrue("Expected index not found ",  fullViewIndexNames.contains(view.getIndexes().get(1).getName().getString()));
             assertEquals("Unexpected salt buckets", view.getBucketNum(),
                 view.getIndexes().get(0).getBucketNum());
             assertEquals("Unexpected salt buckets", view.getBucketNum(),
@@ -1181,14 +1181,15 @@
             }
             
             pconn = viewConn.unwrap(PhoenixConnection.class);
-            view = pconn.getTable(new PTableKey(tenantId,  viewOfTable ));
+            view = pconn.getTableNoCache(viewOfTable);
+            assertEquals("Unexpected number of indexes ", 1, view.getIndexes().size());
+            assertEquals("Unexpected index ",  fullNameViewIndex2 , view.getIndexes().get(0).getName().getString());
+            assertNotEquals("Dropped index should not be in view metadata ",  fullNameViewIndex1 , view.getIndexes().get(0).getName().getString());
             try {
-                viewIndex = pconn.getTable(new PTableKey(tenantId,  fullNameViewIndex1 ));
+                viewIndex = pconn.getTableNoCache(fullNameViewIndex1);
                 fail("View index should have been dropped");
             } catch (TableNotFoundException e) {
             }
-            assertEquals("Unexpected number of indexes ", 1, view.getIndexes().size());
-            assertEquals("Unexpected index ",  fullNameViewIndex2 , view.getIndexes().get(0).getName().getString());
             
             // verify that the physical index view table is *not* dropped
             conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(viewIndexPhysicalTable);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AppendOnlySchemaIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AppendOnlySchemaIT.java
index 3d25c1a..dba8c65 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AppendOnlySchemaIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AppendOnlySchemaIT.java
@@ -305,7 +305,7 @@
                     "create table IF NOT EXISTS " + tableName + " ( id char(1) NOT NULL,"
                             + " col1 integer NOT NULL,"
                             + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1))"
-                            + " APPEND_ONLY_SCHEMA = true");
+                            + " APPEND_ONLY_SCHEMA = true, UPDATE_CACHE_FREQUENCY=always");
                 fail("UPDATE_CACHE_FREQUENCY attribute must not be set to ALWAYS if APPEND_ONLY_SCHEMA is true");
             } catch (SQLException e) {
                 assertEquals(SQLExceptionCode.UPDATE_CACHE_FREQUENCY_INVALID.getErrorCode(),
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
index 9b258ce..6348497 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
@@ -59,7 +59,6 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.phoenix.cache.ServerMetadataCache;
 import org.apache.phoenix.coprocessor.SystemCatalogRegionObserver;
 import org.apache.phoenix.coprocessor.TaskMetaDataEndpoint;
 import org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.MavenCoordinates;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
index d7aea5a..397c0b4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
@@ -16,7 +16,6 @@
  */
 package org.apache.phoenix.end2end;
 
-import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
 import org.apache.phoenix.thirdparty.com.google.common.base.Throwables;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
@@ -146,7 +145,6 @@
     private static final int NUM_RECORDS = 5;
 
     boolean isNamespaceMapped;
-    boolean isReadAccessEnabledForListDecomRs;
 
     private String schemaName;
     private String tableName;
@@ -161,7 +159,6 @@
     BasePermissionsIT(final boolean isNamespaceMapped) throws Exception {
         this.isNamespaceMapped = isNamespaceMapped;
         this.tableName = generateUniqueName();
-        isReadAccessEnabledForListDecomRs = isReadAccessEnabledForListDecomRs();
     }
 
     static void initCluster(boolean isNamespaceMapped) throws Exception {
@@ -170,7 +167,6 @@
 
     static void initCluster(boolean isNamespaceMapped, boolean useCustomAccessController) throws Exception {
         if (null != testUtil) {
-            ServerMetadataCacheTestImpl.resetCache();
             testUtil.shutdownMiniCluster();
             testUtil = null;
         }
@@ -182,47 +178,11 @@
         configureNamespacesOnServer(config, isNamespaceMapped);
         configureStatsConfigurations(config);
         config.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true);
-        setPhoenixRegionServerEndpoint(config);
+
         testUtil.startMiniCluster(1);
         superUser1 = User.createUserForTesting(config, SUPER_USER, new String[0]);
         superUser2 = User.createUserForTesting(config, "superUser2", new String[0]);
 
-        /**
-         * CQSI initialization needs to make an Admin API call to fetch a list of live region servers.
-         * Permissions were relaxed for that API call in HBASE-28391.
-         * Disable metadata caching re-design on server if API call needs ADMIN access.
-         */
-        if (!isReadAccessEnabledForListDecomRs()) {
-            config.setLong(QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB, 0L);
-            config.setBoolean(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, false);
-            config.setBoolean(QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED, false);
-        }
-    }
-
-    // See https://issues.apache.org/jira/browse/HBASE-28391
-    private static boolean isReadAccessEnabledForListDecomRs() {
-        // true for 2.4.18+, 2.5.8+
-        String hbaseVersion = VersionInfo.getVersion();
-        String[] versionArr = hbaseVersion.split("\\.");
-        int majorVersion = Integer.parseInt(versionArr[0]);
-        int minorVersion = Integer.parseInt(versionArr[1]);
-        int patchVersion = Integer.parseInt(versionArr[2].split("-hadoop")[0]);
-        if (majorVersion > 2) {
-            return true;
-        }
-        if (majorVersion < 2) {
-            return false;
-        }
-        if (minorVersion > 5) {
-            return true;
-        }
-        if (minorVersion < 4) {
-            return false;
-        }
-        if (minorVersion == 4) {
-            return patchVersion >= 18;
-        }
-        return patchVersion >= 8;
     }
 
     @Before
@@ -380,15 +340,6 @@
             props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
         props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
-        /**
-         * CQSI initialization needs to make an Admin API call to fetch a list of live region servers.
-         * Permissions were relaxed for that API call in HBASE-28391.
-         * Disable metadata caching re-design on client side if API call needs ADMIN access.
-         */
-        if (!isReadAccessEnabledForListDecomRs) {
-            props.put(QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB, Long.toString(0L));
-            props.put(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, Boolean.toString(false));
-        }
         return props;
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionIT.java
index 243b3c7..0eba0b8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionIT.java
@@ -23,6 +23,7 @@
 import static org.junit.Assume.assumeTrue;
 
 import java.sql.Connection;
+import java.sql.Driver;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -34,6 +35,7 @@
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.query.ConfigurationFactory;
 import org.apache.phoenix.util.InstanceResolver;
@@ -58,6 +60,7 @@
         conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-test");
         hbaseTestUtil.startMiniCluster();
         Class.forName(PhoenixDriver.class.getName());
+        DriverManager.registerDriver(new PhoenixTestDriver());
         InstanceResolver.clearSingletons();
         // Make sure the ConnectionInfo doesn't try to pull a default Configuration
         InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionUtilIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionUtilIT.java
index 1937d05..d8454a8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionUtilIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionUtilIT.java
@@ -22,6 +22,7 @@
 import static org.junit.Assert.assertEquals;
 
 import java.sql.Connection;
+import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -30,6 +31,7 @@
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -49,6 +51,7 @@
         conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-test");
         hbaseTestUtil.startMiniCluster();
         Class.forName(PhoenixDriver.class.getName());
+        DriverManager.registerDriver(new PhoenixTestDriver());
     }
     
 	@Test
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
index d7473e5..ceaa140 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
@@ -906,7 +906,8 @@
         Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
         String createTableString = "CREATE TABLE " + tableName + " (k VARCHAR PRIMARY KEY, "
                 + "v1 VARCHAR, v2 VARCHAR)";
-        verifyUCFValueInSysCat(tableName, createTableString, props, 0L);
+        verifyUCFValueInSysCat(tableName, createTableString, props,
+                QueryServicesOptions.DEFAULT_UPDATE_CACHE_FREQUENCY);
     }
 
     @Test
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
index 7039a85..bae0589 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
@@ -31,6 +31,7 @@
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -530,13 +531,16 @@
     }
 
     public static void renameAndDropPhysicalTable(Connection conn, String tenantId, String schema, String tableName, String physicalName, boolean isNamespaceEnabled) throws Exception {
+        // if the client is validating last_ddl_timestamp, this change in physical table name must be
+        // visible to it: update the table's LAST_DDL_TIMESTAMP and clear the server metadata cache on region servers
+        long lastDDLTimestamp = EnvironmentEdgeManager.currentTimeMillis();
         String
                 changeName = String.format(
-                "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, PHYSICAL_TABLE_NAME) VALUES (%s, %s, '%s', NULL, NULL, '%s')",
-                tenantId, schema==null ? null : ("'" + schema + "'"), tableName, physicalName);
+                "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, PHYSICAL_TABLE_NAME, LAST_DDL_TIMESTAMP) VALUES (%s, %s, '%s', NULL, NULL, '%s', %d)",
+                tenantId, schema==null ? null : ("'" + schema + "'"), tableName, physicalName, lastDDLTimestamp);
         conn.createStatement().execute(changeName);
         conn.commit();
-
+        ServerMetadataCacheTestImpl.resetCache();
         String fullTableName = SchemaUtil.getTableName(schema, tableName);
         if (isNamespaceEnabled && !(Strings.isNullOrEmpty(schema) || NULL_STRING.equals(schema))) {
             fullTableName = schema + NAMESPACE_SEPARATOR + tableName;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java
index 496d2bd..b99f66d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java
@@ -209,21 +209,26 @@
             createIndexOnTable(conn, tableName, indexName2);
             populateTable(conn, tableName, 1, 2);
 
-            // Test hint
             String tableSelect = "SELECT V1,V2,V3 FROM " + tableName;
             ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + tableSelect);
-            assertEquals(true, QueryUtil.getExplainPlan(rs).contains(indexName));
+            String plan = QueryUtil.getExplainPlan(rs);
+            // plan should use one of the indexes
+            assertEquals(true, plan.contains(indexName) || plan.contains(indexName2));
+            // Test hint for the other index
+            String hintedIndex = plan.contains(indexName)
+                                    ? indexName2
+                                    : indexName;
             try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices()
                     .getAdmin()) {
-                String snapshotName = new StringBuilder(indexName2).append("-Snapshot").toString();
-                admin.snapshot(snapshotName, TableName.valueOf(indexName2));
-                String newName = "NEW_" + indexName2;
+                String snapshotName = new StringBuilder(hintedIndex).append("-Snapshot").toString();
+                admin.snapshot(snapshotName, TableName.valueOf(hintedIndex));
+                String newName = "NEW_" + hintedIndex;
                 admin.cloneSnapshot(snapshotName, TableName.valueOf(newName));
-                renameAndDropPhysicalTable(conn, "NULL", null, indexName2, newName, true);
+                renameAndDropPhysicalTable(conn, "NULL", null, hintedIndex, newName, true);
             }
-            String indexSelect = "SELECT /*+ INDEX(" + tableName + " " + indexName2 + ")*/ V1,V2,V3 FROM " + tableName;
+            String indexSelect = "SELECT /*+ INDEX(" + tableName + " " + hintedIndex + ")*/ V1,V2,V3 FROM " + tableName;
             rs = conn.createStatement().executeQuery("EXPLAIN " + indexSelect);
-            assertEquals(true, QueryUtil.getExplainPlan(rs).contains(indexName2));
+            assertEquals(true, QueryUtil.getExplainPlan(rs).contains(hintedIndex));
             rs = conn.createStatement().executeQuery(indexSelect);
             assertEquals(true, rs.next());
         }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
index 29fc6c5..918f7a1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
@@ -16,6 +16,7 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -41,10 +42,12 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
 import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
 import org.apache.phoenix.exception.UpgradeInProgressException;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.ConnectionQueryServicesImpl;
@@ -54,6 +57,7 @@
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.junit.After;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -85,9 +89,15 @@
     final UserGroupInformation user4 =
             UserGroupInformation.createUserForTesting("user4", new String[0]);
 
+    @BeforeClass
+    public static synchronized void registerTestDriver() throws SQLException {
+        DriverManager.registerDriver(new PhoenixTestDriver());
+    }
+
     public final void doSetup(boolean systemMappingEnabled) throws Exception {
         testUtil = new HBaseTestingUtility();
         Configuration conf = testUtil.getConfiguration();
+        conf.set(REGIONSERVER_COPROCESSOR_CONF_KEY, PhoenixRegionServerEndpointTestImpl.class.getName());
         enableNamespacesOnServer(conf, systemMappingEnabled);
         configureRandomHMasterPort(conf);
         testUtil.startMiniCluster(1);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
index b636f56..2cc4629 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
@@ -25,6 +25,7 @@
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.query.ConnectionQueryServices.Feature;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -77,6 +78,7 @@
         url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
 
         DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+        DriverManager.registerDriver(new PhoenixTestDriver());
     }
 
     @AfterClass
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
index 8680325..1ff648b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME;
@@ -51,6 +52,8 @@
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.ipc.controller.ServerToServerRpcController;
+import org.apache.phoenix.cache.ServerMetadataCacheImpl;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
 import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.UpgradeRequiredException;
@@ -69,6 +72,7 @@
 import org.apache.phoenix.util.UpgradeUtil;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -174,6 +178,10 @@
         }
     }
 
+    @BeforeClass
+    public static synchronized void registerTestDriver() throws SQLException {
+        DriverManager.registerDriver(new PhoenixTestDriver());
+    }
     @Before
     public void resetVariables() {
         setOldTimestampToInduceUpgrade = false;
@@ -192,6 +200,7 @@
                     refCountLeaked = BaseTest.isAnyStoreRefCountLeaked(testUtil.getAdmin());
                 }
                 testUtil.shutdownMiniCluster();
+                ServerMetadataCacheTestImpl.resetCache();
                 testUtil = null;
                 assertFalse("refCount leaked", refCountLeaked);
             }
@@ -661,6 +670,7 @@
         conf.set(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, isNamespaceMappingEnabled);
         // Avoid multiple clusters trying to bind to the master's info port (16010)
         conf.setInt(HConstants.MASTER_INFO_PORT, -1);
+        conf.set(REGIONSERVER_COPROCESSOR_CONF_KEY, PhoenixRegionServerEndpointTestImpl.class.getName());
         testUtil.startMiniCluster(1);
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpdateCacheAcrossDifferentClientsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpdateCacheAcrossDifferentClientsIT.java
index 23606d8..76f5a70 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpdateCacheAcrossDifferentClientsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpdateCacheAcrossDifferentClientsIT.java
@@ -38,6 +38,7 @@
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
@@ -104,17 +105,19 @@
             } catch (TableNotFoundException e) {
                 //Expected
             }
-            rs = conn2.createStatement().executeQuery("select * from "+tableName);
             try {
+                rs = conn2.createStatement().executeQuery("select * from "+tableName);
                 rs.next();
                 fail("Should throw org.apache.hadoop.hbase.TableNotFoundException since the latest metadata " +
                         "wasn't fetched");
-            } catch (PhoenixIOException ex) {
-                boolean foundHBaseTableNotFound = false;
-                for(Throwable throwable : Throwables.getCausalChain(ex)) {
-                    if(org.apache.hadoop.hbase.TableNotFoundException.class.equals(throwable.getClass())) {
-                        foundHBaseTableNotFound = true;
-                        break;
+            } catch (SQLException ex) {
+                boolean foundHBaseTableNotFound = (ex instanceof TableNotFoundException);
+                if (!foundHBaseTableNotFound) {
+                    for(Throwable throwable : Throwables.getCausalChain(ex)) {
+                        if(org.apache.hadoop.hbase.TableNotFoundException.class.equals(throwable.getClass())) {
+                            foundHBaseTableNotFound = true;
+                            break;
+                        }
                     }
                 }
                 assertTrue("Should throw org.apache.hadoop.hbase.TableNotFoundException since the latest" +
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
index c59726a..2c5478f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
@@ -68,7 +68,6 @@
 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -731,13 +730,14 @@
     }
 
     private void nullDDLTimestamps(Connection conn) throws SQLException {
+        // ignore system tables since that can interfere with other tests.
         String pkCols = TENANT_ID + ", " + TABLE_SCHEM +
             ", " + TABLE_NAME + ", " + COLUMN_NAME + ", " + COLUMN_FAMILY;
         String upsertSql =
             "UPSERT INTO " + SYSTEM_CATALOG_NAME + " (" + pkCols + ", " +
                 LAST_DDL_TIMESTAMP + ")" + " " +
                 "SELECT " + pkCols + ", NULL FROM " + SYSTEM_CATALOG_NAME + " " +
-                "WHERE " + TABLE_TYPE + " IS NOT NULL";
+                "WHERE " + TABLE_TYPE + " " + " != '" + PTableType.SYSTEM.getSerializedValue() + "'";
         conn.createStatement().execute(upsertSql);
         conn.commit();
     }
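
Assuming PTableType.SYSTEM serializes to 's' (its serialized value in the PTableType enum), the statement above now reads roughly as follows, leaving SYSTEM rows untouched:

    // UPSERT INTO SYSTEM.CATALOG (pk cols ..., LAST_DDL_TIMESTAMP)
    // SELECT pk cols ..., NULL FROM SYSTEM.CATALOG WHERE TABLE_TYPE != 's'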
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java
index aaf58d2..4732ebd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java
@@ -162,9 +162,7 @@
                 exception = (SQLException) e;
             }
             connection.commit();
-            PTableKey key = new PTableKey(null, this.tableName);
-            PMetaData metaCache = connection.unwrap(PhoenixConnection.class).getMetaDataCache();
-            List<PTable> indexes = metaCache.getTableRef(key).getTable().getIndexes();
+            List<PTable> indexes = connection.unwrap(PhoenixConnection.class).getTable(this.tableName).getIndexes();
             if (!overThreshold) {
                 if (this.mode == Mode.ASYNC) {
                     assertEquals(PIndexState.BUILDING, indexes.get(0).getIndexState());
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 79a8209..9b7b277 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.end2end.index;
 
+import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -54,6 +55,7 @@
 import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.PhoenixRegionServerEndpointTestImpl;
 import org.apache.phoenix.execute.CommitException;
 import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
@@ -176,6 +178,7 @@
          * because we want to control it's execution ourselves
          */
         serverProps.put(QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, Long.toString(Long.MAX_VALUE));
+        serverProps.put(REGIONSERVER_COPROCESSOR_CONF_KEY, PhoenixRegionServerEndpointTestImpl.class.getName());
         return serverProps;
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/ScannerLeaseRenewalIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/ScannerLeaseRenewalIT.java
index 94700e0..42876ea 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/ScannerLeaseRenewalIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/ScannerLeaseRenewalIT.java
@@ -42,6 +42,7 @@
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.query.ConnectionQueryServices.Feature;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -87,6 +88,7 @@
         // use round robin iterator
         driverProps.put(FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
         DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+        DriverManager.registerDriver(new PhoenixTestDriver());
         try (PhoenixConnection phxConn = DriverManager.getConnection(url, driverProps).unwrap(PhoenixConnection.class)) {
             // run test methods only if we are at the hbase version that supports lease renewal.
             Assume.assumeTrue(phxConn.getQueryServices().supportsFeature(Feature.RENEW_LEASE));
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
index bc67e42..526b83e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
@@ -18,7 +18,9 @@
 package org.apache.phoenix.jdbc;
 
 import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.phoenix.end2end.PhoenixRegionServerEndpointTestImpl;
 import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.commons.lang3.RandomUtils;
@@ -53,6 +55,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.hadoop.hbase.HConstants.*;
+import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
 import static org.apache.hadoop.hbase.ipc.RpcClient.*;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 import static org.apache.hadoop.test.GenericTestUtils.waitFor;
@@ -536,6 +539,10 @@
 
             // Hadoop cluster settings to avoid failing tests
             conf.setInt(DFS_REPLICATION_KEY, 1); // we only need one replica for testing
+
+            // Phoenix Region Server Endpoint needed for metadata caching
+            conf.set(REGIONSERVER_COPROCESSOR_CONF_KEY,
+                        PhoenixRegionServerEndpointTestImpl.class.getName());
         }
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingSingleConnectionLimiterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingSingleConnectionLimiterIT.java
index fb0ad28..f5b1330 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingSingleConnectionLimiterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingSingleConnectionLimiterIT.java
@@ -88,7 +88,7 @@
         String zkQuorum = "localhost:" + hBaseTestingUtility.getZkCluster().getClientPort();
         url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
         DriverManager.registerDriver(PhoenixDriver.INSTANCE);
-
+        DriverManager.registerDriver(new PhoenixTestDriver());
         String profileName = "setup";
         final String urlWithPrinc = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + profileName
                 + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
index 7ecacd2..5772da8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
@@ -97,6 +97,7 @@
     public static void setUpBeforeClass() throws Exception {
         CLUSTERS.start();
         DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+        DriverManager.registerDriver(new PhoenixTestDriver());
         GLOBAL_PROPERTIES.setProperty(AUTO_COMMIT_ATTRIB, "true");
         GLOBAL_PROPERTIES.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
         GLOBAL_PROPERTIES.setProperty(QueryServices.LOG_LEVEL, LogLevel.DEBUG.name()); //Need logging for query metrics
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
index 20c06c3..4c5d495 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
@@ -38,6 +38,7 @@
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.ConnectionQueryServicesImpl;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.query.QueryServicesTestImpl;
 import org.apache.phoenix.util.EnvironmentEdge;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -46,6 +47,7 @@
 import org.apache.phoenix.util.DelayedOrFailingRegionServer;
 import org.apache.phoenix.util.ReadOnlyProps;
 
+import org.apache.phoenix.util.ValidateLastDDLTimestampUtil;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -54,6 +56,8 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
 import static org.apache.phoenix.exception.SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY;
 import static org.apache.phoenix.exception.SQLExceptionCode.GET_TABLE_REGIONS_FAIL;
@@ -133,6 +137,7 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -153,6 +158,7 @@
 
     @BeforeClass public static void doSetup() throws Exception {
         final Configuration conf = HBaseConfiguration.create();
+        setUpConfigForMiniCluster(conf);
         conf.set(QueryServices.TABLE_LEVEL_METRICS_ENABLED, String.valueOf(true));
         conf.set(QueryServices.METRIC_PUBLISHER_ENABLED, String.valueOf(true));
         conf.set(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
@@ -171,7 +177,7 @@
                 return copy;
             }
         });
-        hbaseTestUtil = new HBaseTestingUtility();
+        hbaseTestUtil = new HBaseTestingUtility(conf);
         hbaseTestUtil.startMiniCluster(1, 1, null, null, DelayedOrFailingRegionServer.class);
         // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
         String zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
@@ -1276,14 +1282,21 @@
                 assertTrue(metricExists);
                 metricExists = false;
                 //assert BaseTable is not being queried
-                for (PhoenixTableMetric metric : getPhoenixTableClientMetrics().get(dataTable)) {
-                    if (metric.getMetricType().equals(SELECT_SQL_COUNTER)) {
-                        metricExists = true;
-                        assertMetricValue(metric, SELECT_SQL_COUNTER, 0, CompareOp.EQ);
-                        break;
+                //if client is validating last_ddl_timestamps with ucf=never,
+                //there will be no metrics for base table (like getTable RPC times/counts).
+                if (ValidateLastDDLTimestampUtil
+                        .getValidateLastDdlTimestampEnabled(conn.unwrap(PhoenixConnection.class))) {
+                    assertFalse(getPhoenixTableClientMetrics().containsKey(dataTable));
+                } else {
+                    for (PhoenixTableMetric metric : getPhoenixTableClientMetrics().get(dataTable)) {
+                        if (metric.getMetricType().equals(SELECT_SQL_COUNTER)) {
+                            metricExists = true;
+                            assertMetricValue(metric, SELECT_SQL_COUNTER, 0, CompareOp.EQ);
+                            break;
+                        }
                     }
+                    assertTrue(metricExists);
                 }
-                assertTrue(metricExists);
             }
         }
     }
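The branching above leans on ValidateLastDDLTimestampUtil, which this patch consults in two forms; a condensed sketch of both lookups as they appear in the changed tests (surrounding variables are illustrative):

    // from the shared test configuration, as the new
    // isLastDDLTimestampValidationEnabled fields in this patch do
    boolean enabledFromConfig =
            ValidateLastDDLTimestampUtil.getValidateLastDdlTimestampEnabled(config);

    // from a live connection, as in the metrics assertion above
    boolean enabledFromConn =
            ValidateLastDDLTimestampUtil.getValidateLastDdlTimestampEnabled(
                    conn.unwrap(PhoenixConnection.class));

When validation is on and UCF is never, the client trusts its cached PTable until a last_ddl_timestamp mismatch, so the unchanged base table sees no getTable RPCs and accrues no per-table client metrics; that is why the first branch asserts that the metrics map does not even contain the table.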
@@ -1586,11 +1599,16 @@
      * Custom driver to return a custom QueryServices object
      */
     public static class PhoenixMetricsTestingDriver extends PhoenixTestDriver {
-        private ConnectionQueryServices cqs;
+        @GuardedBy("this")
+        private final Map<ConnectionInfo, ConnectionQueryServices>
+                connectionQueryServicesMap = new HashMap<>();
+
+        private final QueryServices qsti;
         private ReadOnlyProps overrideProps;
 
         public PhoenixMetricsTestingDriver(ReadOnlyProps props) {
             overrideProps = props;
+            qsti = new QueryServicesTestImpl(getDefaultProps(), overrideProps);
         }
 
         @Override public boolean acceptsURL(String url) {
@@ -1598,17 +1616,16 @@
         }
 
         @Override public synchronized ConnectionQueryServices getConnectionQueryServices(String url,
-                Properties info) throws SQLException {
-            if (cqs == null) {
-                QueryServicesTestImpl qsti =
-                        new QueryServicesTestImpl(getDefaultProps(), overrideProps);
-                cqs =
-                        new PhoenixMetricsTestingQueryServices(
-                            qsti,
-                                ConnectionInfo.create(url, qsti.getProps(), info), info);
-                cqs.init(url, info);
+                Properties info) throws SQLException {
+            ConnectionInfo connInfo = ConnectionInfo.create(url, null, info);
+            ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(connInfo);
+            if (connectionQueryServices != null) {
+                return connectionQueryServices;
             }
-            return cqs;
+            connectionQueryServices = new PhoenixMetricsTestingQueryServices(qsti, connInfo, info);
+            connectionQueryServices.init(url, info);
+            connectionQueryServicesMap.put(connInfo, connectionQueryServices);
+            return connectionQueryServices;
         }
     }
 }
\ No newline at end of file
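The rewritten getConnectionQueryServices also fixes a latent test bug: with the old single cqs field, the first URL's services were handed back for every subsequent URL. A condensed sketch of the new idiom with the design reasoning as comments (names from the diff):

    // a plain HashMap suffices because the method is synchronized, which is
    // what the @GuardedBy("this") annotation on the map records;
    // Map.computeIfAbsent is avoided since init(url, info) throws the
    // checked SQLException
    ConnectionInfo key = ConnectionInfo.create(url, null, info);
    ConnectionQueryServices cqs = connectionQueryServicesMap.get(key);
    if (cqs == null) {
        cqs = new PhoenixMetricsTestingQueryServices(qsti, key, info);
        cqs.init(url, info);   // may throw; nothing is cached on failure
        connectionQueryServicesMap.put(key, cqs);
    }
    return cqs;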
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
index 92bf8ae..dacd07e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
@@ -18,11 +18,14 @@
 
 package org.apache.phoenix.query;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.util.DelayedRegionServer;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.AfterClass;
@@ -57,7 +60,9 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        hbaseTestUtil = new HBaseTestingUtility();
+        final Configuration conf = HBaseConfiguration.create();
+        setUpConfigForMiniCluster(conf);
+        hbaseTestUtil = new HBaseTestingUtility(conf);
 
         hbaseTestUtil.startMiniCluster(1,1,null,null,DelayedRegionServer.class);
         // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
@@ -65,6 +70,7 @@
         url = PhoenixRuntime.JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkQuorum +
                 JDBC_PROTOCOL_SEPARATOR + "uniqueConn=A";
         DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+        DriverManager.registerDriver(new PhoenixTestDriver());
     }
 
     private String getUniqueUrl() {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/MetaDataCachingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/MetaDataCachingIT.java
index 390df78..06cc176 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/query/MetaDataCachingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/query/MetaDataCachingIT.java
@@ -25,6 +25,7 @@
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.RunUntilFailure;
+import org.apache.phoenix.util.ValidateLastDDLTimestampUtil;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -51,6 +52,9 @@
     private static final Logger LOGGER = LoggerFactory.getLogger(MetaDataCachingIT.class);
     private final Random RAND = new Random(11);
 
+    private boolean isLastDDLTimestampValidationEnabled
+            = ValidateLastDDLTimestampUtil.getValidateLastDdlTimestampEnabled(config);
+
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
         Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
@@ -143,7 +147,7 @@
     @Test
     public void testGlobalClientCacheMetrics() throws Exception {
         int numThreads = 5;
-        int numTables = 1;
+        int numTables = 1; // test with only 1 table because we pick tables randomly in the workload
         int numMaxDML = 2;
 
         GlobalClientMetrics.GLOBAL_CLIENT_METADATA_CACHE_MISS_COUNTER.getMetric().reset();
@@ -152,12 +156,19 @@
         simulateWorkload("testGlobalClientCacheMetrics", numTables, numThreads, numMaxDML);
 
         // only 1 miss when the table is created
+        int numExpectedMisses = 1;
+        if (isLastDDLTimestampValidationEnabled) {
+            // if we are validating last_ddl_timestamps,
+            // region server will see 2 more misses when trying to update its cache
+            numExpectedMisses += 2;
+        }
+
         assertEquals("Incorrect number of client metadata cache misses",
-                1, GlobalClientMetrics.GLOBAL_CLIENT_METADATA_CACHE_MISS_COUNTER.getMetric().getValue());
+                numExpectedMisses, GlobalClientMetrics.GLOBAL_CLIENT_METADATA_CACHE_MISS_COUNTER.getMetric().getValue());
 
         // (2 hits per upsert + 1 hit per select) per thread
-        assertEquals("Incorrect number of client metadata cache hits",
-                3*numMaxDML*numThreads, GlobalClientMetrics.GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER.getMetric().getValue());
+        assertTrue("number of total metadata cache hits (server+client) should be more than or equal to client cache hits",
+                3*numMaxDML*numThreads <= GlobalClientMetrics.GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER.getMetric().getValue());
     }
 
     /*
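A worked check of the new lower bound under this test's parameters (numbers from the test above): once timestamp validation is on, the global counter also absorbs server-side cache hits, so equality can no longer be asserted.

    int numThreads = 5, numMaxDML = 2;
    // (2 hits per upsert + 1 hit per select) per DML round, per thread
    int clientHitFloor = 3 * numMaxDML * numThreads;   // = 30
    // the assertion above: GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER >= clientHitFloor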
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheConnectionLevelPropIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheConnectionLevelPropIT.java
index b1f063f..59459e8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheConnectionLevelPropIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheConnectionLevelPropIT.java
@@ -22,11 +22,13 @@
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.SchemaUtil;
 import static org.apache.phoenix.util.TestUtil.DEFAULT_SCHEMA_NAME;
 
+import org.apache.phoenix.util.ValidateLastDDLTimestampUtil;
 import org.junit.AfterClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -61,6 +63,8 @@
     private static Connection conn1;
     private static Connection conn2;
     private static ConnectionQueryServices spyForConn2;
+    private boolean isLastDDLTimestampValidationEnabled
+            = ValidateLastDDLTimestampUtil.getValidateLastDdlTimestampEnabled(config);
 
     @AfterClass
     public static synchronized void freeResources() {
@@ -135,6 +139,12 @@
         // both connection and table level properties are not set
         int numExecutions = 2;
         int numExpectedGetTableCalls = 4; // 2 for SELECTs, and 2 for UPSERTs
+
+        // there will be no getTable calls if we are validating last_ddl_timestamps
+        // and the schema has not changed.
+        if (isLastDDLTimestampValidationEnabled) {
+            numExpectedGetTableCalls = 0;
+        }
         setUpTableAndConnections(fullTableName, null, null);
         verifyExpectedGetTableCalls(fullTableName, numExecutions, numExpectedGetTableCalls);
     }
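For reference, the arithmetic behind the two expectations: each execution runs one SELECT and one UPSERT, and on the legacy path each of those revalidates the table with a getTable RPC, while the validation path trusts the cached PTable for an unchanged schema. A minimal sketch (variable names are illustrative):

    int numExecutions = 2;
    // legacy path: one getTable per SELECT plus one per UPSERT
    int legacyGetTableCalls = numExecutions * 2;   // = 4
    // last_ddl_timestamp validation with an unchanged schema: none at all
    int validatingGetTableCalls = 0;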
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
index d26536f..02e90c1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
@@ -48,6 +48,7 @@
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.types.PVarchar;
@@ -56,6 +57,7 @@
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.apache.phoenix.util.ValidateLastDDLTimestampUtil;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
@@ -69,6 +71,8 @@
 @Category(ParallelStatsDisabledTest.class)
 public class UpdateCacheIT extends ParallelStatsDisabledIT {
 
+    private boolean isLastDDLTimestampValidationEnabled
+            = ValidateLastDDLTimestampUtil.getValidateLastDdlTimestampEnabled(config);
     private static final Logger LOGGER =
         LoggerFactory.getLogger(UpdateCacheIT.class);
 
@@ -87,7 +91,13 @@
             String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + tableName;
             Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
             conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + "TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + provider + "'");
-            helpTestUpdateCache(fullTableName, new int[] {1, 3}, false);
+            int[] expectedRPCs = new int[] {1, 3};
+            if (isLastDDLTimestampValidationEnabled) {
+                // when validating last_ddl_timestamps, no getTable RPCs will be performed
+                // since the schema has not changed.
+                expectedRPCs = new int[] {0, 0};
+            }
+            helpTestUpdateCache(fullTableName, expectedRPCs, false);
         }
     }
 
@@ -97,7 +107,13 @@
         String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + tableName;
         Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
         conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA);
-        helpTestUpdateCache(fullTableName, new int[] {1, 3}, false);
+        int[] expectedRPCs = new int[] {1, 3};
+        if (isLastDDLTimestampValidationEnabled) {
+            // when validating last_ddl_timestamps, no getTable RPCs will be performed
+            // since the schema has not changed.
+            expectedRPCs = new int[] {0, 0};
+        }
+        helpTestUpdateCache(fullTableName, expectedRPCs, false);
     }
 	
     @Test
@@ -217,7 +233,14 @@
         }
 
         // The indexes should have got the UPDATE_CACHE_FREQUENCY value of their base table
-        helpTestUpdateCache(fullTableName, new int[] {0, 0}, false);
+        int numRPCUpsert = 0;
+        int numRPCSelect = 0;
+        if (isLastDDLTimestampValidationEnabled) {
+            // creating indexes on the table bumps its last_ddl_timestamp,
+            // so the upsert will trigger 1 getTable RPC to refresh the cache
+            numRPCUpsert = 1;
+        }
+        helpTestUpdateCache(fullTableName, new int[] {numRPCUpsert, numRPCSelect}, false);
         helpTestUpdateCache(INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + localIndex,
                 new int[] {0}, true);
         helpTestUpdateCache(INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + globalIndex,
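The isLastDDLTimestampValidationEnabled branch now appears three times in this file; a hypothetical helper (an assumption, not part of this patch) could centralize the two identical sites:

    // hypothetical refactor: pick expected getTable RPC counts by validation
    // mode; {0, 0} because an unchanged schema needs no revalidation RPCs
    private int[] expectedGetTableRpcs(int[] legacyExpectation) {
        return isLastDDLTimestampValidationEnabled ? new int[] {0, 0} : legacyExpectation;
    }

Call sites would then read helpTestUpdateCache(fullTableName, expectedGetTableRpcs(new int[] {1, 3}), false). The third site above is different on purpose: index creation bumps the table's last_ddl_timestamp, so one getTable RPC is still expected for the upsert.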
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
index d2b8004..d80134e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
@@ -3544,8 +3544,8 @@
                 PTable viewIndexPTable = tenantConn.unwrap(PhoenixConnection.class).getTable(globalViewIndexName);
                 // PK of view index [_INDEX_ID, tenant_id, KV, PK]
                 byte[] startRow = ByteUtil.concat(PLong.INSTANCE.toBytes(viewIndexPTable.getViewIndexId()),
-                        PChar.INSTANCE.toBytes(tenantId),
-                        PChar.INSTANCE.toBytes("KV"));
+                                                    PChar.INSTANCE.toBytes(tenantId),
+                                                    PChar.INSTANCE.toBytes("KV"));
                 assertArrayEquals(startRow, scan.getStartRow());
             }
         }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java
index c64252d..51d8972 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java
@@ -32,6 +32,7 @@
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -51,9 +52,9 @@
     @Test
     public void testExecuteQueryChainFailure() throws SQLException {
         HBaseTestingUtility hbaseTestingUtility = new HBaseTestingUtility();
-
-        PhoenixConnection conn1 = (PhoenixConnection) DriverManager.getConnection(url);
-        PhoenixConnection conn2 = (PhoenixConnection) DriverManager.getConnection(url);
+        Properties props = new Properties();
+        PhoenixConnection conn1 = (PhoenixConnection) DriverManager.getConnection(url, props);
+        PhoenixConnection conn2 = (PhoenixConnection) DriverManager.getConnection(url, props);
         PhoenixConnection connSpy1 = Mockito.spy(conn1);
         PhoenixConnection connSpy2 = Mockito.spy(conn2);
         AtomicInteger numStatementsCreatedOnConn1 = new AtomicInteger();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
index f8350e4..6a9eb7b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
@@ -35,6 +35,7 @@
 
 import java.sql.DriverManager;
 import java.sql.SQLException;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.HConstants;
@@ -49,6 +50,7 @@
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -86,7 +88,8 @@
         // only load the test driver if we are testing locally - for integration tests, we want to
         // test on a wider scale
         if (PhoenixEmbeddedDriver.isTestUrl(url)) {
-            driver = initDriver(ReadOnlyProps.EMPTY_PROPS);
+            Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+            driver = initDriver(new ReadOnlyProps(props));
             assertTrue(DriverManager.getDriver(url) == driver);
             driver.connect(url, PropertiesUtil.deepCopy(TEST_PROPERTIES));
         }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 18892db..1405ad6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -661,6 +661,9 @@
         if (value == null) {
             value = PhoenixRegionServerEndpointTestImpl.class.getName();
         }
+        else {
+            value = value + "," + PhoenixRegionServerEndpointTestImpl.class.getName();
+        }
         conf.set(REGIONSERVER_COPROCESSOR_CONF_KEY, value);
     }
     private static PhoenixTestDriver newTestDriver(ReadOnlyProps props) throws Exception {
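A worked example of the new append branch (the pre-existing coprocessor name is illustrative): HBase treats REGIONSERVER_COPROCESSOR_CONF_KEY as a comma-separated class list, so appending keeps whatever the test already configured.

    Configuration conf = HBaseConfiguration.create();
    conf.set(REGIONSERVER_COPROCESSOR_CONF_KEY, "com.example.ExistingEndpoint");  // illustrative
    // after the logic above runs, conf.get(REGIONSERVER_COPROCESSOR_CONF_KEY) is
    //   "com.example.ExistingEndpoint,org.apache.phoenix.end2end.PhoenixRegionServerEndpointTestImpl"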