/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.llap.cache;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import com.google.common.annotations.VisibleForTesting;
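
/**
 * Low-level LLAP data cache. Maps file keys to per-file sub-caches, each an
 * offset-ordered map of refcounted data buffers; eviction decisions are delegated to
 * the cache policy, and a low-priority background thread sweeps out invalidated
 * buffers and empty sub-caches.
 *
 * A minimal usage sketch (the metrics, policy, allocator, factory and fileKey names
 * below are illustrative placeholders, not specific implementations):
 * <pre>{@code
 * LowLevelCacheImpl cache = new LowLevelCacheImpl(metrics, policy, allocator, true);
 * cache.init(); // starts the cleanup thread
 * // Request [100, 300) of a file; cached parts come back as locked cache chunks,
 * // the rest as plain disk ranges to be read and then fed back via putFileData.
 * DiskRangeList result = cache.getFileData(
 *     fileKey, new DiskRangeList(100, 300), 0, factory, null, gotAllData);
 * }</pre>
 */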
public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, LlapOomDebugDump {
private static final int DEFAULT_CLEANUP_INTERVAL = 600;
private final EvictionAwareAllocator allocator;
private final AtomicInteger newEvictions = new AtomicInteger(0);
private Thread cleanupThread = null;
private final ConcurrentHashMap<Object, FileCache> cache =
new ConcurrentHashMap<Object, FileCache>();
private final LowLevelCachePolicy cachePolicy;
private final long cleanupInterval;
private final LlapDaemonCacheMetrics metrics;
private final boolean doAssumeGranularBlocks;
public LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy,
EvictionAwareAllocator allocator, boolean doAssumeGranularBlocks) {
this(metrics, cachePolicy, allocator, doAssumeGranularBlocks, DEFAULT_CLEANUP_INTERVAL);
}
@VisibleForTesting
LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy,
EvictionAwareAllocator allocator, boolean doAssumeGranularBlocks, long cleanupInterval) {
LlapIoImpl.LOG.info("Low level cache; cleanup interval {} sec", cleanupInterval);
this.cachePolicy = cachePolicy;
this.allocator = allocator;
this.cleanupInterval = cleanupInterval;
this.metrics = metrics;
this.doAssumeGranularBlocks = doAssumeGranularBlocks;
}
public void init() {
if (cleanupInterval < 0) return;
cleanupThread = new CleanupThread(cleanupInterval);
cleanupThread.start();
}
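
  /**
   * Looks up cached data for the given ranges. The list is rewritten in place: nodes
   * covered by cached buffers are replaced with cache chunks (locked for the caller),
   * while uncovered parts remain as plain disk ranges. For example, requesting
   * [0, 300) when only [100, 200) is cached yields [0, 100) -> cached[100, 200) ->
   * [200, 300), with gotAllData set to false.
   */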
@Override
public DiskRangeList getFileData(Object fileKey, DiskRangeList ranges, long baseOffset,
DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData) {
if (ranges == null) return null;
DiskRangeList prev = ranges.prev;
FileCache subCache = cache.get(fileKey);
if (subCache == null || !subCache.incRef()) {
long totalMissed = ranges.getTotalLength();
metrics.incrCacheRequestedBytes(totalMissed);
if (qfCounters != null) {
qfCounters.recordCacheMiss(totalMissed);
}
if (prev != null && gotAllData != null) {
gotAllData.value = false;
}
return ranges;
}
try {
if (prev == null) {
prev = new MutateHelper(ranges);
}
if (gotAllData != null) {
gotAllData.value = true;
}
DiskRangeList current = ranges;
while (current != null) {
metrics.incrCacheRequestedBytes(current.getLength());
// We assume ranges in "ranges" are non-overlapping; thus, we will save next in advance.
DiskRangeList next = current.next;
getOverlappingRanges(baseOffset, current, subCache.cache, factory, gotAllData);
current = next;
}
} finally {
subCache.decRef();
}
if (qfCounters != null) {
DiskRangeList current = prev.next;
long bytesHit = 0, bytesMissed = 0;
while (current != null) {
// This assumes no ranges passed to cache to fetch have data beforehand.
if (current.hasData()) {
bytesHit += current.getLength();
} else {
bytesMissed += current.getLength();
}
current = current.next;
}
qfCounters.recordCacheHit(bytesHit);
qfCounters.recordCacheMiss(bytesMissed);
}
return prev.next;
}
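
  // Walks the per-file tree map for buffers overlapping one not-cached range, locking each
  // match and splicing it into the result list in place of the corresponding range part.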
private void getOverlappingRanges(long baseOffset, DiskRangeList currentNotCached,
ConcurrentSkipListMap<Long, LlapDataBuffer> cache, DiskRangeListFactory factory,
BooleanRef gotAllData) {
long absOffset = currentNotCached.getOffset() + baseOffset;
if (!doAssumeGranularBlocks) {
// This currently only happens in tests. See getFileData comment on the interface.
Long prevOffset = cache.floorKey(absOffset);
if (prevOffset != null) {
absOffset = prevOffset;
}
}
Iterator<Map.Entry<Long, LlapDataBuffer>> matches = cache.subMap(
absOffset, currentNotCached.getEnd() + baseOffset)
.entrySet().iterator();
long cacheEnd = -1;
while (matches.hasNext()) {
assert currentNotCached != null;
Map.Entry<Long, LlapDataBuffer> e = matches.next();
LlapDataBuffer buffer = e.getValue();
long requestedLength = currentNotCached.getLength();
// Lock the buffer, validate it and add to results.
if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
LlapIoImpl.LOCKING_LOGGER.trace("Locking {} during get", buffer);
}
if (!lockBuffer(buffer, true)) {
// If we cannot lock, remove this from cache and continue.
matches.remove();
if (gotAllData != null) {
gotAllData.value = false;
}
continue;
}
long cacheOffset = e.getKey();
      if (cacheEnd > cacheOffset) { // compare with old cacheEnd
        throw new AssertionError("Cache has overlapping buffers: previous buffer ends at "
            + cacheEnd + ", next buffer is [" + cacheOffset + ", "
            + (cacheOffset + buffer.declaredCachedLength) + ")");
      }
cacheEnd = cacheOffset + buffer.declaredCachedLength;
DiskRangeList currentCached = factory.createCacheChunk(buffer,
cacheOffset - baseOffset, cacheEnd - baseOffset);
currentNotCached = addCachedBufferToIter(currentNotCached, currentCached, gotAllData);
metrics.incrCacheHitBytes(Math.min(requestedLength, currentCached.getLength()));
}
if (currentNotCached != null) {
assert !currentNotCached.hasData(); // Assumes no ranges passed to cache to read have data.
if (gotAllData != null) {
gotAllData.value = false;
}
}
}
  /**
   * Adds a cached buffer to the buffer list.
   * @param currentNotCached Pointer to the list node where we are inserting.
   * @param currentCached The cached buffer found for this node, to insert.
   * @param gotAllData Set to false if some part of currentNotCached remains uncovered.
   * @return The new currentNotCached pointer, following the cached buffer insertion.
   */
private DiskRangeList addCachedBufferToIter(
DiskRangeList currentNotCached, DiskRangeList currentCached, BooleanRef gotAllData) {
if (currentNotCached.getOffset() >= currentCached.getOffset()) {
if (currentNotCached.getEnd() <= currentCached.getEnd()) { // we assume it's always "==" now
// Replace the entire current DiskRange with new cached range.
currentNotCached.replaceSelfWith(currentCached);
return null;
} else {
// Insert the new cache range before the disk range.
currentNotCached.insertPartBefore(currentCached);
return currentNotCached;
}
} else {
// There's some part of current buffer that is not cached.
if (gotAllData != null) {
gotAllData.value = false;
}
assert currentNotCached.getOffset() < currentCached.getOffset()
|| currentNotCached.prev == null
|| currentNotCached.prev.getEnd() <= currentCached.getOffset();
long endOffset = currentNotCached.getEnd();
currentNotCached.insertPartAfter(currentCached);
if (endOffset <= currentCached.getEnd()) { // we assume it's always "==" now
return null; // No more matches expected...
} else {
// Insert the new disk range after the cache range. TODO: not strictly necessary yet?
currentNotCached = new DiskRangeList(currentCached.getEnd(), endOffset);
currentCached.insertAfter(currentNotCached);
return currentNotCached;
}
}
}
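
  // incRef-based locking; returns false if the buffer has already been marked for
  // eviction and can no longer be used.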
private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) {
int rc = buffer.incRef();
if (rc > 0) {
metrics.incrCacheNumLockedBuffers();
}
if (doNotifyPolicy && rc == 1) {
// We have just locked a buffer that wasn't previously locked.
cachePolicy.notifyLock(buffer);
}
return rc > 0;
}
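
  /**
   * Puts buffers for the given ranges into the cache. On a collision the existing buffer
   * wins: the caller's entry in buffers is swapped for the cached one, and the matching
   * bit in the returned bitmask (a long[] used as a bitset over buffer indexes) is set.
   * Returns null if no buffers were replaced.
   */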
@Override
public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] buffers,
long baseOffset, Priority priority, LowLevelCacheCounters qfCounters) {
long[] result = null;
assert buffers.length == ranges.length;
FileCache subCache = getOrAddFileSubCache(fileKey);
try {
for (int i = 0; i < ranges.length; ++i) {
LlapDataBuffer buffer = (LlapDataBuffer)buffers[i];
if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
LlapIoImpl.LOCKING_LOGGER.trace("Locking {} at put time", buffer);
}
boolean canLock = lockBuffer(buffer, false);
assert canLock;
long offset = ranges[i].getOffset() + baseOffset;
assert buffer.declaredCachedLength == LlapDataBuffer.UNKNOWN_CACHED_LENGTH;
buffer.declaredCachedLength = ranges[i].getLength();
while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
LlapDataBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);
if (oldVal == null) {
// Cached successfully, add to policy.
cachePolicy.cache(buffer, priority);
if (qfCounters != null) {
qfCounters.recordAllocBytes(buffer.byteBuffer.remaining(), buffer.allocSize);
}
break;
}
if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
LlapIoImpl.CACHE_LOGGER.trace("Trying to cache when the chunk is already cached for" +
" {}@{} (base {}); old {}, new {}", fileKey, offset, baseOffset, oldVal, buffer);
}
if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
LlapIoImpl.LOCKING_LOGGER.trace("Locking {} due to cache collision", oldVal);
}
if (lockBuffer(oldVal, true)) {
// We don't do proper overlap checking because it would cost cycles and we
// think it will never happen. We do perform the most basic check here.
if (oldVal.declaredCachedLength != buffer.declaredCachedLength) {
throw new RuntimeException("Found a block with different length at the same offset: "
+ oldVal.declaredCachedLength + " vs " + buffer.declaredCachedLength + " @" + offset
+ " (base " + baseOffset + ")");
}
// We found an old, valid block for this key in the cache.
if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
LlapIoImpl.LOCKING_LOGGER.trace("Unlocking {} due to cache collision with {}",
buffer, oldVal);
}
unlockBuffer(buffer, false);
buffers[i] = oldVal;
if (result == null) {
result = new long[align64(buffers.length) >>> 6];
}
            result[i >>> 6] |= (1L << (i & 63)); // long shift; mark that we've replaced the value
break;
}
// We found some old value but couldn't incRef it; remove it.
subCache.cache.remove(offset, oldVal);
}
}
} finally {
subCache.decRef();
}
return result;
}
  /**
   * All this mess is necessary because we want to be able to remove sub-caches for fully
   * evicted files. It may actually be better to have a single non-nested map with
   * composite keys?
   */
private FileCache getOrAddFileSubCache(Object fileKey) {
FileCache newSubCache = null;
while (true) { // Overwhelmingly executes once.
FileCache subCache = cache.get(fileKey);
if (subCache != null) {
if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it.
if (newSubCache == null) {
newSubCache = new FileCache();
newSubCache.incRef();
}
// Found a stale value we cannot incRef; try to replace it with new value.
if (cache.replace(fileKey, subCache, newSubCache)) return newSubCache;
continue; // Someone else replaced/removed a stale value, try again.
}
// No value found.
if (newSubCache == null) {
newSubCache = new FileCache();
newSubCache.incRef();
}
FileCache oldSubCache = cache.putIfAbsent(fileKey, newSubCache);
if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache.
if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel.
// Someone created one in parallel and then it went stale.
if (cache.replace(fileKey, oldSubCache, newSubCache)) return newSubCache;
// Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
}
}
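
  // Rounds up to the next multiple of 64, to size the replaced-buffer bitmask in putFileData.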
private static int align64(int number) {
return ((number + 63) & ~63);
}
@Override
public void decRefBuffer(MemoryBuffer buffer) {
unlockBuffer((LlapDataBuffer)buffer, true);
}
@Override
public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
for (MemoryBuffer b : cacheBuffers) {
unlockBuffer((LlapDataBuffer)b, true);
}
}
private void unlockBuffer(LlapDataBuffer buffer, boolean handleLastDecRef) {
boolean isLastDecref = (buffer.decRef() == 0);
if (handleLastDecRef && isLastDecref) {
      // Not pretty, but this is how we detect whether the buffer was ever cached:
      // declaredCachedLength is always set at put time for cached buffers.
if (buffer.declaredCachedLength != LlapDataBuffer.UNKNOWN_CACHED_LENGTH) {
cachePolicy.notifyUnlock(buffer);
} else {
if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
LlapIoImpl.CACHE_LOGGER.trace("Deallocating {} that was not cached", buffer);
}
allocator.deallocate(buffer);
}
}
metrics.decrCacheNumLockedBuffers();
}
private static final ByteBuffer fakeBuf = ByteBuffer.wrap(new byte[1]);
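
  /** Creates a placeholder one-byte buffer that is not backed by the real allocator. */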
public static LlapDataBuffer allocateFake() {
LlapDataBuffer fake = new LlapDataBuffer();
fake.initialize(-1, fakeBuf, 0, 1);
return fake;
}
public final void notifyEvicted(LlapDataBuffer buffer) {
allocator.deallocateEvicted(buffer);
newEvictions.incrementAndGet();
}
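
  /**
   * Per-file sub-cache. The refcount tracks active users; the two negative sentinel
   * values make startEvicting/commitEvicting/abortEvicting behave like a try-write-lock,
   * so a sub-cache can be removed only when the cleaner is its sole user.
   */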
private static class FileCache {
private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
    // TODO: given the specific data and lookups, perhaps the nested thing should not be a map.
    // In fact, CSLM has slow single-threaded operation, and one file is probably often read
    // by just one (or a few) threads, so a much simpler data structure with locking might be
    // better. Let's use CSLM for now, since it's available.
private final ConcurrentSkipListMap<Long, LlapDataBuffer> cache
= new ConcurrentSkipListMap<Long, LlapDataBuffer>();
private final AtomicInteger refCount = new AtomicInteger(0);
boolean incRef() {
while (true) {
int value = refCount.get();
if (value == EVICTED_REFCOUNT) return false;
if (value == EVICTING_REFCOUNT) continue; // spin until it resolves; extremely rare
assert value >= 0;
if (refCount.compareAndSet(value, value + 1)) return true;
}
}
void decRef() {
int value = refCount.decrementAndGet();
if (value < 0) {
throw new AssertionError("Unexpected refCount " + value);
}
}
boolean startEvicting() {
while (true) {
int value = refCount.get();
if (value != 1) return false;
if (refCount.compareAndSet(value, EVICTING_REFCOUNT)) return true;
}
}
void commitEvicting() {
boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, EVICTED_REFCOUNT);
assert result;
}
void abortEvicting() {
boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, 0);
assert result;
}
}
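
  /**
   * Background sweeper. Polls for new evictions (sleeping in between), then iterates all
   * sub-caches, removing invalidated buffers and empty sub-caches; the scan is paced with
   * short sleeps so that one full round takes roughly the configured cleanup interval.
   */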
private final class CleanupThread extends Thread {
private final long approxCleanupIntervalSec;
public CleanupThread(long cleanupInterval) {
super("Llap low level cache cleanup thread");
this.approxCleanupIntervalSec = cleanupInterval;
setDaemon(true);
      setPriority(Thread.MIN_PRIORITY); // cleanup is best-effort background work
}
@Override
public void run() {
while (true) {
try {
doOneCleanupRound();
} catch (InterruptedException ex) {
LlapIoImpl.LOG.warn("Cleanup thread has been interrupted");
Thread.currentThread().interrupt();
break;
} catch (Throwable t) {
LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t);
break;
}
}
}
private void doOneCleanupRound() throws InterruptedException {
while (true) {
int evictionsSinceLast = newEvictions.getAndSet(0);
if (evictionsSinceLast > 0) break;
synchronized (newEvictions) {
newEvictions.wait(10000);
}
}
// Duration is an estimate; if the size of the map changes, it can be very different.
long endTime = System.nanoTime() + approxCleanupIntervalSec * 1000000000L;
int leftToCheck = 0; // approximate
for (FileCache fc : cache.values()) {
leftToCheck += fc.cache.size();
}
// Iterate thru all the filecaches. This is best-effort.
// If these super-long-lived iterators affect the map in some bad way,
// we'd need to sleep once per round instead.
Iterator<Map.Entry<Object, FileCache>> iter = cache.entrySet().iterator();
boolean isPastEndTime = false;
while (iter.hasNext()) {
FileCache fc = iter.next().getValue();
if (!fc.incRef()) {
throw new AssertionError("Something other than cleanup is removing elements from map");
}
// Iterate thru the file cache. This is best-effort.
Iterator<Map.Entry<Long, LlapDataBuffer>> subIter = fc.cache.entrySet().iterator();
boolean isEmpty = true;
while (subIter.hasNext()) {
long time = -1;
isPastEndTime = isPastEndTime || ((time = System.nanoTime()) >= endTime);
Thread.sleep(((leftToCheck <= 0) || isPastEndTime)
? 1 : (endTime - time) / (1000000L * leftToCheck));
if (subIter.next().getValue().isInvalid()) {
subIter.remove();
} else {
isEmpty = false;
}
--leftToCheck;
}
if (!isEmpty) {
fc.decRef();
continue;
}
      // FileCache might be empty; see if we can remove it, using startEvicting as a "tryWriteLock".
if (!fc.startEvicting()) continue;
if (fc.cache.isEmpty()) {
fc.commitEvicting();
iter.remove();
} else {
fc.abortEvicting();
}
}
}
}
@Override
public boolean incRefBuffer(MemoryBuffer buffer) {
// notifyReused implies that buffer is already locked; it's also called once for new
// buffers that are not cached yet. Don't notify cache policy.
return lockBuffer(((LlapDataBuffer)buffer), false);
}
@Override
public Allocator getAllocator() {
return allocator;
}
@Override
public String debugDumpForOom() {
StringBuilder sb = new StringBuilder("File cache state ");
for (Map.Entry<Object, FileCache> e : cache.entrySet()) {
if (!e.getValue().incRef()) continue;
try {
sb.append("\n file " + e.getKey());
for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().cache.entrySet()) {
if (e2.getValue().incRef() < 0) continue;
try {
sb.append("\n [").append(e2.getKey()).append(", ")
.append(e2.getKey() + e2.getValue().declaredCachedLength)
.append(") => ").append(e2.getValue().toString())
.append(" alloc ").append(e2.getValue().byteBuffer.position());
} finally {
e2.getValue().decRef();
}
}
} finally {
e.getValue().decRef();
}
}
return sb.toString();
}
}