/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdds.utils.db.cache;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.annotation.InterfaceAudience.Private;
import org.apache.hadoop.hdds.annotation.InterfaceStability.Evolving;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Cache implementation for the table. In a partial table cache the DB state
 * and the cache state are not the same: the cache only holds entries until
 * they have been flushed to the DB.
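 * <p>
 * Typical flow (an illustrative sketch; {@code cacheKey}, {@code cacheValue}
 * and {@code flushedEpochs} are placeholders, not part of this class):
 * <pre>{@code
 *   TableCache<CacheKey, CacheValue> cache = new PartialTableCache<>();
 *   cache.put(cacheKey, cacheValue); // tracked under cacheValue.getEpoch()
 *   // ... later, once those epochs have been flushed to the DB ...
 *   cache.cleanup(flushedEpochs);    // asynchronously evicts flushed entries
 * }</pre>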
*/
@Private
@Evolving
public class PartialTableCache<CACHEKEY extends CacheKey,
    CACHEVALUE extends CacheValue>
    implements TableCache<CACHEKEY, CACHEVALUE> {
public static final Logger LOG =
LoggerFactory.getLogger(PartialTableCache.class);
private final Map<CACHEKEY, CACHEVALUE> cache;
private final NavigableSet<EpochEntry<CACHEKEY>> epochEntries;
  private final ExecutorService executorService;
public PartialTableCache() {
    // We use a ConcurrentHashMap for O(1) lookups in the get API.
    // List operations on a partial cache merge the DB state and the cache
    // state anyway, so the cache entries do not need to be kept in sorted
    // order. And because ConcurrentHashMap#computeIfPresent (used by
    // cleanup) is atomic, and Ozone-level locks such as the volume/bucket
    // locks protect updates to the same key, no cache-level locks are
    // needed during update/cleanup operations:
    // 1. During an update, it is the caller's responsibility to hold the
    //    volume/bucket locks.
    // 2. When cleanup removes an entry while a request is updating the same
    //    cache entry, the ConcurrentHashMap atomicity guarantee protects it.
cache = new ConcurrentHashMap<>();
epochEntries = new ConcurrentSkipListSet<>();
    // Use a single-thread executor so that only one cleanup runs at a time.
ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("PartialTableCache Cleanup Thread - %d").build();
executorService = Executors.newSingleThreadExecutor(build);
}
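
  /**
   * Returns the cached value for the given key, or {@code null} if the key
   * is not present in the cache.
   */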
@Override
public CACHEVALUE get(CACHEKEY cachekey) {
return cache.get(cachekey);
}
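
  /**
   * No-op for the partial table cache: it is never pre-populated from the
   * DB, as it only holds entries that are not yet flushed.
   */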
@Override
public void loadInitial(CACHEKEY cacheKey, CACHEVALUE cacheValue) {
// Do nothing for partial table cache.
}
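
  /**
   * Adds the entry to the cache and records its epoch, so the entry can be
   * evicted once that epoch has been flushed to the DB.
   */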
@Override
public void put(CACHEKEY cacheKey, CACHEVALUE value) {
cache.put(cacheKey, value);
epochEntries.add(new EpochEntry<>(value.getEpoch(), cacheKey));
}
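
  /**
   * Schedules eviction of all cache entries whose epoch is in the given
   * list. The eviction runs asynchronously on the single-thread executor.
   */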
  @Override
  public void cleanup(List<Long> epochs) {
executorService.execute(() -> evictCache(epochs));
}
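
  /** Returns the number of entries currently held in the cache. */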
@Override
public int size() {
return cache.size();
}
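
  /** Returns an iterator over the cache entries, in no particular order. */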
@Override
public Iterator<Map.Entry<CACHEKEY, CACHEVALUE>> iterator() {
return cache.entrySet().iterator();
}
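
  /**
   * Synchronously removes the entries for the given epochs; production code
   * should go through {@link #cleanup(List)} instead.
   */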
@VisibleForTesting
public void evictCache(List<Long> epochs) {
EpochEntry<CACHEKEY> currentEntry;
CACHEKEY cachekey;
long lastEpoch = epochs.get(epochs.size() - 1);
for (Iterator<EpochEntry<CACHEKEY>> iterator = epochEntries.iterator();
iterator.hasNext();) {
currentEntry = iterator.next();
cachekey = currentEntry.getCachekey();
long currentEpoch = currentEntry.getEpoch();
      // The epoch entries are sorted, so once the current entry's epoch is
      // greater than the last epoch provided, all entries for the specified
      // epochs have been handled and we can stop.
if (currentEpoch > lastEpoch) {
break;
}
      // As ConcurrentHashMap#computeIfPresent is atomic, there is no race
      // condition between cache cleanup and requests updating the same
      // cache entry.
if (epochs.contains(currentEpoch)) {
        // Remove the epoch entry, as its epoch is in the provided list.
iterator.remove();
cache.computeIfPresent(cachekey, ((k, v) -> {
          // Remove the entry from the cache only if its epoch still matches
          // the current epoch; a concurrent put may have replaced the value
          // with one belonging to a later epoch.
if (v.getEpoch() == currentEpoch) {
if (LOG.isDebugEnabled()) {
LOG.debug("CacheKey {} with epoch {} is removed from cache",
k.getCacheKey(), currentEpoch);
}
return null;
}
return v;
}));
}
}
}
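
  /**
   * Looks up the key and classifies the result: EXISTS if it is cached with
   * a non-null value, NOT_EXIST if it is cached with a null value (marked
   * for delete), and MAY_EXIST if it is not in the cache at all, in which
   * case the caller must check the DB.
   */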
  @Override
  public CacheResult<CACHEVALUE> lookup(CACHEKEY cachekey) {
CACHEVALUE cachevalue = cache.get(cachekey);
if (cachevalue == null) {
      return new CacheResult<>(CacheResult.CacheStatus.MAY_EXIST, null);
} else {
if (cachevalue.getCacheValue() != null) {
return new CacheResult<>(CacheResult.CacheStatus.EXISTS, cachevalue);
} else {
        // When the entity is marked for deletion, the cache value is set
        // to null.
return new CacheResult<>(CacheResult.CacheStatus.NOT_EXIST, null);
}
}
}
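
  /** Exposes the epoch entry set for tests. */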
@VisibleForTesting
public Set<EpochEntry<CACHEKEY>> getEpochEntrySet() {
return epochEntries;
}
}