HIVE-24183: Clean up local HS2 HMS cache code (Jesus Camacho Rodriguez, reviewed by Vineet Garg)

Closes apache/hive#1511
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestSessionHiveMetastoreClientListPartitionsTempTable.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestSessionHiveMetastoreClientListPartitionsTempTable.java
index 2c70aac..5a4ebb5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestSessionHiveMetastoreClientListPartitionsTempTable.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestSessionHiveMetastoreClientListPartitionsTempTable.java
@@ -283,19 +283,19 @@
         null, (short)-1, new ArrayList<>());
   }
 
-  @Test(expected = MetaException.class)
+  @Test(expected = NoSuchObjectException.class)
   public void testListPartitionsByExprNoTbl() throws Exception {
     getClient().listPartitionsByExpr(DB_NAME, TABLE_NAME, new byte[] {'f', 'o', 'o'},
         null, (short)-1, new ArrayList<>());
   }
 
-  @Test(expected = MetaException.class)
+  @Test(expected = NoSuchObjectException.class)
   public void testListPartitionsByExprEmptyDbName() throws Exception {
     getClient().listPartitionsByExpr("", TABLE_NAME, new byte[] {'f', 'o', 'o'},
         null, (short)-1, new ArrayList<>());
   }
 
-  @Test(expected = MetaException.class)
+  @Test(expected = NoSuchObjectException.class)
   public void testListPartitionsByExprEmptyTblName() throws Exception {
     createTable3PartCols1Part(getClient());
     getClient().listPartitionsByExpr(DB_NAME, "", new byte[] {'f', 'o', 'o'},
@@ -396,7 +396,7 @@
     getClient().listPartitionsSpecByExpr(req, null);
   }
 
-  @Test(expected = MetaException.class)
+  @Test(expected = NoSuchObjectException.class)
   public void testListPartitionsSpecByExprNoTbl() throws Exception {
     PartitionsByExprRequest req = new PartitionsByExprRequest(DB_NAME, TABLE_NAME,
             ByteBuffer.wrap(new byte[] {'f', 'o', 'o'}));
@@ -405,7 +405,7 @@
     getClient().listPartitionsSpecByExpr(req, new ArrayList<>());
   }
 
-  @Test(expected = MetaException.class)
+  @Test(expected = NoSuchObjectException.class)
   public void testListPartitionsSpecByExprEmptyDbName() throws Exception {
     PartitionsByExprRequest req = new PartitionsByExprRequest("", TABLE_NAME,
             ByteBuffer.wrap(new byte[] {'f', 'o', 'o'}));
@@ -414,7 +414,7 @@
     getClient().listPartitionsSpecByExpr(req, new ArrayList<>());
   }
 
-  @Test(expected = MetaException.class)
+  @Test(expected = NoSuchObjectException.class)
   public void testListPartitionsSpecByExprEmptyTblName() throws Exception {
     Table t = createTable3PartCols1Part(getClient());
 
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java
index 71d467d..0590d86 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java
@@ -24,7 +24,6 @@
 import java.util.List;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -51,7 +50,6 @@
 import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
 import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
 import org.apache.hadoop.hive.metastore.api.PrimaryKeysResponse;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableStatsRequest;
 import org.apache.hadoop.hive.metastore.api.TableStatsResult;
 import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
@@ -176,8 +174,8 @@
         List.class, String.class, String.class, String.class, TableWatermark.class),
     // GetValidWriteIdsResponse <-- getValidWriteIdsInternal(GetValidWriteIdsRequest rqst)
     // Stored individually as:
-    // TableValidWriteIds <-- String fullTableName, String validTxnList, long writeId, (long tableId ?)
-    VALID_WRITE_IDS(TableValidWriteIds.class, String.class, String.class, long.class, long.class);
+    // TableValidWriteIds <-- String fullTableName, String validTxnList, long writeId
+    VALID_WRITE_IDS(TableValidWriteIds.class, String.class, String.class, long.class);
 
     private final List<Class<?>> keyClass;
     private final Class<?> valueClass;
@@ -252,62 +250,59 @@
 
   @Override
   protected PartitionsByExprResult getPartitionsByExprInternal(PartitionsByExprRequest req) throws TException {
-    PartitionsByExprResult r;
+    if (isCacheEnabledAndInitialized()) {
+      // table should be transactional to get responses from the cache
+      TableWatermark watermark = new TableWatermark(
+          req.getValidWriteIdList(), getTable(req.getDbName(), req.getTblName()).getId());
+      if (watermark.isValid()) {
+        CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_BY_EXPR, watermark, req);
+        PartitionsByExprResult r = (PartitionsByExprResult) mscLocalCache.getIfPresent(cacheKey);
+        if (r == null) {
+          r = super.getPartitionsByExprInternal(req);
+          mscLocalCache.put(cacheKey, r);
+        }
 
-    // table should be transactional to get responses from the cache
-    TableWatermark watermark = new TableWatermark(
-        req.getValidWriteIdList(), req.getId());
-    if (isCacheEnabledAndInitialized() && watermark.isValid()) {
-      CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_BY_EXPR, req);
-      r = (PartitionsByExprResult) mscLocalCache.getIfPresent(cacheKey);
-      if (r == null) {
-        r = super.getPartitionsByExprInternal(req);
-        mscLocalCache.put(cacheKey, r);
-      }
+        if (LOG.isDebugEnabled() && RECORD_STATS) {
+          LOG.debug(cacheObjName + ": " + mscLocalCache.stats().toString());
+        }
 
-      if (LOG.isDebugEnabled() && RECORD_STATS) {
-        LOG.debug(cacheObjName + ": " + mscLocalCache.stats().toString());
+        return r;
       }
-    } else {
-      r = super.getPartitionsByExprInternal(req);
     }
 
-    return r;
+    return super.getPartitionsByExprInternal(req);
   }
 
   @Override
   protected PartitionsSpecByExprResult getPartitionsSpecByExprInternal(PartitionsByExprRequest req) throws TException {
-    PartitionsSpecByExprResult r;
+    if (isCacheEnabledAndInitialized()) {
+      // table should be transactional to get responses from the cache
+      TableWatermark watermark = new TableWatermark(
+          req.getValidWriteIdList(), getTable(req.getDbName(), req.getTblName()).getId());
+      if (watermark.isValid()) {
+        CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_SPEC_BY_EXPR, watermark, req);
+        PartitionsSpecByExprResult r = (PartitionsSpecByExprResult) mscLocalCache.getIfPresent(cacheKey);
+        if (r == null) {
+          r = super.getPartitionsSpecByExprInternal(req);
+          mscLocalCache.put(cacheKey, r);
+        }
 
-    // table should be transactional to get responses from the cache
-    TableWatermark watermark = new TableWatermark(
-        req.getValidWriteIdList(), req.getId());
-    if (isCacheEnabledAndInitialized() && watermark.isValid()) {
-      CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_SPEC_BY_EXPR, req);
-      r = (PartitionsSpecByExprResult) mscLocalCache.getIfPresent(cacheKey);
-      if (r == null) {
-        r = super.getPartitionsSpecByExprInternal(req);
-        mscLocalCache.put(cacheKey, r);
-      }
+        if (LOG.isDebugEnabled() && RECORD_STATS) {
+          LOG.debug(cacheObjName + ": " + mscLocalCache.stats().toString());
+        }
 
-      if (LOG.isDebugEnabled() && RECORD_STATS) {
-        LOG.debug(cacheObjName + ": " + mscLocalCache.stats().toString());
+        return r;
       }
-    } else {
-      r = super.getPartitionsSpecByExprInternal(req);
     }
 
-    return r;
+    return super.getPartitionsSpecByExprInternal(req);
   }
 
   @Override
   protected TableStatsResult getTableColumnStatisticsInternal(TableStatsRequest req) throws TException {
     if (isCacheEnabledAndInitialized()) {
-      Table tbl = getTable(req.getDbName(), req.getTblName());
-      String validWriteIdList = getValidWriteIdList(TableName.getDbTable(req.getDbName(), req.getTblName()));
-      long tableId = tbl.getId();
       TableWatermark watermark = new TableWatermark(
-          validWriteIdList, tableId);
+          req.getValidWriteIdList(), getTable(req.getDbName(), req.getTblName()).getId());
       if (watermark.isValid()) {
         CacheWrapper cache = new CacheWrapper(mscLocalCache);
         // 1) Retrieve from the cache those ids present, gather the rest
@@ -342,17 +337,12 @@
 
   @Override
   protected AggrStats getAggrStatsForInternal(PartitionsStatsRequest req) throws TException {
-    AggrStats r;
-
     if (isCacheEnabledAndInitialized()) {
-      Table tbl = getTable(req.getDbName(), req.getTblName());
-      String validWriteIdList = getValidWriteIdList(TableName.getDbTable(req.getDbName(), req.getTblName()));
-      long tableId = tbl.getId();
       TableWatermark watermark = new TableWatermark(
-          validWriteIdList, tableId);
+          req.getValidWriteIdList(), getTable(req.getDbName(), req.getTblName()).getId());
       if (watermark.isValid()) {
-        CacheKey cacheKey = new CacheKey(KeyType.AGGR_COL_STATS, req, watermark);
-        r = (AggrStats) mscLocalCache.getIfPresent(cacheKey);
+        CacheKey cacheKey = new CacheKey(KeyType.AGGR_COL_STATS, watermark, req);
+        AggrStats r = (AggrStats) mscLocalCache.getIfPresent(cacheKey);
         if (r == null) {
           r = super.getAggrStatsForInternal(req);
           mscLocalCache.put(cacheKey, r);
@@ -361,25 +351,20 @@
         if (LOG.isDebugEnabled() && RECORD_STATS) {
           LOG.debug(cacheObjName + ": " + mscLocalCache.stats().toString());
         }
-      } else {
-        r = super.getAggrStatsForInternal(req);
+
+        return r;
       }
-    } else {
-      r = super.getAggrStatsForInternal(req);
     }
 
-    return r;
+    return super.getAggrStatsForInternal(req);
   }
 
   @Override
   protected GetPartitionsByNamesResult getPartitionsByNamesInternal(GetPartitionsByNamesRequest rqst) throws TException {
     if (isCacheEnabledAndInitialized()) {
       String dbName = parseDbName(rqst.getDb_name(), conf)[1];
-      Table tbl = getTable(dbName, rqst.getTbl_name());
-      String validWriteIdList = getValidWriteIdList(TableName.getDbTable(dbName, rqst.getTbl_name()));
-      long tableId = tbl.getId();
       TableWatermark watermark = new TableWatermark(
-          validWriteIdList, tableId);
+          rqst.getValidWriteIdList(), getTable(dbName, rqst.getTbl_name()).getId());
       if (watermark.isValid()) {
         CacheWrapper cache = new CacheWrapper(mscLocalCache);
         // 1) Retrieve from the cache those ids present, gather the rest
@@ -428,10 +413,10 @@
     List<String> colStatsMissing = new ArrayList<>();
     List<ColumnStatisticsObj> colStats = new ArrayList<>();
     for (String colName : rqst.getColNames()) {
-      CacheKey cacheKey = new CacheKey(KeyType.TABLE_COLUMN_STATS,
+      CacheKey cacheKey = new CacheKey(KeyType.TABLE_COLUMN_STATS, watermark,
           rqst.getDbName(), rqst.getTblName(), colName,
           rqst.getCatName(), rqst.getValidWriteIdList(),
-          rqst.getEngine(), rqst.getId(), watermark);
+          rqst.getEngine(), rqst.getId());
       ColumnStatisticsObj v = (ColumnStatisticsObj) cache.get(cacheKey);
       if (v == null) {
         colStatsMissing.add(colName);
@@ -451,10 +436,10 @@
       TableStatsResult r, TableStatsRequest rqst, TableWatermark watermark) {
     List<ColumnStatisticsObj> newColStats = new ArrayList<>();
     for (ColumnStatisticsObj colStat : r.getTableStats()) {
-      CacheKey cacheKey = new CacheKey(KeyType.TABLE_COLUMN_STATS,
+      CacheKey cacheKey = new CacheKey(KeyType.TABLE_COLUMN_STATS, watermark,
           rqst.getDbName(), rqst.getTblName(), colStat.getColName(),
           rqst.getCatName(), rqst.getValidWriteIdList(),
-          rqst.getEngine(), rqst.getId(), watermark);
+          rqst.getEngine(), rqst.getId());
       cache.put(cacheKey, colStat);
       newColStats.add(colStat);
     }
@@ -494,10 +479,10 @@
     List<String> partitionsMissing = new ArrayList<>();
     List<Partition> partitions = new ArrayList<>();
     for (String partitionName : rqst.getNames()) {
-      CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_BY_NAMES,
+      CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_BY_NAMES, watermark,
           rqst.getDb_name(), rqst.getTbl_name(), Warehouse.getPartValuesFromPartName(partitionName),
           rqst.isGet_col_stats(), rqst.getProcessorCapabilities(), rqst.getProcessorIdentifier(),
-          rqst.getEngine(), rqst.getValidWriteIdList(), watermark);
+          rqst.getEngine(), rqst.getValidWriteIdList());
       Partition v = (Partition) cache.get(cacheKey);
       if (v == null) {
         partitionsMissing.add(partitionName);
@@ -517,10 +502,10 @@
       GetPartitionsByNamesResult r, GetPartitionsByNamesRequest rqst, TableWatermark watermark) {
     List<Partition> newPartitions = new ArrayList<>();
     for (Partition partition : r.getPartitions()) {
-      CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_BY_NAMES,
+      CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_BY_NAMES, watermark,
           rqst.getDb_name(), rqst.getTbl_name(), partition.getValues(),
           rqst.isGet_col_stats(), rqst.getProcessorCapabilities(), rqst.getProcessorIdentifier(),
-          rqst.getEngine(), rqst.getValidWriteIdList(), watermark);
+          rqst.getEngine(), rqst.getValidWriteIdList());
       cache.put(cacheKey, partition);
       newPartitions.add(partition);
     }
@@ -562,7 +547,7 @@
     List<TableValidWriteIds> tblValidWriteIds = new ArrayList<>();
     for (String fullTableName : rqst.getFullTableNames()) {
       CacheKey cacheKey = new CacheKey(KeyType.VALID_WRITE_IDS,
-          fullTableName, rqst.getValidTxnList(), rqst.getWriteId(), -1);
+          fullTableName, rqst.getValidTxnList(), rqst.getWriteId());
       TableValidWriteIds v = (TableValidWriteIds) cache.get(cacheKey);
       if (v == null) {
         fullTableNamesMissing.add(fullTableName);
@@ -582,7 +567,7 @@
       newTblValidWriteIds.add(tableValidWriteIds);
       // Add to the cache
       CacheKey cacheKey = new CacheKey(KeyType.VALID_WRITE_IDS,
-          tableValidWriteIds.getFullTableName(), rqst.getValidTxnList(), rqst.getWriteId(), -1);
+          tableValidWriteIds.getFullTableName(), rqst.getValidTxnList(), rqst.getWriteId());
       cache.put(cacheKey, tableValidWriteIds);
     }
     return newTblValidWriteIds;