PHOENIX-7509 : Metadata Cache should handle tables which have LAST_DDL_TIMESTAMP column null in syscat (#2059)

Co-authored-by: Palash Chauhan <p.chauhan@pchauha-ltm8owy.internal.salesforce.com>
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index bb6a648..dba599f 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -4444,6 +4444,7 @@
 
             //move TTL values stored in descriptor to SYSCAT TTL column.
             moveTTLFromHBaseLevelTTLToPhoenixLevelTTL(metaConnection);
+            UpgradeUtil.bootstrapLastDDLTimestampForTablesAndViews(metaConnection);
             UpgradeUtil.bootstrapLastDDLTimestampForIndexes(metaConnection);
         }
         return metaConnection;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java
index 95b1d15..faab529 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java
@@ -124,6 +124,9 @@
                     = getValidateDDLTimestampRequest(tableRefs);
             service.validateLastDDLTimestamp(null, request);
         } catch (Exception e) {
+            if (e instanceof StaleMetadataCacheException) {
+                throw (StaleMetadataCacheException) e;
+            }
             SQLException parsedException = ClientUtil.parseServerException(e);
             if (parsedException instanceof StaleMetadataCacheException) {
                 throw parsedException;
@@ -152,7 +155,7 @@
      * @return ValidateLastDDLTimestampRequest for the table in tableRef
      */
     private static RegionServerEndpointProtos.ValidateLastDDLTimestampRequest
-        getValidateDDLTimestampRequest(List<TableRef> tableRefs) {
+        getValidateDDLTimestampRequest(List<TableRef> tableRefs) throws StaleMetadataCacheException {
 
         RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.Builder requestBuilder
                 = RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.newBuilder();
@@ -168,12 +171,20 @@
                     : tableRef.getTable().getAncestorLastDDLTimestampMap().entrySet()) {
                 innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
                 PTableKey ancestorKey = entry.getKey();
+                if (entry.getValue() == null) {
+                    throw new StaleMetadataCacheException(
+                            "LAST_DDL_TIMESTAMP set to null in client cache for {}" + ancestorKey);
+                }
                 setLastDDLTimestampRequestParameters(innerBuilder, ancestorKey, entry.getValue());
                 requestBuilder.addLastDDLTimestampRequests(innerBuilder);
             }
 
             // add the current table to the request
             PTable ptable = tableRef.getTable();
+            if (ptable.getLastDDLTimestamp() == null) {
+                throw new StaleMetadataCacheException(
+                        "LAST_DDL_TIMESTAMP set to null in client cache for {}" + ptable.getKey());
+            }
             innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
             setLastDDLTimestampRequestParameters(innerBuilder, ptable.getKey(),
                     ptable.getLastDDLTimestamp());
@@ -181,6 +192,10 @@
 
             // add all indexes of the current table
             for (PTable idxPTable : tableRef.getTable().getIndexes()) {
+                if (idxPTable.getLastDDLTimestamp() == null) {
+                    throw new StaleMetadataCacheException(
+                            "LAST_DDL_TIMESTAMP set to null in client cache for {}" + idxPTable.getKey());
+                }
                 innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
                 setLastDDLTimestampRequestParameters(innerBuilder, idxPTable.getKey(),
                         idxPTable.getLastDDLTimestamp());
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 435c888..2cb0050 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -1420,7 +1420,7 @@
            null : PLong.INSTANCE.getCodec().decodeLong(lastDDLTimestampKv.getValueArray(),
                 lastDDLTimestampKv.getValueOffset(), SortOrder.getDefault());
         builder.setLastDDLTimestamp(lastDDLTimestampKv != null ? lastDDLTimestamp :
-            oldTable != null ? oldTable.getLastDDLTimestamp() : null);
+            oldTable != null ? oldTable.getLastDDLTimestamp() : timeStamp);
 
         Cell changeDetectionEnabledKv = tableKeyValues[CHANGE_DETECTION_ENABLED_INDEX];
         boolean isChangeDetectionEnabled = changeDetectionEnabledKv != null
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
index 8e309cb..fa05120 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
@@ -39,6 +39,7 @@
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@@ -66,6 +67,14 @@
 import java.util.Random;
 
 import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_DDL_TIMESTAMP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
 import static org.apache.phoenix.query.ConnectionQueryServicesImpl.INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE;
 import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -1854,6 +1863,33 @@
         }
     }
 
+    @Test
+    public void testLastDDLTimestampNotSetOnTable() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices cqs1 = driver.getConnectionQueryServices(url1, props);
+        try (Connection conn1 = cqs1.connect(url1, props)) {
+            createTable(conn1, tableName);
+            // null out LAST_DDL_TIMESTAMP and clear client/server caches
+            String pkCols = TENANT_ID + ", " + TABLE_SCHEM +
+                    ", " + TABLE_NAME + ", " + COLUMN_NAME + ", " + COLUMN_FAMILY;
+            String upsertSql =
+                    "UPSERT INTO " + SYSTEM_CATALOG_NAME + " (" + pkCols + ", " +
+                            LAST_DDL_TIMESTAMP + ")" + " " +
+                            "SELECT " + pkCols + ", NULL FROM " + SYSTEM_CATALOG_NAME + " " +
+                            "WHERE " + TABLE_NAME + " " + " = '" + tableName + "'";
+            conn1.createStatement().executeUpdate(upsertSql);
+            conn1.commit();
+            conn1.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+
+            // refresh client's cache and query, it should not face NPE
+            PhoenixRuntime.getTableNoCache(conn1, tableName);
+            query(conn1, tableName);
+            Assert.assertNotNull(PhoenixRuntime.getTable(conn1, tableName).getLastDDLTimestamp());
+        }
+    }
+
     //Helper methods
     public static void assertNumGetTableRPC(ConnectionQueryServices spyCqs, String tableName, int numExpectedRPCs) throws SQLException {
         Mockito.verify(spyCqs, Mockito.times(numExpectedRPCs)).getTable(eq(null),