/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.metastore;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
import org.apache.hadoop.hive.metastore.api.ForeignKeysResponse;
import org.apache.hadoop.hive.metastore.api.GetDatabaseRequest;
import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsRequest;
import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsResponse;
import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthRequest;
import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthResponse;
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.api.GetTableResult;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
import org.apache.hadoop.hive.metastore.api.NotNullConstraintsResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
import org.apache.hadoop.hive.metastore.api.PartitionsSpecByExprResult;
import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
import org.apache.hadoop.hive.metastore.api.PrimaryKeysResponse;
import org.apache.hadoop.hive.metastore.api.TableStatsRequest;
import org.apache.hadoop.hive.metastore.api.TableStatsResult;
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest;
import org.apache.hadoop.hive.metastore.api.UniqueConstraintsResponse;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator;
import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator.ObjectEstimator;
import org.apache.thrift.TException;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.parseDbName;
/**
* This class introduces a caching layer in HS2 for metadata for some selected query APIs. It extends
* HiveMetaStoreClient, and overrides some of its methods to add this feature.
* Its design is simple, relying on the snapshot information carried by each request (valid write
* id list and table id) to cache and invalidate the metadata.
* It helps reduce the time spent in compilation by using HS2 memory more effectively, and it
* improves HMS throughput for multi-tenant workloads by reducing the number of calls it needs to serve.
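* Entries are kept in a single static Caffeine cache shared by all instances of this class, and
* its behavior is controlled through MetastoreConf.ConfVars.MSC_CACHE_ENABLED,
* MSC_CACHE_MAX_SIZE and MSC_CACHE_RECORD_STATS.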
*/
public class HiveMetaStoreClientWithLocalCache extends HiveMetaStoreClient {
private static Cache<CacheKey, Object> mscLocalCache = null;
private static boolean IS_CACHE_ENABLED;
private static long MAX_SIZE;
private static boolean RECORD_STATS;
private static HashMap<Class<?>, ObjectEstimator> sizeEstimator = null;
private static String cacheObjName = null;
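/**
* Initializes the shared cache exactly once, reading the enabled flag, maximum weight and stats
* recording settings from a freshly created metastore configuration.
*/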
public static synchronized void init() {
if (mscLocalCache != null) return; // init cache only once
Configuration metaConf = MetastoreConf.newMetastoreConf();
LOG.debug("Initializing local cache in HiveMetaStoreClient...");
MAX_SIZE = MetastoreConf.getSizeVar(metaConf, MetastoreConf.ConfVars.MSC_CACHE_MAX_SIZE);
IS_CACHE_ENABLED = MetastoreConf.getBoolVar(metaConf, MetastoreConf.ConfVars.MSC_CACHE_ENABLED);
RECORD_STATS = MetastoreConf.getBoolVar(metaConf, MetastoreConf.ConfVars.MSC_CACHE_RECORD_STATS);
initSizeEstimator();
initCache();
LOG.debug("Local cache initialized in HiveMetaStoreClient: " + mscLocalCache);
}
public HiveMetaStoreClientWithLocalCache(Configuration conf) throws MetaException {
this(conf, null, true);
}
public HiveMetaStoreClientWithLocalCache(Configuration conf, HiveMetaHookLoader hookLoader) throws MetaException {
this(conf, hookLoader, true);
}
public HiveMetaStoreClientWithLocalCache(Configuration conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded) throws MetaException {
super(conf, hookLoader, allowEmbedded);
}
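/**
* Creates object size estimators for the cache key class and for every key and value class
* declared in {@link KeyType}, so that entry weights can be computed.
*/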
private static void initSizeEstimator() {
sizeEstimator = new HashMap<>();
IncrementalObjectSizeEstimator.createEstimators(CacheKey.class, sizeEstimator);
for (KeyType e : KeyType.values()) {
for (Class<?> c : e.keyClass) {
IncrementalObjectSizeEstimator.createEstimators(c, sizeEstimator);
}
IncrementalObjectSizeEstimator.createEstimators(e.valueClass, sizeEstimator);
}
}
/**
* KeyType is used to differentiate the request types. More types can be added in the future.
*/
public enum KeyType {
// String <-- getConfigValueInternal(String name, String defaultValue)
CONFIG_VALUE(String.class, String.class, String.class),
// Database <-- getDatabaseInternal(GetDatabaseRequest request)
DATABASE(Database.class, GetDatabaseRequest.class),
// GetTableResult <-- getTableInternal(GetTableRequest req)
TABLE(GetTableResult.class, GetTableRequest.class),
// PrimaryKeysResponse <-- getPrimaryKeysInternal(PrimaryKeysRequest req)
PRIMARY_KEYS(PrimaryKeysResponse.class, PrimaryKeysRequest.class),
// ForeignKeysResponse <-- getForeignKeysInternal(ForeignKeysRequest req)
FOREIGN_KEYS(ForeignKeysResponse.class, ForeignKeysRequest.class),
// UniqueConstraintsResponse <-- getUniqueConstraintsInternal(UniqueConstraintsRequest req)
UNIQUE_CONSTRAINTS(UniqueConstraintsResponse.class, UniqueConstraintsRequest.class),
// NotNullConstraintsResponse <-- getNotNullConstraintsInternal(NotNullConstraintsRequest req)
NOT_NULL_CONSTRAINTS(NotNullConstraintsResponse.class, NotNullConstraintsRequest.class),
// TableStatsResult <-- getTableColumnStatisticsInternal(TableStatsRequest rqst)
// Stored individually as:
// ColumnStatisticsObj <-- String dbName, String tblName, List<String> colNames,
// String catName, String validWriteIdList, String engine, long id, (TableWatermark tw ?)
TABLE_COLUMN_STATS(ColumnStatisticsObj.class, String.class, String.class, List.class,
String.class, String.class, String.class, long.class, TableWatermark.class),
// AggrStats <-- getAggrStatsForInternal(PartitionsStatsRequest req), (TableWatermark tw ?)
AGGR_COL_STATS(AggrStats.class, PartitionsStatsRequest.class, TableWatermark.class),
// PartitionsByExprResult <-- getPartitionsByExprInternal(PartitionsByExprRequest req), (TableWatermark tw ?)
PARTITIONS_BY_EXPR(PartitionsByExprResult.class, PartitionsByExprRequest.class, TableWatermark.class),
// PartitionsSpecByExprResult <-- getPartitionsSpecByExprInternal(PartitionsByExprRequest req), (TableWatermark tw ?)
PARTITIONS_SPEC_BY_EXPR(PartitionsSpecByExprResult.class, PartitionsByExprRequest.class, TableWatermark.class),
// List<String> <-- listPartitionNamesInternal(String catName, String dbName, String tableName,
// int maxParts)
LIST_PARTITIONS_ALL(List.class, String.class, String.class, String.class, int.class),
// List<String> <-- listPartitionNamesInternal(String catName, String dbName, String tableName,
// List<String> partVals, int maxParts)
LIST_PARTITIONS(List.class, String.class, String.class, String.class, List.class, int.class),
// GetPartitionNamesPsResponse <-- listPartitionNamesRequestInternal(GetPartitionNamesPsRequest req)
LIST_PARTITIONS_REQ(GetPartitionNamesPsResponse.class, GetPartitionNamesPsRequest.class),
// List<Partition> <- listPartitionsWithAuthInfoInternal(String catName, String dbName, String tableName,
// int maxParts, String userName, List<String> groupNames)
LIST_PARTITIONS_AUTH_INFO_ALL(List.class, String.class, String.class, String.class, int.class,
String.class, List.class),
// List<Partition> <- listPartitionsWithAuthInfoInternal(String catName, String dbName, String tableName,
// List<String> partialPvals, int maxParts, String userName, List<String> groupNames)
LIST_PARTITIONS_AUTH_INFO(List.class, String.class, String.class, String.class, List.class, int.class,
String.class, List.class),
// GetPartitionsPsWithAuthResponse <- listPartitionsWithAuthInfoRequestInternal(GetPartitionsPsWithAuthRequest req)
LIST_PARTITIONS_AUTH_INFO_REQ(GetPartitionsPsWithAuthResponse.class, GetPartitionsPsWithAuthRequest.class),
// GetPartitionsByNamesResult <-- getPartitionsByNamesInternal(GetPartitionsByNamesRequest gpbnr)
// Stored individually as:
// Partition <-- String db_name, String tbl_name, List<String> partValues, boolean get_col_stats,
// List<String> processorCapabilities, String processorIdentifier, String engine,
// String validWriteIdList, (TableWatermark tw ?)
PARTITIONS_BY_NAMES(Partition.class, String.class, String.class, List.class, boolean.class,
List.class, String.class, String.class, String.class, TableWatermark.class),
// GetValidWriteIdsResponse <-- getValidWriteIdsInternal(GetValidWriteIdsRequest rqst)
// Stored individually as:
// TableValidWriteIds <-- String fullTableName, String validTxnList, long writeId
VALID_WRITE_IDS(TableValidWriteIds.class, String.class, String.class, long.class);
private final List<Class<?>> keyClass;
private final Class<?> valueClass;
KeyType(Class<?> valueClass, Class<?>... keyClasses) {
this.keyClass = Collections.unmodifiableList(Arrays.asList(keyClasses));
this.valueClass = valueClass;
}
}
/**
* CacheKey objects are used as keys for the cache.
*/
public static class CacheKey {
final KeyType IDENTIFIER;
final List<Object> obj;
public CacheKey(KeyType IDENTIFIER, Object... objs) {
this.IDENTIFIER = IDENTIFIER;
this.obj = Collections.unmodifiableList(Arrays.asList(objs));
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CacheKey cacheKey = (CacheKey) o;
return IDENTIFIER == cacheKey.IDENTIFIER &&
Objects.equals(obj, cacheKey.obj);
}
@Override
public int hashCode() {
return Objects.hash(IDENTIFIER, obj);
}
}
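/**
* Computes the weight of a cache entry as the sum of the estimated in-memory sizes of its key
* and value.
*/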
private static int getWeight(CacheKey key, Object val) {
ObjectEstimator keySizeEstimator = sizeEstimator.get(key.getClass());
ObjectEstimator valSizeEstimator = sizeEstimator.get(key.IDENTIFIER.valueClass);
int keySize = keySizeEstimator.estimate(key, sizeEstimator);
int valSize = valSizeEstimator.estimate(val, sizeEstimator);
if (LOG.isDebugEnabled()) {
LOG.debug("Cache entry weight - key: {}, value: {}, total: {}", keySize, valSize, keySize + valSize);
}
return keySize + valSize;
}
/**
* Initializes the cache, bounding it by the total estimated weight of its entries rather than by
* their number.
*/
private static void initCache() {
int initSize = 100;
Caffeine<CacheKey, Object> cacheBuilder = Caffeine.newBuilder()
.initialCapacity(initSize)
.maximumWeight(MAX_SIZE)
.weigher(HiveMetaStoreClientWithLocalCache::getWeight)
.removalListener((key, val, cause) -> {
if (LOG.isDebugEnabled()) {
LOG.debug("Caffeine - ({}, {}) was removed ({})", key, val, cause);
}});
if (RECORD_STATS) {
cacheBuilder.recordStats();
}
mscLocalCache = cacheBuilder.build();
String cacheToString = mscLocalCache.toString();
cacheObjName = cacheToString.substring(cacheToString.indexOf("Cache@"));
}
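/**
* Returns the partitions matching an expression, serving the result from the local cache when a
* valid snapshot (write id list + table id) is available for the table; otherwise it delegates
* to the remote HMS call.
*/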
@Override
protected PartitionsByExprResult getPartitionsByExprInternal(PartitionsByExprRequest req) throws TException {
if (isCacheEnabledAndInitialized()) {
// table should be transactional to get responses from the cache
TableWatermark watermark = new TableWatermark(
req.getValidWriteIdList(), getTable(req.getDbName(), req.getTblName()).getId());
if (watermark.isValid()) {
CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_BY_EXPR, watermark, req);
PartitionsByExprResult r = (PartitionsByExprResult) mscLocalCache.getIfPresent(cacheKey);
if (r == null) {
r = super.getPartitionsByExprInternal(req);
mscLocalCache.put(cacheKey, r);
}
if (LOG.isDebugEnabled() && RECORD_STATS) {
LOG.debug("{}: {}", cacheObjName, mscLocalCache.stats());
}
return r;
}
}
return super.getPartitionsByExprInternal(req);
}
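/**
* Partition-spec variant of {@link #getPartitionsByExprInternal(PartitionsByExprRequest)}, using
* the same watermark-based caching strategy.
*/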
@Override
protected PartitionsSpecByExprResult getPartitionsSpecByExprInternal(PartitionsByExprRequest req) throws TException {
if (isCacheEnabledAndInitialized()) {
// table should be transactional to get responses from the cache
TableWatermark watermark = new TableWatermark(
req.getValidWriteIdList(), getTable(req.getDbName(), req.getTblName()).getId());
if (watermark.isValid()) {
CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_SPEC_BY_EXPR, watermark, req);
PartitionsSpecByExprResult r = (PartitionsSpecByExprResult) mscLocalCache.getIfPresent(cacheKey);
if (r == null) {
r = super.getPartitionsSpecByExprInternal(req);
mscLocalCache.put(cacheKey, r);
}
if (LOG.isDebugEnabled() && RECORD_STATS) {
LOG.debug("{}: {}", cacheObjName, mscLocalCache.stats());
}
return r;
}
}
return super.getPartitionsSpecByExprInternal(req);
}
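/**
* Returns table column statistics, caching each column individually so that a later request for
* an overlapping set of columns only fetches the columns that are missing from the cache.
*/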
@Override
protected TableStatsResult getTableColumnStatisticsInternal(TableStatsRequest req) throws TException {
if (isCacheEnabledAndInitialized()) {
TableWatermark watermark = new TableWatermark(
req.getValidWriteIdList(), getTable(req.getDbName(), req.getTblName()).getId());
if (watermark.isValid()) {
CacheWrapper cache = new CacheWrapper(mscLocalCache);
// 1) Retrieve from the cache those ids present, gather the rest
Pair<List<ColumnStatisticsObj>, List<String>> p = getTableColumnStatisticsCache(
cache, req, watermark);
List<String> colStatsMissing = p.getRight();
List<ColumnStatisticsObj> colStats = p.getLeft();
// 2) If they were all present in the cache, return
if (colStatsMissing.isEmpty()) {
return new TableStatsResult(colStats);
}
// 3) If they were not, gather the remaining
TableStatsRequest newRqst = new TableStatsRequest(req);
newRqst.setColNames(colStatsMissing);
TableStatsResult r = super.getTableColumnStatisticsInternal(newRqst);
// 4) Populate the cache
List<ColumnStatisticsObj> newColStats = loadTableColumnStatisticsCache(
cache, r, req, watermark);
// 5) Sort result (in case there is any assumption) and return
TableStatsResult result = computeTableColumnStatisticsFinal(req, colStats, newColStats);
if (LOG.isDebugEnabled() && RECORD_STATS) {
LOG.debug("{}: {}", cacheObjName, mscLocalCache.stats());
}
return result;
}
}
return super.getTableColumnStatisticsInternal(req);
}
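/**
* Returns aggregated partition column statistics, caching the whole response keyed by the
* request and the table watermark.
*/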
@Override
protected AggrStats getAggrStatsForInternal(PartitionsStatsRequest req) throws TException {
if (isCacheEnabledAndInitialized()) {
TableWatermark watermark = new TableWatermark(
req.getValidWriteIdList(), getTable(req.getDbName(), req.getTblName()).getId());
if (watermark.isValid()) {
CacheKey cacheKey = new CacheKey(KeyType.AGGR_COL_STATS, watermark, req);
AggrStats r = (AggrStats) mscLocalCache.getIfPresent(cacheKey);
if (r == null) {
r = super.getAggrStatsForInternal(req);
mscLocalCache.put(cacheKey, r);
}
if (LOG.isDebugEnabled() && RECORD_STATS) {
LOG.debug("{}: {}", cacheObjName, mscLocalCache.stats());
}
return r;
}
}
return super.getAggrStatsForInternal(req);
}
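/**
* Returns partitions by name, caching each partition individually so that a later request for an
* overlapping set of names only fetches the partitions that are missing from the cache.
*/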
@Override
protected GetPartitionsByNamesResult getPartitionsByNamesInternal(GetPartitionsByNamesRequest rqst) throws TException {
if (isCacheEnabledAndInitialized()) {
String dbName = parseDbName(rqst.getDb_name(), conf)[1];
TableWatermark watermark = new TableWatermark(
rqst.getValidWriteIdList(), getTable(dbName, rqst.getTbl_name()).getId());
if (watermark.isValid()) {
CacheWrapper cache = new CacheWrapper(mscLocalCache);
// 1) Retrieve from the cache those ids present, gather the rest
Pair<List<Partition>, List<String>> p = getPartitionsByNamesCache(
cache, rqst, watermark);
List<String> partitionsMissing = p.getRight();
List<Partition> partitions = p.getLeft();
// 2) If they were all present in the cache, return
if (partitionsMissing.isEmpty()) {
return new GetPartitionsByNamesResult(partitions);
}
// 3) If they were not, gather the remaining
GetPartitionsByNamesRequest newRqst = new GetPartitionsByNamesRequest(rqst);
newRqst.setNames(partitionsMissing);
GetPartitionsByNamesResult r = super.getPartitionsByNamesInternal(newRqst);
// 4) Populate the cache
List<Partition> newPartitions = loadPartitionsByNamesCache(
cache, r, rqst, watermark);
// 5) Sort result (in case there is any assumption) and return
GetPartitionsByNamesResult result = computePartitionsByNamesFinal(rqst, partitions, newPartitions);
if (LOG.isDebugEnabled() && RECORD_STATS) {
LOG.debug("{}: {}", cacheObjName, mscLocalCache.stats());
}
return result;
}
}
return super.getPartitionsByNamesInternal(rqst);
}
/**
* Checks whether the cache is enabled and initialized.
*
* @return true if the local cache can be used, false otherwise
*/
private boolean isCacheEnabledAndInitialized() {
return IS_CACHE_ENABLED && mscLocalCache != null;
}
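/**
* Looks up each requested column in the cache; returns the statistics objects found, together
* with the names of the columns that still need to be fetched from HMS.
*/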
protected final Pair<List<ColumnStatisticsObj>, List<String>> getTableColumnStatisticsCache(CacheI cache,
TableStatsRequest rqst, TableWatermark watermark) {
List<String> colStatsMissing = new ArrayList<>();
List<ColumnStatisticsObj> colStats = new ArrayList<>();
for (String colName : rqst.getColNames()) {
CacheKey cacheKey = new CacheKey(KeyType.TABLE_COLUMN_STATS, watermark,
rqst.getDbName(), rqst.getTblName(), colName,
rqst.getCatName(), rqst.getValidWriteIdList(),
rqst.getEngine(), rqst.getId());
ColumnStatisticsObj v = (ColumnStatisticsObj) cache.get(cacheKey);
if (v == null) {
colStatsMissing.add(colName);
} else {
if (watermark == null) {
LOG.debug("Query level HMS cache: method=getTableColumnStatisticsInternal");
} else {
LOG.debug("HS2 level HMS cache: method=getTableColumnStatisticsInternal");
}
colStats.add(v);
}
}
return Pair.of(colStats, colStatsMissing);
}
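/**
* Stores every column statistics object of the response in the cache and returns them as a list.
*/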
protected final List<ColumnStatisticsObj> loadTableColumnStatisticsCache(CacheI cache,
TableStatsResult r, TableStatsRequest rqst, TableWatermark watermark) {
List<ColumnStatisticsObj> newColStats = new ArrayList<>();
for (ColumnStatisticsObj colStat : r.getTableStats()) {
CacheKey cacheKey = new CacheKey(KeyType.TABLE_COLUMN_STATS, watermark,
rqst.getDbName(), rqst.getTblName(), colStat.getColName(),
rqst.getCatName(), rqst.getValidWriteIdList(),
rqst.getEngine(), rqst.getId());
cache.put(cacheKey, colStat);
newColStats.add(colStat);
}
return newColStats;
}
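/**
* Merges the cached and the freshly fetched column statistics, preserving the order of the
* column names in the request.
*/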
protected final TableStatsResult computeTableColumnStatisticsFinal(TableStatsRequest rqst,
List<ColumnStatisticsObj> colStats, List<ColumnStatisticsObj> newColStats) {
List<ColumnStatisticsObj> result = new ArrayList<>();
int i = 0, j = 0;
for (String colName : rqst.getColNames()) {
if (i >= colStats.size() || j >= newColStats.size()) {
break;
}
if (colStats.get(i).getColName().equals(colName)) {
result.add(colStats.get(i));
i++;
} else if (newColStats.get(j).getColName().equals(colName)) {
result.add(newColStats.get(j));
j++;
}
}
while (i < colStats.size()) {
result.add(colStats.get(i));
i++;
}
while (j < newColStats.size()) {
result.add(newColStats.get(j));
j++;
}
return new TableStatsResult(result);
}
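/**
* Looks up each requested partition name in the cache; returns the partitions found, together
* with the names of the partitions that still need to be fetched from HMS.
*/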
protected final Pair<List<Partition>, List<String>> getPartitionsByNamesCache(CacheI cache,
GetPartitionsByNamesRequest rqst, TableWatermark watermark) throws MetaException {
List<String> partitionsMissing = new ArrayList<>();
List<Partition> partitions = new ArrayList<>();
for (String partitionName : rqst.getNames()) {
CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_BY_NAMES, watermark,
rqst.getDb_name(), rqst.getTbl_name(), Warehouse.getPartValuesFromPartName(partitionName),
rqst.isGet_col_stats(), rqst.getProcessorCapabilities(), rqst.getProcessorIdentifier(),
rqst.getEngine(), rqst.getValidWriteIdList());
Partition v = (Partition) cache.get(cacheKey);
if (v == null) {
partitionsMissing.add(partitionName);
} else {
if (watermark == null) {
LOG.debug("Query level HMS cache: method=getPartitionsByNamesInternal");
} else {
LOG.debug("HS2 level HMS cache: method=getPartitionsByNamesInternal");
}
partitions.add(v);
}
}
return Pair.of(partitions, partitionsMissing);
}
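/**
* Stores every partition of the response in the cache, keyed among other things by its values,
* and returns them as a list.
*/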
protected final List<Partition> loadPartitionsByNamesCache(CacheI cache,
GetPartitionsByNamesResult r, GetPartitionsByNamesRequest rqst, TableWatermark watermark) {
List<Partition> newPartitions = new ArrayList<>();
for (Partition partition : r.getPartitions()) {
CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_BY_NAMES, watermark,
rqst.getDb_name(), rqst.getTbl_name(), partition.getValues(),
rqst.isGet_col_stats(), rqst.getProcessorCapabilities(), rqst.getProcessorIdentifier(),
rqst.getEngine(), rqst.getValidWriteIdList());
cache.put(cacheKey, partition);
newPartitions.add(partition);
}
return newPartitions;
}
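/**
* Merges the cached and the freshly fetched partitions, preserving the order of the partition
* names in the request.
*/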
protected final GetPartitionsByNamesResult computePartitionsByNamesFinal(GetPartitionsByNamesRequest rqst,
List<Partition> partitions, List<Partition> newPartitions) throws MetaException {
List<Partition> result = new ArrayList<>();
int i = 0, j = 0;
for (String partitionName : rqst.getNames()) {
if (i >= partitions.size() || j >= newPartitions.size()) {
break;
}
List<String> pv = Warehouse.getPartValuesFromPartName(partitionName);
if (partitions.get(i).getValues().equals(pv)) {
result.add(partitions.get(i));
i++;
} else if (newPartitions.get(j).getValues().equals(pv)) {
result.add(newPartitions.get(j));
j++;
}
}
while (i < partitions.size()) {
result.add(partitions.get(i));
i++;
}
while (j < newPartitions.size()) {
result.add(newPartitions.get(j));
j++;
}
return new GetPartitionsByNamesResult(result);
}
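/**
* Looks up each requested table in the cache; returns the valid write id entries found, together
* with the full table names that still need to be fetched from HMS.
*/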
protected final Pair<List<TableValidWriteIds>, List<String>> getValidWriteIdsCache(CacheI cache,
GetValidWriteIdsRequest rqst) throws TException {
List<String> fullTableNamesMissing = new ArrayList<>();
List<TableValidWriteIds> tblValidWriteIds = new ArrayList<>();
for (String fullTableName : rqst.getFullTableNames()) {
CacheKey cacheKey = new CacheKey(KeyType.VALID_WRITE_IDS,
fullTableName, rqst.getValidTxnList(), rqst.getWriteId());
TableValidWriteIds v = (TableValidWriteIds) cache.get(cacheKey);
if (v == null) {
fullTableNamesMissing.add(fullTableName);
} else {
LOG.debug("Query level HMS cache: method=getValidWriteIdsInternal");
tblValidWriteIds.add(v);
}
}
return Pair.of(tblValidWriteIds, fullTableNamesMissing);
}
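/**
* Stores every TableValidWriteIds object of the response in the cache and returns them as a
* list.
*/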
protected final List<TableValidWriteIds> loadValidWriteIdsCache(CacheI cache,
GetValidWriteIdsResponse r, GetValidWriteIdsRequest rqst)
throws TException {
List<TableValidWriteIds> newTblValidWriteIds = new ArrayList<>();
for (TableValidWriteIds tableValidWriteIds : r.getTblValidWriteIds()) {
newTblValidWriteIds.add(tableValidWriteIds);
// Add to the cache
CacheKey cacheKey = new CacheKey(KeyType.VALID_WRITE_IDS,
tableValidWriteIds.getFullTableName(), rqst.getValidTxnList(), rqst.getWriteId());
cache.put(cacheKey, tableValidWriteIds);
}
return newTblValidWriteIds;
}
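/**
* Merges the cached and the freshly fetched write id entries, preserving the order of the full
* table names in the request.
*/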
protected final GetValidWriteIdsResponse computeValidWriteIdsFinal(GetValidWriteIdsRequest rqst,
List<TableValidWriteIds> tblValidWriteIds, List<TableValidWriteIds> newTblValidWriteIds) {
List<TableValidWriteIds> result = new ArrayList<>();
int i = 0, j = 0;
for (String fullTableName : rqst.getFullTableNames()) {
if (i >= tblValidWriteIds.size() || j >= newTblValidWriteIds.size()) {
break;
}
if (tblValidWriteIds.get(i).getFullTableName().equals(fullTableName)) {
result.add(tblValidWriteIds.get(i));
i++;
} else if (newTblValidWriteIds.get(j).getFullTableName().equals(fullTableName)) {
result.add(newTblValidWriteIds.get(j));
j++;
}
}
while (i < tblValidWriteIds.size()) {
result.add(tblValidWriteIds.get(i));
i++;
}
while (j < newTblValidWriteIds.size()) {
result.add(newTblValidWriteIds.get(j));
j++;
}
return new GetValidWriteIdsResponse(result);
}
/**
* Wrapper exposing a Caffeine cache through the {@link CacheI} interface.
*/
protected static class CacheWrapper implements CacheI {
final Cache<CacheKey, Object> c;
protected CacheWrapper(Cache<CacheKey, Object> c) {
this.c = c;
}
@Override
public void put(Object k, Object v) {
c.put((CacheKey) k, v);
}
@Override
public Object get(Object k) {
return c.getIfPresent(k);
}
}
/**
* Minimal get/put cache abstraction used by the lookup and load helper methods above.
*/
protected interface CacheI {
void put(Object k, Object v);
Object get(Object k);
}
/**
* Internal class to uniquely identify a snapshot of a table.
*/
protected static class TableWatermark {
final String validWriteIdList;
final long tableId;
protected TableWatermark(String validWriteIdList, long tableId) {
this.validWriteIdList = validWriteIdList;
this.tableId = tableId;
}
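/**
* A watermark is valid only when both the write id list and the table id are known, which is the
* case for transactional tables; results are only cached against valid watermarks.
*/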
public boolean isValid() {
return validWriteIdList != null && tableId != -1;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TableWatermark that = (TableWatermark) o;
return tableId == that.tableId &&
Objects.equals(validWriteIdList, that.validWriteIdList);
}
@Override
public int hashCode() {
return Objects.hash(validWriteIdList, tableId);
}
}
}