blob: 3e7401bc512713769241b9238771d2bf3df06a5e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl.cache;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.util.RangeCache;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Cache data payload for entries of all ledgers.
*/
public class RangeEntryCacheImpl implements EntryCache {
/**
* Overhead per-entry to take into account the envelope.
*/
private static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64;
// Cache manager: asked whether the cache has space and notified of every addition and removal.
private final RangeEntryCacheManagerImpl manager;
// Managed ledger this cache serves; supplies the name, configuration, executor and stats bean.
private final ManagedLedgerImpl ml;
// Interceptor obtained from the managed ledger; handed over when wrapping entries read from BookKeeper.
private ManagedLedgerInterceptor interceptor;
// The cached entries, keyed by their position.
private final RangeCache<PositionImpl, EntryImpl> entries;
// When true, insert() stores a copy of the payload instead of retaining the caller's buffer.
private final boolean copyEntries;
// Estimated size in bytes of one entry, used to size permit requests on the in-flight reads limiter.
// Starts at 10 KiB and is overwritten with the length of the first entry of each completed limited read.
private volatile long estimatedEntrySize = 10 * 1024;
// Longest time a read may keep waiting for permits from the in-flight reads limiter.
private final long readEntryTimeoutMillis;
// Bytes per megabyte; declared as a double so that divisions in log statements keep the fractional part.
private static final double MB = 1024 * 1024;
/**
 * Creates the entry cache of a single managed ledger.
 *
 * @param manager     the cache manager that is notified of additions/removals and provides the in-flight
 *                    reads limiter
 * @param ml          the managed ledger whose entries are cached
 * @param copyEntries whether inserted payloads are copied into buffers owned by the cache instead of
 *                    retaining the buffer of the inserted entry
 */
public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) {
    this.manager = manager;
    this.ml = ml;
    this.interceptor = ml.getManagedLedgerInterceptor();
    // The configuration exposes the timeout in seconds, whereas this field is compared against
    // System.currentTimeMillis() differences while waiting for read permits: convert the unit.
    this.readEntryTimeoutMillis =
            TimeUnit.SECONDS.toMillis(getManagedLedgerConfig().getReadEntryTimeoutSeconds());
    this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp);
    this.copyEntries = copyEntries;
    if (log.isDebugEnabled()) {
        log.debug("[{}] Initialized managed-ledger entry cache", ml.getName());
    }
}
/**
 * Gives access to the configuration of the managed ledger backing this cache.
 *
 * @return the configuration object returned by the managed ledger
 */
@VisibleForTesting
ManagedLedgerConfig getManagedLedgerConfig() {
    return this.ml.getConfig();
}
/**
 * Reports the name of this cache, which is the name of the managed ledger it serves.
 *
 * @return the managed ledger name
 */
@Override
public String getName() {
    return this.ml.getName();
}
/**
 * Looks up the limiter of in-flight reads, which is held by the cache manager.
 *
 * @return the limiter obtained from the cache manager
 */
@VisibleForTesting
InflightReadsLimiter getPendingReadsLimiter() {
    return this.manager.getInflightReadsLimiter();
}
/**
* Pooled allocator from which copyEntry obtains the direct buffers holding cached copies of entry payloads.
* It prefers direct memory and is created with no heap arenas; the number of direct arenas, page size,
* max order and cache sizes are the PooledByteBufAllocator defaults.
*/
public static final PooledByteBufAllocator ALLOCATOR = new PooledByteBufAllocator(true, // preferDirect
0, // nHeapArenas,
PooledByteBufAllocator.defaultNumDirectArena(), // nDirectArena
PooledByteBufAllocator.defaultPageSize(), // pageSize
PooledByteBufAllocator.defaultMaxOrder(), // maxOrder
PooledByteBufAllocator.defaultSmallCacheSize(), // smallCacheSize
PooledByteBufAllocator.defaultNormalCacheSize(), // normalCacheSize,
true // Use cache for all threads
);
/**
 * Tries to add an entry to the cache.
 *
 * <p>Nothing is cached when the manager reports that there is no space, when the payload copy cannot be
 * allocated, or when the underlying range cache rejects the position.
 *
 * @param entry the entry to cache; the caller keeps its own reference to it
 * @return {@code true} if the entry was stored in the cache, {@code false} otherwise
 */
@Override
public boolean insert(EntryImpl entry) {
    if (!manager.hasSpaceInCache()) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Skipping cache while doing eviction: {} - size: {}", ml.getName(), entry.getPosition(),
                    entry.getLength());
        }
        return false;
    }

    if (log.isDebugEnabled()) {
        log.debug("[{}] Adding entry to cache: {} - size: {}", ml.getName(), entry.getPosition(),
                entry.getLength());
    }

    // Either a private copy, or the original buffer retained once so that both paths hold one extra
    // reference at this point.
    final ByteBuf payload = copyEntries ? copyEntry(entry) : entry.getDataBuffer().retain();
    if (payload == null) {
        // The copy could not be allocated
        return false;
    }

    final PositionImpl position = entry.getPosition();
    final EntryImpl cacheEntry = EntryImpl.create(position, payload);
    // Drop the reference taken above now that the cache entry has been created from the buffer
    payload.release();

    if (!entries.put(position, cacheEntry)) {
        // The range cache did not accept the entry, so it has to be discarded
        cacheEntry.release();
        return false;
    }
    manager.entryAdded(entry.getLength());
    return true;
}
/**
 * Copies the payload of an entry into a buffer allocated from {@link #ALLOCATOR}.
 *
 * <p>Copying matters because the incoming entry holds on to a Netty buffer that is usually allocated in
 * 64Kb chunks: retaining it as-is could pin the whole chunk even for a tiny entry.
 *
 * @param entry the entry whose payload is copied; its reader index is left where it was
 * @return the buffer holding the copy, or {@code null} if the buffer could not be allocated
 */
private ByteBuf copyEntry(EntryImpl entry) {
    final int length = entry.getLength();

    final ByteBuf copy;
    try {
        copy = ALLOCATOR.directBuffer(length, length);
    } catch (Throwable t) {
        log.warn("[{}] Failed to allocate buffer for entry cache: {}", ml.getName(), t.getMessage());
        return null;
    }

    if (length <= 0) {
        // Nothing to copy
        return copy;
    }

    final ByteBuf source = entry.getDataBuffer();
    // writeBytes(ByteBuf) advances the reader index of the source, so remember it and put it back
    final int savedReaderIndex = source.readerIndex();
    copy.writeBytes(source);
    source.readerIndex(savedReaderIndex);
    return copy;
}
/**
 * Removes from the cache the entries in the range that starts at position (-1, 0) and ends at the given
 * position, then reports the removed amount to the cache manager.
 *
 * @param lastPosition the upper bound of the range to invalidate
 */
@Override
public void invalidateEntries(final PositionImpl lastPosition) {
    final PositionImpl firstPosition = PositionImpl.get(-1, 0);

    final boolean invalidRange = firstPosition.compareTo(lastPosition) > 0;
    if (invalidRange) {
        if (log.isDebugEnabled()) {
            log.debug("Attempted to invalidate entries in an invalid range : {} ~ {}",
                    firstPosition, lastPosition);
        }
        return;
    }

    // NOTE(review): the trailing 'false' presumably makes the upper bound exclusive - confirm in RangeCache
    final Pair<Integer, Long> outcome = entries.removeRange(firstPosition, lastPosition, false);
    final int removedCount = outcome.getLeft();
    final long removedBytes = outcome.getRight();

    if (log.isTraceEnabled()) {
        log.trace("[{}] Invalidated entries up to {} - Entries removed: {} - Size removed: {}", ml.getName(),
                lastPosition, removedCount, removedBytes);
    }

    manager.entriesRemoved(removedBytes, removedCount);
}
/**
 * Removes from the cache the entries of one ledger, using the range from entry 0 of that ledger to entry 0
 * of the ledger with the next id, then reports the removed amount to the cache manager.
 *
 * @param ledgerId the id of the ledger whose entries are dropped
 */
@Override
public void invalidateAllEntries(long ledgerId) {
    final PositionImpl ledgerStart = PositionImpl.get(ledgerId, 0);
    final PositionImpl nextLedgerStart = PositionImpl.get(ledgerId + 1, 0);

    // NOTE(review): the trailing 'false' presumably makes the upper bound exclusive - confirm in RangeCache
    final Pair<Integer, Long> outcome = entries.removeRange(ledgerStart, nextLedgerStart, false);
    final int removedCount = outcome.getLeft();
    final long removedBytes = outcome.getRight();

    if (log.isDebugEnabled()) {
        log.debug("[{}] Invalidated all entries on ledger {} - Entries removed: {} - Size removed: {}",
                ml.getName(), ledgerId, removedCount, removedBytes);
    }

    manager.entriesRemoved(removedBytes, removedCount);
}
/**
 * Reads a single entry, from the cache when present and from the ledger otherwise.
 *
 * <p>Anything thrown synchronously while starting the read is reported through the callback after the
 * cached entries of the ledger have been invalidated.
 *
 * @param lh       the handle of the ledger to read from
 * @param position the position of the entry to read
 * @param callback the callback notified with the entry or with the failure
 * @param ctx      opaque context passed back to the callback
 */
@Override
public void asyncReadEntry(ReadHandle lh, PositionImpl position, final ReadEntryCallback callback,
        final Object ctx) {
    try {
        asyncReadEntry0(lh, position, callback, ctx);
    } catch (Throwable failure) {
        final long ledgerId = lh.getId();
        log.warn("failed to read entries for {}-{}", ledgerId, position, failure);
        // A cached entry may be corrupted (e.g. its data was already deallocated because of a race
        // condition): drop everything cached for this ledger so that the next read goes to the bookie.
        invalidateAllEntries(ledgerId);
        callback.readEntryFailed(createManagedLedgerException(failure), ctx);
    }
}
/**
 * Serves a single-entry read: answers from the cache on a hit, otherwise issues an asynchronous read of
 * that one entry on the ledger handle and completes the callback on the managed ledger's executor thread.
 *
 * @param lh       the handle of the ledger to read from
 * @param position the position of the entry to read
 * @param callback the callback notified with the entry or with the failure
 * @param ctx      opaque context passed back to the callback
 */
private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEntryCallback callback,
        final Object ctx) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] Reading entry ledger {}: {}", ml.getName(), lh.getId(), position.getEntryId());
    }

    final EntryImpl cached = entries.get(position);
    if (cached != null) {
        // Cache hit: hand out a new EntryImpl created from the cached one and release the instance that
        // the lookup returned.
        final EntryImpl result = EntryImpl.create(cached);
        cached.release();
        manager.mlFactoryMBean.recordCacheHit(result.getLength());
        callback.readEntryComplete(result, ctx);
        return;
    }

    // Cache miss: read exactly this entry from the ledger
    lh.readAsync(position.getEntryId(), position.getEntryId()).thenAcceptAsync(
            readResult -> {
                try {
                    final Iterator<LedgerEntry> it = readResult.iterator();
                    if (!it.hasNext()) {
                        // got an empty sequence
                        callback.readEntryFailed(new ManagedLedgerException("Could not read given position"),
                                ctx);
                        return;
                    }
                    final LedgerEntry ledgerEntry = it.next();
                    final EntryImpl returnEntry = RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);

                    manager.mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength());
                    ml.getMbean().addReadEntriesSample(1, returnEntry.getLength());
                    callback.readEntryComplete(returnEntry, ctx);
                } finally {
                    // Always close the read result, whatever happened above
                    readResult.close();
                }
            }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(failure -> {
                ml.invalidateLedgerHandle(lh);
                callback.readEntryFailed(createManagedLedgerException(failure), ctx);
                return null;
            });
}
/**
 * Reads a range of entries, from the cache when all of them are present and from the ledger otherwise.
 *
 * <p>Anything thrown synchronously while starting the read is reported through the callback after the
 * cached entries of the ledger have been invalidated.
 *
 * @param lh              the handle of the ledger to read from
 * @param firstEntry      id of the first entry to read
 * @param lastEntry       id of the last entry to read
 * @param isSlowestReader flag forwarded unchanged to the internal read path
 * @param callback        the callback notified with the entries or with the failure
 * @param ctx             opaque context passed back to the callback
 */
@Override
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
        final ReadEntriesCallback callback, Object ctx) {
    try {
        asyncReadEntry0(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx);
    } catch (Throwable failure) {
        final long ledgerId = lh.getId();
        log.warn("failed to read entries for {}--{}-{}", ledgerId, firstEntry, lastEntry, failure);
        // A cached entry may be corrupted (e.g. its data was already deallocated because of a race
        // condition): drop everything cached for this ledger so that the next read goes to the bookie.
        invalidateAllEntries(ledgerId);
        callback.readEntriesFailed(createManagedLedgerException(failure), ctx);
    }
}
/**
 * Starts a range read as a first attempt, i.e. without a handle from an earlier attempt to acquire permits
 * on the in-flight reads limiter.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
        final ReadEntriesCallback callback, Object ctx) {
    final InflightReadsLimiter.Handle noPreviousHandle = null;
    asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx, noPreviousHandle);
}
/**
 * Reads a range of entries once the in-flight reads limiter allows it.
 *
 * <p>The range is served from the cache only when every requested entry is cached; otherwise the whole
 * range is read from the ledger handle and the callback is completed on the managed ledger's executor
 * thread.
 *
 * @param lh               the handle of the ledger to read from
 * @param firstEntry       id of the first entry to read
 * @param lastEntry        id of the last entry to read
 * @param isSlowestReader  flag forwarded to the permit handling so that a re-scheduled attempt keeps it
 * @param originalCallback the caller's callback
 * @param ctx              opaque context passed back to the callback
 * @param handle           the limiter handle of a previous attempt, or {@code null} on the first attempt
 */
void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
        final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) {
    final AsyncCallbacks.ReadEntriesCallback callback =
            handlePendingReadsLimits(lh, firstEntry, lastEntry, isSlowestReader,
                    originalCallback, ctx, handle);
    if (callback == null) {
        // No permits: the read was either re-scheduled or already failed with a time-out
        return;
    }

    final long ledgerId = lh.getId();
    final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
    final PositionImpl firstPosition = PositionImpl.get(lh.getId(), firstEntry);
    final PositionImpl lastPosition = PositionImpl.get(lh.getId(), lastEntry);

    if (log.isDebugEnabled()) {
        log.debug("[{}] Reading entries range ledger {}: {} to {}", ml.getName(), ledgerId, firstEntry, lastEntry);
    }

    Collection<EntryImpl> cachedEntries = entries.getRange(firstPosition, lastPosition);

    if (cachedEntries.size() == entriesToRead) {
        long totalCachedSize = 0;
        final List<Entry> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);

        // All entries found in cache
        for (EntryImpl entry : cachedEntries) {
            entriesToReturn.add(EntryImpl.create(entry));
            totalCachedSize += entry.getLength();
            entry.release();
        }

        manager.mlFactoryMBean.recordCacheHits(entriesToReturn.size(), totalCachedSize);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Ledger {} -- Found in cache entries: {}-{}", ml.getName(), ledgerId, firstEntry,
                    lastEntry);
        }

        callback.readEntriesComplete(entriesToReturn, ctx);
    } else {
        // Partial hit: the cached subset is not used, so give back the instances returned by the lookup
        if (!cachedEntries.isEmpty()) {
            cachedEntries.forEach(entry -> entry.release());
        }

        // Read all the entries from bookkeeper
        lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
                ledgerEntries -> {
                    checkNotNull(ml.getName());
                    checkNotNull(ml.getExecutor());

                    try {
                        // We got the entries, we need to transform them to a List<> type
                        long totalSize = 0;
                        final List<Entry> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
                        for (LedgerEntry e : ledgerEntries) {
                            EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
                            entriesToReturn.add(entry);
                            totalSize += entry.getLength();
                        }

                        manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
                        ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);

                        callback.readEntriesComplete(entriesToReturn, ctx);
                    } finally {
                        ledgerEntries.close();
                    }
                }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
                    // This handler is attached to the dependent stage, so a failure of the read itself
                    // arrives wrapped in a CompletionException: look at the cause to classify it.
                    Throwable cause = exception;
                    if (cause instanceof CompletionException && cause.getCause() != null) {
                        cause = cause.getCause();
                    }
                    final boolean tooManyRequests = cause instanceof BKException
                            && ((BKException) cause).getCode() == BKException.Code.TooManyRequestsException;
                    if (!tooManyRequests) {
                        // Any failure other than throttling invalidates the ledger handle
                        ml.invalidateLedgerHandle(lh);
                    }
                    callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
                    return null;
                });
    }
}
/**
 * Acquires permits on the in-flight reads limiter for a range read and decides how the read continues.
 *
 * <ul>
 * <li>Limiter disabled: the original callback is returned untouched.</li>
 * <li>Permits acquired: a wrapping callback is returned; it releases the permits when the read fails, when
 * it returns no entries, or once every returned entry has been deallocated.</li>
 * <li>Permits not acquired: {@code null} is returned. If the handle is older than the read timeout the
 * original callback is failed, otherwise the read is submitted again on the managed ledger's executor
 * with the new handle.</li>
 * </ul>
 *
 * @param lh               the handle of the ledger to read from
 * @param firstEntry       id of the first entry to read
 * @param lastEntry        id of the last entry to read
 * @param shouldCacheEntry flag passed along unchanged when the read is re-submitted
 * @param originalCallback the caller's callback
 * @param ctx              opaque context passed back to the callback
 * @param handle           the limiter handle of a previous attempt, or {@code null} on the first attempt
 * @return the callback the read must use, or {@code null} if the read must not proceed now
 */
private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle lh,
        long firstEntry, long lastEntry,
        boolean shouldCacheEntry,
        AsyncCallbacks.ReadEntriesCallback originalCallback,
        Object ctx,
        InflightReadsLimiter.Handle handle) {
    final InflightReadsLimiter limiter = getPendingReadsLimiter();
    if (limiter.isDisabled()) {
        return originalCallback;
    }

    final long entryCount = 1 + lastEntry - firstEntry;
    final long estimatedReadSize = entryCount * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
    final InflightReadsLimiter.Handle newHandle = limiter.acquire(estimatedReadSize, handle);

    if (newHandle.success) {
        return new AsyncCallbacks.ReadEntriesCallback() {
            @Override
            public void readEntriesComplete(List<Entry> readEntries, Object callbackCtx) {
                if (readEntries.isEmpty()) {
                    limiter.release(newHandle);
                } else {
                    // Refresh the estimate with the size of the first entry just read
                    estimatedEntrySize = readEntries.get(0).getLength();

                    // Give the permits back only when the last of the returned entries is deallocated
                    final AtomicInteger stillAlive = new AtomicInteger(readEntries.size());
                    for (Entry readEntry : readEntries) {
                        ((EntryImpl) readEntry).onDeallocate(() -> {
                            if (stillAlive.decrementAndGet() <= 0) {
                                limiter.release(newHandle);
                            }
                        });
                    }
                }
                originalCallback.readEntriesComplete(readEntries, callbackCtx);
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object callbackCtx) {
                limiter.release(newHandle);
                originalCallback.readEntriesFailed(exception, callbackCtx);
            }
        };
    }

    // Permits were not granted
    final long waited = System.currentTimeMillis() - newHandle.creationTime;
    if (waited > readEntryTimeoutMillis) {
        final String message = "Time-out elapsed while acquiring enough permits "
                + "on the memory limiter to read from ledger "
                + lh.getId()
                + ", " + getName()
                + ", estimated read size " + estimatedReadSize + " bytes"
                + " for " + entryCount
                + " entries (check managedLedgerMaxReadsInFlightSizeInMB)";
        log.error(message);
        limiter.release(newHandle);
        originalCallback.readEntriesFailed(
                new ManagedLedgerException.TooManyRequestsException(message), ctx);
        return null;
    }

    // Try again later, carrying the handle so that the waiting time keeps being measured from it
    ml.getExecutor().submitOrdered(lh.getId(), () -> {
        asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
                originalCallback, ctx, newHandle);
        return null;
    });
    return null;
}
/**
 * Empties the cache and reports the removed size and number of entries to the cache manager.
 */
@Override
public void clear() {
    final Pair<Integer, Long> cleared = entries.clear();
    final long clearedBytes = cleared.getRight();
    final int clearedCount = cleared.getLeft();
    manager.entriesRemoved(clearedBytes, clearedCount);
}
/**
 * Reports the size of this cache as tracked by the underlying range cache.
 *
 * @return the size returned by the range cache
 */
@Override
public long getSize() {
    return this.entries.getSize();
}
/**
 * Orders entry caches by their size.
 *
 * @param other the cache to compare with
 * @return the result of comparing this cache's size with the other cache's size
 */
@Override
public int compareTo(EntryCache other) {
    final long ownSize = getSize();
    final long otherSize = other.getSize();
    return Longs.compare(ownSize, otherSize);
}
/**
 * Evicts the least accessed entries until about the requested size has been freed, and reports the evicted
 * amount to the cache manager.
 *
 * @param sizeToFree the size to free; must be greater than zero
 * @return the pair (number of entries evicted, size evicted) returned by the range cache
 */
@Override
public Pair<Integer, Long> evictEntries(long sizeToFree) {
    checkArgument(sizeToFree > 0);

    final Pair<Integer, Long> evicted = entries.evictLeastAccessedEntries(sizeToFree);
    final int evictedCount = evicted.getLeft();
    final long evictedBytes = evicted.getRight();

    if (log.isDebugEnabled()) {
        log.debug(
                "[{}] Doing cache eviction of at least {} Mb -- Deleted {} entries - Total size deleted: {} Mb "
                        + " -- Current Size: {} Mb",
                ml.getName(), sizeToFree / MB, evictedCount, evictedBytes / MB, entries.getSize() / MB);
    }

    manager.entriesRemoved(evictedBytes, evictedCount);
    return evicted;
}
/**
 * Evicts entries based on the given timestamp, as decided by the underlying range cache, and reports the
 * evicted amount to the cache manager.
 *
 * @param timestamp the timestamp handed to the range cache as the eviction threshold
 */
@Override
public void invalidateEntriesBeforeTimestamp(long timestamp) {
    final Pair<Integer, Long> evicted = entries.evictLEntriesBeforeTimestamp(timestamp);
    final long evictedBytes = evicted.getRight();
    final int evictedCount = evicted.getLeft();
    manager.entriesRemoved(evictedBytes, evictedCount);
}
// SLF4J logger shared by all instances of this class.
private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheImpl.class);
}