/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdds.utils.db.cache;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.annotation.InterfaceAudience.Private;
import org.apache.hadoop.hdds.annotation.InterfaceStability.Evolving;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cache implementation for the table. Depending on the cache cleanup policy,
 * this cache will be either a full cache or a partial cache.
 *
 * If the cache cleanup policy is set to {@link CacheCleanupPolicy#MANUAL},
 * this will be a partial cache.
 *
 * If the cache cleanup policy is set to {@link CacheCleanupPolicy#NEVER},
 * this will be a full cache.
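 *
 * A minimal usage sketch (illustrative only; it assumes CacheValue is built
 * from a Guava Optional plus the epoch, and CacheKey wraps the raw key, as
 * elsewhere in this codebase):
 * <pre>{@code
 * TableCacheImpl<CacheKey<String>, CacheValue<String>> cache =
 *     new TableCacheImpl<>(CacheCleanupPolicy.MANUAL);
 * // Epoch 1 adds a key; epoch 2 marks another key as deleted.
 * cache.put(new CacheKey<>("key1"), new CacheValue<>(Optional.of("v1"), 1));
 * cache.put(new CacheKey<>("key2"), new CacheValue<>(Optional.absent(), 2));
 * // Once epochs 1 and 2 have been flushed to the DB, evict them.
 * cache.cleanup(Arrays.asList(1L, 2L));
 * }</pre>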
*/
@Private
@Evolving
public class TableCacheImpl<CACHEKEY extends CacheKey,
CACHEVALUE extends CacheValue> implements TableCache<CACHEKEY, CACHEVALUE> {
public static final Logger LOG =
LoggerFactory.getLogger(TableCacheImpl.class);
private final Map<CACHEKEY, CACHEVALUE> cache;
private final NavigableSet<EpochEntry<CACHEKEY>> epochEntries;
private ExecutorService executorService;
private CacheCleanupPolicy cleanupPolicy;
public TableCacheImpl(CacheCleanupPolicy cleanupPolicy) {
    // Only the full table cache needs its entries stored in sorted order,
    // so use a ConcurrentSkipListMap for it. For the other policies a
    // ConcurrentHashMap is sufficient.
if (cleanupPolicy == CacheCleanupPolicy.NEVER) {
cache = new ConcurrentSkipListMap<>();
} else {
cache = new ConcurrentHashMap<>();
}
epochEntries = new ConcurrentSkipListSet<>();
    // Use a single-threaded executor so that only one cleanup task runs at
    // a time.
ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("PartialTableCache Cleanup Thread - %d").build();
executorService = Executors.newSingleThreadExecutor(build);
this.cleanupPolicy = cleanupPolicy;
}
@Override
public CACHEVALUE get(CACHEKEY cachekey) {
return cache.get(cachekey);
}
@Override
public void loadInitial(CACHEKEY cacheKey, CACHEVALUE cacheValue) {
    // No need to add an entry to epochEntries here; epoch tracking is only
    // required for normal put operations.
cache.put(cacheKey, cacheValue);
}
@Override
public void put(CACHEKEY cacheKey, CACHEVALUE value) {
cache.put(cacheKey, value);
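    // Record the epoch at which this key was modified so that evictCache
    // can remove the entry once that epoch has been flushed to the DB.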
epochEntries.add(new EpochEntry<>(value.getEpoch(), cacheKey));
}
  @Override
  public void cleanup(List<Long> epochs) {
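    // Eviction runs asynchronously on the single-threaded executor, so at
    // most one eviction pass is in flight at a time.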
executorService.execute(() -> evictCache(epochs));
}
@Override
public int size() {
return cache.size();
}
@Override
public Iterator<Map.Entry<CACHEKEY, CACHEVALUE>> iterator() {
return cache.entrySet().iterator();
}
@VisibleForTesting
protected void evictCache(List<Long> epochs) {
EpochEntry<CACHEKEY> currentEntry;
final AtomicBoolean removed = new AtomicBoolean();
CACHEKEY cachekey;
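    // The epochs list is expected to be sorted in ascending order; its last
    // element is used as the upper bound for this eviction pass.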
long lastEpoch = epochs.get(epochs.size() - 1);
for (Iterator<EpochEntry<CACHEKEY>> iterator = epochEntries.iterator();
iterator.hasNext();) {
currentEntry = iterator.next();
cachekey = currentEntry.getCachekey();
long currentEpoch = currentEntry.getEpoch();
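      // Remove the cache mapping (by returning null from the remapping
      // function) only when the eviction conditions for the current cleanup
      // policy hold; computeIfPresent keeps the check and the removal
      // together per key.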
      CacheValue cacheValue = cache.computeIfPresent(cachekey, (k, v) -> {
if (cleanupPolicy == CacheCleanupPolicy.MANUAL) {
if (v.getEpoch() == currentEpoch && epochs.contains(v.getEpoch())) {
LOG.debug("CacheKey {} with epoch {} is removed from cache",
k.getCacheKey(), currentEpoch);
iterator.remove();
removed.set(true);
return null;
}
} else if (cleanupPolicy == CacheCleanupPolicy.NEVER) {
// Remove only entries which are marked for delete.
if (v.getEpoch() == currentEpoch && epochs.contains(v.getEpoch())
&& v.getCacheValue() == null) {
LOG.debug("CacheKey {} with epoch {} is removed from cache",
k.getCacheKey(), currentEpoch);
removed.set(true);
iterator.remove();
return null;
}
}
return v;
      });
      // A key that was overwritten and later evicted can leave behind stale
      // epoch entries with no corresponding mapping in the cache. For
      // those, computeIfPresent returns null without the iterator removing
      // anything, so drop the stale epoch entry here.
if (!removed.get() && cacheValue == null) {
LOG.debug("CacheKey {} with epoch {} is removed from epochEntry for " +
"a key not existing in cache", cachekey.getCacheKey(),
currentEpoch);
iterator.remove();
      } else if (currentEpoch >= lastEpoch) {
        // The current entry's epoch is greater than or equal to the last
        // epoch provided, so all entries for epochs below the specified
        // ones have been handled and we can stop.
        break;
}
removed.set(false);
}
}
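
  /**
   * Looks up the given key and classifies the result: EXISTS when a live
   * value is cached, NOT_EXIST when the cached value is marked for delete
   * or the key is absent from a full cache, and MAY_EXIST when a partial
   * cache cannot rule out the key existing in the DB.
   */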
  @Override
  public CacheResult<CACHEVALUE> lookup(CACHEKEY cachekey) {
CACHEVALUE cachevalue = cache.get(cachekey);
if (cachevalue == null) {
if (cleanupPolicy == CacheCleanupPolicy.NEVER) {
return new CacheResult<>(CacheResult.CacheStatus.NOT_EXIST, null);
} else {
        return new CacheResult<>(CacheResult.CacheStatus.MAY_EXIST, null);
}
} else {
if (cachevalue.getCacheValue() != null) {
return new CacheResult<>(CacheResult.CacheStatus.EXISTS, cachevalue);
} else {
        // When an entry is marked for delete, its cacheValue is set to
        // null. In that case we can return NOT_EXIST irrespective of the
        // cache cleanup policy.
return new CacheResult<>(CacheResult.CacheStatus.NOT_EXIST, null);
}
}
}
@VisibleForTesting
public Set<EpochEntry<CACHEKEY>> getEpochEntrySet() {
return epochEntries;
}
/**
* Cleanup policies for table cache.
*/
public enum CacheCleanupPolicy {
    NEVER, // Cache will not be cleaned up. This means the table maintains
    // a full cache.
    MANUAL // Cache will be cleaned up after entries have been flushed to
    // the DB. It is the caller's responsibility to flush to the DB before
    // calling cleanup.
}
}