blob: b2dc7b5dff6c5336c0e9c8f105da6396df65698a [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.mledger.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.*;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.util.CallbackMutex;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.mledger.util.Pair;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.UnboundArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.RateLimiter;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import com.yahoo.pulsar.common.util.collections.ConcurrentLongHashMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// Number of bytes in one megabyte.
private final static long MegaByte = 1024 * 1024;
// Timeout, in seconds, used by the blocking wrappers (openCursor, deleteCursor, close) while they wait for the
// corresponding asynchronous operation to finish.
protected final static int AsyncOperationTimeoutSeconds = 30;
// Backlog threshold, in entries, used by checkBackloggedCursors() to activate or consider deactivating a cursor.
private final static long maxActiveCursorBacklogEntries = 100;
// Maximum time since publish, in milliseconds, of the message at a backlogged cursor's read position before
// checkBackloggedCursors() marks that cursor inactive.
private static long maxMessageCacheRetentionTimeMillis = 10 * 1000;
private final BookKeeper bookKeeper;
private final String name;
private final ManagedLedgerConfig config;
private final MetaStore store;
// Futures of ledger handles keyed by a long -- presumably the ledger id; usage is outside this section (TODO confirm).
private final ConcurrentLongHashMap<CompletableFuture<LedgerHandle>> ledgerCache = new ConcurrentLongHashMap<>();
// Ledgers belonging to this managed ledger, keyed (and therefore sorted) by ledger id.
private final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
// Stat returned by the latest metadata-store read/update; handed back to the store on the next ledger-list update.
private volatile Stat ledgersStat;
// Every cursor that was recovered or opened on this managed ledger.
private final ManagedCursorContainer cursors = new ManagedCursorContainer();
// Container exposed through getActiveCursors(); its membership is maintained outside this section.
private final ManagedCursorContainer activeCursors = new ManagedCursorContainer();
// Ever increasing counter of entries added
static final AtomicLongFieldUpdater<ManagedLedgerImpl> ENTRIES_ADDED_COUNTER_UPDATER =
AtomicLongFieldUpdater.newUpdater(ManagedLedgerImpl.class, "entriesAddedCounter");
private volatile long entriesAddedCounter = 0;
// Number of entries accounted for across the ledgers of this managed ledger
static final AtomicLongFieldUpdater<ManagedLedgerImpl> NUMBER_OF_ENTRIES_UPDATER =
AtomicLongFieldUpdater.newUpdater(ManagedLedgerImpl.class, "numberOfEntries");
private volatile long numberOfEntries = 0;
// Size accounted for across the ledgers of this managed ledger (sum of LedgerInfo sizes at initialization)
static final AtomicLongFieldUpdater<ManagedLedgerImpl> TOTAL_SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(ManagedLedgerImpl.class, "totalSize");
private volatile long totalSize = 0;
// Created in the constructor with RateLimiter.create(1); the call sites are outside this section.
private RateLimiter updateCursorRateLimit;
// Cursors that are waiting to be notified when new entries are persisted
final ConcurrentLinkedQueue<ManagedCursorImpl> waitingCursors;
// This map is used for concurrent open cursor requests, where the 2nd request will attach a listener to the
// uninitialized cursor future from the 1st request
final Map<String, CompletableFuture<ManagedCursor>> uninitializedCursors;
final EntryCache entryCache;
/**
* This lock is held while the ledgers list is updated asynchronously on the metadata store. Since we use the store
* version, we cannot have multiple concurrent updates.
*/
private final CallbackMutex ledgersListMutex = new CallbackMutex();
// Second callback mutex; judging by its name it guards ledger trimming, whose code is outside this section.
private final CallbackMutex trimmerMutex = new CallbackMutex();
// Ledger that currently receives writes, plus the entries/bytes counted against it so far.
private volatile LedgerHandle currentLedger;
private long currentLedgerEntries = 0;
private long currentLedgerSize = 0;
// Wall-clock time (System.currentTimeMillis) at which the current ledger became writable.
private long lastLedgerCreatedTimestamp = 0;
// Wall-clock time (System.currentTimeMillis) of the last failed attempt to create or register a ledger.
private long lastLedgerCreationFailureTimestamp = 0;
// System.nanoTime() taken when the latest ledger creation was started; used for the ledger-switch latency sample.
private long lastLedgerCreationInitiationTimestamp = 0;
// Source of the random jitter added to the rollover time in the constructor.
private static final Random random = new Random(System.currentTimeMillis());
// Configured maximum rollover time plus a random increase of up to 5%.
private long maximumRolloverTimeMs;
// Time period in which new write requests will not be accepted, after we fail in creating a new ledger.
final static long WaitTimeAfterLedgerCreationFailureMs = 10000;
// Set to (ledgerId, -1) when the first ledger is created during initialization; later updates are outside this
// section.
volatile PositionImpl lastConfirmedEntry;
// Lifecycle states of the managed ledger; the current value is read and written through STATE_UPDATER.
enum State {
None, // Uninitialized (value assigned by the constructor)
LedgerOpened, // A ledger is ready to write into
ClosingLedger, // Closing current ledger (the current ledger is full and its last entry is in flight)
ClosedLedger, // Current ledger has been closed and there's no pending
// operation
CreatingLedger, // Creating a new ledger; add requests are queued meanwhile
Closed, // ManagedLedger has been closed
Fenced, // A managed ledger is fenced when there is some concurrent
// access from a different session/machine. In this state the
// managed ledger will throw exception for all operations, since
// the new instance will take over
}
// Defines whether the starting position of a position-based seek or search is itself part of the range
// (startIncluded) or not (startExcluded).
enum PositionBound {
startIncluded, startExcluded
}
// Atomic accessor for the "state" field below.
private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state");
// Current lifecycle state; null only until the constructor stores State.None.
private volatile State state = null;
// Used to schedule deferred work, e.g. retrying the ledger-list update when the mutex is busy.
private final ScheduledExecutorService scheduledExecutor;
// Ordered executor; BookKeeper callbacks are re-submitted on it keyed by the managed ledger name.
private final OrderedSafeExecutor executor;
final ManagedLedgerFactoryImpl factory;
protected final ManagedLedgerMBeanImpl mbean;
/**
* Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is
* created asynchronously and hence there is no ready ledger to write into.
*/
final Queue<OpAddEntry> pendingAddEntries = new UnboundArrayBlockingQueue<>();
// //////////////////////////////////////////////////////////////////////
public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
ManagedLedgerConfig config, ScheduledExecutorService scheduledExecutor, OrderedSafeExecutor orderedExecutor,
final String name) {
this.factory = factory;
this.bookKeeper = bookKeeper;
this.config = config;
this.store = store;
this.name = name;
this.scheduledExecutor = scheduledExecutor;
this.executor = orderedExecutor;
TOTAL_SIZE_UPDATER.set(this, 0);
NUMBER_OF_ENTRIES_UPDATER.set(this, 0);
ENTRIES_ADDED_COUNTER_UPDATER.set(this, 0);
STATE_UPDATER.set(this, State.None);
this.ledgersStat = null;
this.mbean = new ManagedLedgerMBeanImpl(this);
this.entryCache = factory.getEntryCacheManager().getEntryCache(this);
this.waitingCursors = Queues.newConcurrentLinkedQueue();
this.uninitializedCursors = Maps.newHashMap();
this.updateCursorRateLimit = RateLimiter.create(1);
// Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0));
}
/**
 * Starts the asynchronous initialization of this managed ledger.
 *
 * <p>
 * The ledger list is fetched from the metadata store. When it is not empty, the last ledger is opened through
 * BookKeeper so that its entry count and size (which may have been stored as zero) can be refreshed; a last ledger
 * that no longer exists is dropped from the list. Initialization then continues in {@code initializeBookKeeper}.
 *
 * @param callback
 *            notified with {@code initializeFailed} when the metadata read or the ledger open fails; success is
 *            reported by the later initialization steps
 * @param ctx
 *            opaque context, not used by this method
 */
synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) {
    log.info("Opening managed ledger {}", name);
    // Fetch the list of existing ledgers in the managed ledger
    store.getManagedLedgerInfo(name, new MetaStoreCallback<ManagedLedgerInfo>() {
        @Override
        public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
            ledgersStat = stat;
            for (LedgerInfo ls : mlInfo.getLedgerInfoList()) {
                ledgers.put(ls.getLedgerId(), ls);
            }
            // Last ledger stat may be zeroed, we must update it
            if (!ledgers.isEmpty()) {
                final long id = ledgers.lastKey();
                OpenCallback opencb = (rc, lh, ctx1) -> {
                    // Continue on the ordered executor rather than on the BookKeeper callback thread
                    executor.submitOrdered(name, safeRun(() -> {
                        mbean.endDataLedgerOpenOp();
                        if (log.isDebugEnabled()) {
                            // The format needs one placeholder per argument, otherwise the result message is lost
                            log.debug("[{}] Opened ledger {}: {}", name, id, BKException.getMessage(rc));
                        }
                        if (rc == BKException.Code.OK) {
                            // Replace the stored info with the values read from the opened ledger
                            LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(id)
                                    .setEntries(lh.getLastAddConfirmed() + 1).setSize(lh.getLength())
                                    .setTimestamp(System.currentTimeMillis()).build();
                            ledgers.put(id, info);
                            initializeBookKeeper(callback);
                        } else if (rc == BKException.Code.NoSuchLedgerExistsException) {
                            log.warn("[{}] Ledger not found: {}", name, ledgers.lastKey());
                            ledgers.remove(ledgers.lastKey());
                            initializeBookKeeper(callback);
                        } else {
                            log.error("[{}] Failed to open ledger {}: {}", name, id, BKException.getMessage(rc));
                            callback.initializeFailed(new ManagedLedgerException(BKException.getMessage(rc)));
                        }
                    }));
                };
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Opening legder {}", name, id);
                }
                mbean.startDataLedgerOpenOp();
                bookKeeper.asyncOpenLedger(id, config.getDigestType(), config.getPassword(), opencb, null);
            } else {
                initializeBookKeeper(callback);
            }
        }

        @Override
        public void operationFailed(MetaStoreException e) {
            callback.initializeFailed(new ManagedLedgerException(e));
        }
    });
}
/**
* Second initialization step: accounts for the existing ledgers, drops the empty ones, creates the first ledger to
* write into and persists the resulting ledger list. On success the cursors are initialized next; every failure is
* reported through {@code callback.initializeFailed}.
*
* @param callback
*            initialization callback, forwarded to the next step or failed on error
*/
private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] initializing bookkeeper; ledgers {}", name, ledgers);
}
// Calculate total entries and size
Iterator<LedgerInfo> iterator = ledgers.values().iterator();
while (iterator.hasNext()) {
LedgerInfo li = iterator.next();
if (li.getEntries() > 0) {
NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, li.getEntries());
TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize());
} else {
// Ledger without entries: forget it and delete it from BookKeeper. The delete result is only logged at
// debug level.
iterator.remove();
bookKeeper.asyncDeleteLedger(li.getLedgerId(), (rc, ctx) -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Deleted empty ledger ledgerId={} rc={}", name, li.getLedgerId(), rc);
}
}, null);
}
}
// Invoked once the updated ledger list has been written to the metadata store
final MetaStoreCallback<Void> storeLedgersCb = new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void v, Stat stat) {
ledgersStat = stat;
initializeCursors(callback);
}
@Override
public void operationFailed(MetaStoreException e) {
callback.initializeFailed(new ManagedLedgerException(e));
}
};
// Create a new ledger to start writing
this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
mbean.startDataLedgerCreateOp();
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> {
// Process the result on the ordered executor keyed by the managed ledger name
executor.submitOrdered(name, safeRun(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
callback.initializeFailed(new ManagedLedgerException(BKException.getMessage(rc)));
return;
}
log.info("[{}] Created ledger {}", name, lh.getId());
STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = System.currentTimeMillis();
currentLedger = lh;
// Entry id -1: nothing has been written to the new ledger yet
lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
// The new ledger is registered with timestamp 0 and no entries/size set
LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
ledgers.put(lh.getId(), info);
// Save it back to ensure all nodes exist
ManagedLedgerInfo mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values())
.build();
store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, storeLedgersCb);
}));
}, null);
}
/**
 * Last initialization step: reads the cursor names from the metadata store and recovers every one of them.
 * {@code callback.initializeComplete()} is invoked right away when there is no cursor, otherwise when the last
 * recovery succeeds. A failure to list the cursors or to recover one of them is reported through
 * {@code callback.initializeFailed}.
 *
 * @param callback
 *            initialization callback to complete or fail
 */
private void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] initializing cursors", name);
    }

    store.getCursors(name, new MetaStoreCallback<List<String>>() {
        @Override
        public void operationFailed(MetaStoreException e) {
            log.warn("[{}] Failed to get the cursors list", name, e);
            callback.initializeFailed(new ManagedLedgerException(e));
        }

        @Override
        public void operationComplete(List<String> consumers, Stat s) {
            // Number of cursors whose recovery has not finished yet
            final AtomicInteger cursorCount = new AtomicInteger(consumers.size());
            if (log.isDebugEnabled()) {
                log.debug("[{}] Found {} cursors", name, consumers.size());
            }

            if (consumers.isEmpty()) {
                // Nothing to recover
                callback.initializeComplete();
                return;
            }

            for (final String cursorName : consumers) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Loading cursor {}", name, cursorName);
                }

                final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, config, ManagedLedgerImpl.this,
                        cursorName);
                cursor.recover(new VoidCallback() {
                    @Override
                    public void operationFailed(ManagedLedgerException exception) {
                        log.warn("[{}] Recovery for cursor {} failed", name, cursorName, exception);
                        // A negative count keeps later successes from ever reaching zero
                        cursorCount.set(-1);
                        callback.initializeFailed(exception);
                    }

                    @Override
                    public void operationComplete() {
                        final int stillToRecover = cursorCount.get() - 1;
                        log.info("[{}] Recovery for cursor {} completed. pos={} -- todo={}", name, cursorName,
                                cursor.getMarkDeletedPosition(), stillToRecover);
                        cursor.setActive();
                        cursors.add(cursor);

                        final boolean wasLast = cursorCount.decrementAndGet() == 0;
                        if (wasLast) {
                            // Every cursor has been recovered: the initialization is now completed
                            callback.initializeComplete();
                        }
                    }
                });
            }
        }
    });
}
/**
 * @return the name this managed ledger was created with
 */
@Override
public String getName() {
    return this.name;
}
/**
 * Synchronously appends the whole byte array as one entry.
 *
 * @param data
 *            payload of the entry
 * @return the position assigned to the entry
 * @throws InterruptedException
 *             if the calling thread is interrupted while waiting
 * @throws ManagedLedgerException
 *             if the entry could not be added
 */
@Override
public Position addEntry(byte[] data) throws InterruptedException, ManagedLedgerException {
    final int wholeLength = data.length;
    return addEntry(data, 0, wholeLength);
}
/**
 * Synchronously appends {@code length} bytes of {@code data}, starting at {@code offset}, as one entry. The call
 * blocks, without a timeout, until the asynchronous add has either completed or failed.
 *
 * @param data
 *            array holding the payload
 * @param offset
 *            index of the first payload byte
 * @param length
 *            number of payload bytes
 * @return the position assigned to the entry
 * @throws InterruptedException
 *             if the calling thread is interrupted while waiting
 * @throws ManagedLedgerException
 *             if the entry could not be added
 */
@Override
public Position addEntry(byte[] data, int offset, int length) throws InterruptedException, ManagedLedgerException {
    final CountDownLatch done = new CountDownLatch(1);
    // Outcome of the operation: exactly one of the two holders is populated by the callback
    final AtomicReference<Position> addedPosition = new AtomicReference<>();
    final AtomicReference<ManagedLedgerException> failure = new AtomicReference<>();

    asyncAddEntry(data, offset, length, new AddEntryCallback() {
        @Override
        public void addFailed(ManagedLedgerException exception, Object ctx) {
            failure.set(exception);
            done.countDown();
        }

        @Override
        public void addComplete(Position position, Object ctx) {
            addedPosition.set(position);
            done.countDown();
        }
    }, null);

    done.await();

    final ManagedLedgerException error = failure.get();
    if (error != null) {
        log.error("[{}] Error adding entry", name, error);
        throw error;
    }
    return addedPosition.get();
}
/**
 * Asynchronously appends the whole byte array as one entry.
 *
 * @param data
 *            payload of the entry
 * @param callback
 *            notified when the add completes or fails
 * @param ctx
 *            opaque context handed back to the callback
 */
@Override
public void asyncAddEntry(final byte[] data, final AddEntryCallback callback, final Object ctx) {
    final int wholeLength = data.length;
    asyncAddEntry(data, 0, wholeLength, callback, ctx);
}
/**
 * Asynchronously appends a slice of a byte array as one entry. The slice is wrapped (not copied) into a buffer and
 * passed to the {@code ByteBuf} based overload.
 *
 * @param data
 *            array holding the payload
 * @param offset
 *            index of the first payload byte
 * @param length
 *            number of payload bytes
 * @param callback
 *            notified when the add completes or fails
 * @param ctx
 *            opaque context handed back to the callback
 */
@Override
public void asyncAddEntry(final byte[] data, int offset, int length, final AddEntryCallback callback,
        final Object ctx) {
    asyncAddEntry(Unpooled.wrappedBuffer(data, offset, length), callback, ctx);
}
/**
* Asynchronously appends the content of {@code buffer} as one entry.
*
* <p>
* The request is rejected right away when the managed ledger is fenced or closed. Otherwise it is queued in
* {@code pendingAddEntries} and, depending on the state, either waits for the ledger being created, triggers the
* creation of a new ledger, or is written into the current ledger.
*
* @param buffer
*            payload of the entry
* @param callback
*            notified when the add completes or fails
* @param ctx
*            opaque context handed back to the callback
*/
@Override
public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) {
if (log.isDebugEnabled()) {
// "state" is still the field here; the local snapshot is declared further down
log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
}
// Snapshot of the state, used for every decision below
final State state = STATE_UPDATER.get(this);
if (state == State.Fenced) {
callback.addFailed(new ManagedLedgerFencedException(), ctx);
return;
} else if (state == State.Closed) {
callback.addFailed(new ManagedLedgerException("Managed ledger was already closed"), ctx);
return;
}
// The operation is queued before the state is examined
OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx);
pendingAddEntries.add(addOperation);
if (state == State.ClosingLedger || state == State.CreatingLedger) {
// We don't have a ready ledger to write into
// We are waiting for a new ledger to be created
if (log.isDebugEnabled()) {
log.debug("[{}] Queue addEntry request", name);
}
} else if (state == State.ClosedLedger) {
long now = System.currentTimeMillis();
if (now < lastLedgerCreationFailureTimestamp + WaitTimeAfterLedgerCreationFailureMs) {
// Deny the write request, since we haven't waited enough time since last attempt to create a new ledger
pendingAddEntries.remove(addOperation);
callback.addFailed(new ManagedLedgerException("Waiting for new ledger creation to complete"), ctx);
return;
}
// No ledger and no pending operations. Create a new ledger
if (log.isDebugEnabled()) {
log.debug("[{}] Creating a new ledger", name);
}
// Only the caller that wins the state transition starts the creation; this object is the CreateCallback
if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
mbean.startDataLedgerCreateOp();
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, ctx);
}
} else {
// NOTE(review): for any other state (e.g. None) this throws after the operation was already queued above --
// confirm whether that can happen in practice
checkArgument(state == State.LedgerOpened);
// Write into lastLedger
addOperation.setLedger(currentLedger);
++currentLedgerEntries;
currentLedgerSize += buffer.readableBytes();
if (log.isDebugEnabled()) {
log.debug("[{}] Write into current ledger lh={} entries={}", name, currentLedger.getId(),
currentLedgerEntries);
}
if (currentLedgerIsFull()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Closing current ledger lh={}", name, currentLedger.getId());
}
// This entry will be the last added to current ledger
addOperation.setCloseWhenDone(true);
STATE_UPDATER.set(this, State.ClosingLedger);
}
addOperation.initiate();
}
}
/**
 * Synchronously opens the cursor with the given name, creating it when it does not exist yet. The call waits up to
 * {@code AsyncOperationTimeoutSeconds} seconds for the asynchronous open to finish.
 *
 * @param cursorName
 *            name of the cursor
 * @return the opened cursor
 * @throws InterruptedException
 *             if the calling thread is interrupted while waiting
 * @throws ManagedLedgerException
 *             if the open fails or does not finish within the timeout
 */
@Override
public ManagedCursor openCursor(String cursorName) throws InterruptedException, ManagedLedgerException {
    final CountDownLatch counter = new CountDownLatch(1);
    // Holder for the outcome reported by the asynchronous callback
    class Result {
        ManagedCursor cursor = null;
        ManagedLedgerException exception = null;
    }
    final Result result = new Result();
    asyncOpenCursor(cursorName, new OpenCursorCallback() {
        @Override
        public void openCursorComplete(ManagedCursor cursor, Object ctx) {
            result.cursor = cursor;
            counter.countDown();
        }

        @Override
        public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
            result.exception = exception;
            counter.countDown();
        }
    }, null);
    if (!counter.await(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
        throw new ManagedLedgerException("Timeout during open-cursor operation");
    }
    if (result.exception != null) {
        // Report the operation that actually failed, with the ledger and cursor names
        log.error("[{}] Error opening cursor {}", name, cursorName, result.exception);
        throw result.exception;
    }
    return result.cursor;
}
/**
 * Asynchronously opens the cursor with the given name.
 *
 * <p>
 * An already known cursor is returned as is. When another request is currently creating the same cursor, this
 * request is attached to that pending creation. Otherwise a new cursor is created, initialized at the last
 * position and registered.
 *
 * @param cursorName
 *            name of the cursor
 * @param callback
 *            notified with the cursor or with the failure
 * @param ctx
 *            opaque context handed back to the callback
 */
@Override
public synchronized void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback,
        final Object ctx) {
    try {
        checkManagedLedgerIsOpen();
        checkFenced();
    } catch (ManagedLedgerException e) {
        callback.openCursorFailed(e, ctx);
        return;
    }
    if (uninitializedCursors.containsKey(cursorName)) {
        uninitializedCursors.get(cursorName).thenAccept(existing -> {
            callback.openCursorComplete(existing, ctx);
        }).exceptionally(ex -> {
            // The stage derived by thenAccept() fails with a CompletionException wrapping the original error, so
            // a direct cast would throw and the callback would never be notified. Unwrap the cause chain first.
            Throwable cause = ex;
            while (!(cause instanceof ManagedLedgerException) && cause.getCause() != null) {
                cause = cause.getCause();
            }
            ManagedLedgerException failure = cause instanceof ManagedLedgerException
                    ? (ManagedLedgerException) cause
                    : new ManagedLedgerException(cause);
            callback.openCursorFailed(failure, ctx);
            return null;
        });
        return;
    }
    ManagedCursor cachedCursor = cursors.get(cursorName);
    if (cachedCursor != null) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Cursor was already created {}", name, cachedCursor);
        }
        callback.openCursorComplete(cachedCursor, ctx);
        return;
    }
    // Create a new one and persist it
    if (log.isDebugEnabled()) {
        log.debug("[{}] Creating new cursor: {}", name, cursorName);
    }
    final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, config, this, cursorName);
    CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
    uninitializedCursors.put(cursorName, cursorFuture);
    cursor.initialize(getLastPosition(), new VoidCallback() {
        @Override
        public void operationComplete() {
            log.info("[{}] Opened new cursor: {}", name, cursor);
            cursor.setActive();
            // Update the ack position (ignoring entries that were written while the cursor was being created)
            cursor.initializeCursorPosition(getLastPositionAndCounter());
            // Lock the managed ledger, not this callback object: uninitializedCursors is a plain HashMap that is
            // otherwise accessed under the managed ledger monitor
            synchronized (ManagedLedgerImpl.this) {
                cursors.add(cursor);
                uninitializedCursors.remove(cursorName).complete(cursor);
            }
            callback.openCursorComplete(cursor, ctx);
        }

        @Override
        public void operationFailed(ManagedLedgerException exception) {
            log.warn("[{}] Failed to open cursor: {}", name, cursor, exception);
            synchronized (ManagedLedgerImpl.this) {
                uninitializedCursors.remove(cursorName).completeExceptionally(exception);
            }
            callback.openCursorFailed(exception, ctx);
        }
    });
}
/**
 * Asynchronously deletes the cursor with the given name. The cursor is first removed from the metadata store; only
 * then its ledger is deleted, the cursor is unregistered, the entry cache is re-invalidated and a background trim
 * is requested.
 *
 * @param consumerName
 *            name of the cursor to delete
 * @param callback
 *            notified when the deletion completes or fails (also when the cursor is unknown)
 * @param ctx
 *            opaque context handed back to the callback
 */
@Override
public synchronized void asyncDeleteCursor(final String consumerName, final DeleteCursorCallback callback,
        final Object ctx) {
    final ManagedCursorImpl cursor = (ManagedCursorImpl) cursors.get(consumerName);
    if (null == cursor) {
        callback.deleteCursorFailed(new ManagedLedgerException("ManagedCursor not found: " + consumerName), ctx);
        return;
    }

    // Remove the consumer from the MetaStore first. Should that succeed and the following removal of the ledger
    // from BK fail, a ledger is leaked but the state stays consistent.
    final MetaStoreCallback<Void> removalCallback = new MetaStoreCallback<Void>() {
        @Override
        public void operationFailed(MetaStoreException e) {
            callback.deleteCursorFailed(e, ctx);
        }

        @Override
        public void operationComplete(Void result, Stat stat) {
            cursor.asyncDeleteCursorLedger();
            cursors.removeCursor(consumerName);

            // Redo invalidation of entries in cache, now that the set of readers has changed
            final PositionImpl slowestReader = cursors.getSlowestReaderPosition();
            if (slowestReader == null) {
                entryCache.clear();
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("Doing cache invalidation up to {}", slowestReader);
                }
                entryCache.invalidateEntries(slowestReader);
            }

            trimConsumedLedgersInBackground();
            log.info("[{}] [{}] Deleted cursor", name, consumerName);
            callback.deleteCursorComplete(ctx);
        }
    };
    store.asyncRemoveCursor(name, consumerName, removalCallback);
}
/**
 * Synchronously deletes the cursor with the given name, waiting up to {@code AsyncOperationTimeoutSeconds} seconds
 * for the asynchronous deletion to finish.
 *
 * @param name
 *            name of the cursor to delete
 * @throws InterruptedException
 *             if the calling thread is interrupted while waiting
 * @throws ManagedLedgerException
 *             if the deletion fails or does not finish within the timeout
 */
@Override
public void deleteCursor(String name) throws InterruptedException, ManagedLedgerException {
    final CountDownLatch done = new CountDownLatch(1);
    // Stays empty unless the asynchronous deletion reports a failure
    final AtomicReference<ManagedLedgerException> failure = new AtomicReference<>();

    asyncDeleteCursor(name, new DeleteCursorCallback() {
        @Override
        public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
            failure.set(exception);
            done.countDown();
        }

        @Override
        public void deleteCursorComplete(Object ctx) {
            done.countDown();
        }
    }, null);

    final boolean finishedInTime = done.await(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
    if (!finishedInTime) {
        throw new ManagedLedgerException("Timeout during delete-cursors operation");
    }

    final ManagedLedgerException error = failure.get();
    if (error != null) {
        log.error("Deleting cursor", error);
        throw error;
    }
}
/**
 * @return the container holding every cursor of this managed ledger (the live container, not a copy)
 */
@Override
public Iterable<ManagedCursor> getCursors() {
    return this.cursors;
}
/**
 * @return the container holding the active cursors of this managed ledger (the live container, not a copy)
 */
@Override
public Iterable<ManagedCursor> getActiveCursors() {
    return this.activeCursors;
}
/**
 * Tells whether at least one active cursor is registered on this managed ledger.
 *
 * @return true if the active-cursor container is not empty
 */
public boolean hasActiveCursors() {
    final boolean noneActive = activeCursors.isEmpty();
    return !noneActive;
}
/**
 * @return the current value of the entries counter of this managed ledger
 */
@Override
public long getNumberOfEntries() {
    final long entries = NUMBER_OF_ENTRIES_UPDATER.get(this);
    return entries;
}
/**
 * Counts the entries that have not been consumed by the slowest reader yet.
 *
 * @return 0 when there is no cursor; otherwise the total number of entries minus the entries the slowest reader
 *         has already passed in its ledger
 */
@Override
public long getNumberOfActiveEntries() {
    final long allEntries = getNumberOfEntries();
    final PositionImpl slowestReader = cursors.getSlowestReaderPosition();

    // If there are no consumers, there are no active entries
    if (slowestReader == null) {
        return 0;
    }

    // The slowest consumer is expected to sit in the first ledger of the list, so the entries it already consumed
    // there (entry ids are zero based, hence the +1) are the only ones to subtract.
    final long alreadyConsumed = slowestReader.getEntryId() + 1;
    return allEntries - alreadyConsumed;
}
/**
 * @return the current value of the total-size counter of this managed ledger
 */
@Override
public long getTotalSize() {
    final long size = TOTAL_SIZE_UPDATER.get(this);
    return size;
}
/**
 * Re-evaluates which cursors are active.
 *
 * <p>
 * Every cursor whose backlog is below {@code maxActiveCursorBacklogEntries} is activated. Every active cursor
 * whose backlog exceeds that threshold gets the entry at its read position read; the cursor is deactivated when
 * that message was published more than {@code maxMessageCacheRetentionTimeMillis} ago.
 */
@Override
public void checkBackloggedCursors() {
    // activate caught up cursors
    cursors.forEach(candidate -> {
        if (candidate.getNumberOfEntries() < maxActiveCursorBacklogEntries) {
            candidate.setActive();
        }
    });

    // deactivate backlog cursors
    for (ManagedCursor cursor : activeCursors) {
        final long backlogEntries = cursor.getNumberOfEntries();
        if (backlogEntries <= maxActiveCursorBacklogEntries) {
            // Not backlogged, leave it active
            continue;
        }

        PositionImpl readPosition = (PositionImpl) cursor.getReadPosition();
        if (!isValidPosition(readPosition)) {
            readPosition = getNextValidPosition(readPosition);
        }
        if (readPosition == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Couldn't find valid read position [{}] {}", name, cursor.getName(),
                        cursor.getReadPosition());
            }
            continue;
        }

        try {
            asyncReadEntry(readPosition, new ReadEntryCallback() {
                @Override
                public void readEntryComplete(Entry entry, Object ctx) {
                    MessageMetadata msgMetadata = null;
                    try {
                        msgMetadata = Commands.parseMessageMetadata(entry.getDataBuffer());
                        final long publishedAt = msgMetadata.getPublishTime();
                        final long msgTimeSincePublish = System.currentTimeMillis() - publishedAt;
                        if (msgTimeSincePublish > maxMessageCacheRetentionTimeMillis) {
                            cursor.setInactive();
                        }
                    } finally {
                        // Give the pooled metadata object and the entry back in every case
                        if (msgMetadata != null) {
                            msgMetadata.recycle();
                        }
                        entry.release();
                    }
                }

                @Override
                public void readEntryFailed(ManagedLedgerException e, Object ctx) {
                    log.warn("[{}] Failed while reading entries on [{}] {}", name, cursor.getName(),
                            e.getMessage(), e);
                }
            }, null);
        } catch (Exception e) {
            log.warn("[{}] Failed while reading entries from cache on [{}] {}", name, cursor.getName(),
                    e.getMessage(), e);
        }
    }
}
/**
 * Estimates the size of the data that the slowest consumer has not acknowledged yet.
 *
 * <p>
 * The estimate is the total size, minus the size of the ledgers that precede the ledger of the slowest
 * mark-delete position, minus the estimated size of the entries already consumed within that ledger (average entry
 * size times consumed entries). When the ledger of the position has been removed in the meantime, the computation
 * is retried with the new slowest position, unless the position did not move.
 *
 * @return the estimated backlog size, or 0 when there is no slowest consumer position
 */
@Override
public long getEstimatedBacklogSize() {
    PositionImpl pos = getMarkDeletePositionOfSlowestConsumer();
    while (pos != null) {
        final long slowestConsumerLedgerId = pos.getLedgerId();
        final long size;
        final LedgerInfo ledgerInfo;
        final long openLedgerSize;
        final long openLedgerEntries;
        // Take one consistent snapshot under the monitor. The current-ledger counters are plain fields updated
        // by synchronized writers, so they have to be read here as well.
        synchronized (this) {
            // Subtract size of ledgers that were already fully consumed but not trimmed yet
            size = getTotalSize() - ledgers.values().stream()
                    .filter(li -> li.getLedgerId() < slowestConsumerLedgerId).mapToLong(li -> li.getSize()).sum();
            ledgerInfo = ledgers.get(slowestConsumerLedgerId);
            openLedgerSize = currentLedgerSize;
            openLedgerEntries = currentLedgerEntries;
        }

        if (ledgerInfo == null) {
            // ledger was removed; read the slowest position once so that the comparison and the retry agree,
            // and so that a missing position (no consumer left) is never handed to compareTo()
            final PositionImpl latest = getMarkDeletePositionOfSlowestConsumer();
            if (latest != null && pos.compareTo(latest) == 0) {
                // position still has not moved
                return size;
            }
            // retry with new slowest consumer (the loop ends with 0 when there is none anymore)
            pos = latest;
            continue;
        }

        final long consumedEntryId = pos.getEntryId();
        if (ledgerInfo.getEntries() == 0) {
            // No entry count recorded in the ledger info: use the counters of the ledger being written
            return size - consumedLedgerSize(openLedgerSize, openLedgerEntries, consumedEntryId);
        }
        return size - consumedLedgerSize(ledgerInfo.getSize(), ledgerInfo.getEntries(), consumedEntryId);
    }
    return 0;
}
/**
 * Estimates the bytes consumed inside one ledger from its average entry size.
 *
 * @param ledgerSize
 *            size of the ledger in bytes
 * @param ledgerEntries
 *            number of entries in the ledger
 * @param consumedEntries
 *            id of the last consumed entry (zero based); negative when nothing was consumed
 * @return {@code (consumedEntries + 1) * (ledgerSize / ledgerEntries)}, or 0 when the ledger has no entries or
 *         nothing was consumed
 */
private long consumedLedgerSize(long ledgerSize, long ledgerEntries, long consumedEntries) {
    if (ledgerEntries <= 0 || consumedEntries < 0) {
        return 0;
    }
    // Integer division: the average entry size is rounded down
    return (consumedEntries + 1) * (ledgerSize / ledgerEntries);
}
/**
 * Synchronously closes the managed ledger, waiting up to {@code AsyncOperationTimeoutSeconds} seconds for the
 * asynchronous close to finish.
 *
 * @throws InterruptedException
 *             if the calling thread is interrupted while waiting
 * @throws ManagedLedgerException
 *             if the close fails or does not finish within the timeout
 */
@Override
public void close() throws InterruptedException, ManagedLedgerException {
    final CountDownLatch done = new CountDownLatch(1);
    // Stays empty unless the asynchronous close reports a failure
    final AtomicReference<ManagedLedgerException> failure = new AtomicReference<>();

    asyncClose(new CloseCallback() {
        @Override
        public void closeFailed(ManagedLedgerException exception, Object ctx) {
            failure.set(exception);
            done.countDown();
        }

        @Override
        public void closeComplete(Object ctx) {
            done.countDown();
        }
    }, null);

    final boolean finishedInTime = done.await(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
    if (!finishedInTime) {
        throw new ManagedLedgerException("Timeout during managed ledger close");
    }

    final ManagedLedgerException error = failure.get();
    if (error != null) {
        log.error("[{}] Error closing managed ledger", name, error);
        throw error;
    }
}
/**
 * Asynchronously closes the managed ledger: it is unregistered from the factory, moved to the {@code Closed}
 * state, the ledger currently being written (if any) is closed and finally all cursors are closed in parallel.
 *
 * <p>
 * Closing an already closed managed ledger completes immediately; closing a fenced one fails with a
 * {@code ManagedLedgerFencedException}.
 *
 * @param callback
 *            notified when the close completes or fails
 * @param ctx
 *            opaque context handed back to the callback
 */
@Override
public synchronized void asyncClose(final CloseCallback callback, final Object ctx) {
    State state = STATE_UPDATER.get(this);
    if (state == State.Fenced) {
        factory.close(this);
        callback.closeFailed(new ManagedLedgerFencedException(), ctx);
        return;
    } else if (state == State.Closed) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Ignoring request to close a closed managed ledger", name);
        }
        callback.closeComplete(ctx);
        return;
    }
    log.info("[{}] Closing managed ledger", name);
    factory.close(this);
    STATE_UPDATER.set(this, State.Closed);

    // Final step, shared by both paths below: close all cursors in parallel and report the overall outcome
    final Runnable closeAllCursors = () -> {
        List<CompletableFuture<Void>> futures = Lists.newArrayList();
        for (ManagedCursor cursor : cursors) {
            Futures.CloseFuture closeFuture = new Futures.CloseFuture();
            cursor.asyncClose(closeFuture, null);
            futures.add(closeFuture);
        }
        Futures.waitForAll(futures).thenRun(() -> {
            callback.closeComplete(ctx);
        }).exceptionally(exception -> {
            callback.closeFailed(new ManagedLedgerException(exception), ctx);
            return null;
        });
    };

    final LedgerHandle lh = currentLedger;
    if (lh == null) {
        // No ledger was ever opened for writing (e.g. the initialization did not get that far): there is nothing
        // to close in BookKeeper, so go straight to the cursors instead of failing on a null handle
        if (log.isDebugEnabled()) {
            log.debug("[{}] No current writing ledger to close", name);
        }
        closeAllCursors.run();
        return;
    }

    if (log.isDebugEnabled()) {
        log.debug("[{}] Closing current writing ledger {}", name, lh.getId());
    }
    mbean.startDataLedgerCloseOp();
    lh.asyncClose((rc, lh1, ctx1) -> {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Close complete for ledger {}: rc = {}", name, lh.getId(), rc);
        }
        mbean.endDataLedgerCloseOp();
        if (rc != BKException.Code.OK) {
            callback.closeFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
            return;
        }
        closeAllCursors.run();
    }, null);
}
// //////////////////////////////////////////////////////////////////////
// Callbacks
/**
* BookKeeper callback for the ledger creation started by {@code asyncAddEntry}.
*
* <p>
* On failure every pending add request is failed and the state goes back to {@code ClosedLedger}. On success the
* new ledger becomes the current ledger and the updated ledger list is written to the metadata store; the pending
* add requests are only sent once that update has succeeded.
*
* @param rc
*            BookKeeper result code
* @param lh
*            handle of the created ledger; checked for null only in the debug log
* @param ctx
*            context passed to the create call, not used here
*/
@Override
public synchronized void createComplete(int rc, final LedgerHandle lh, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh != null ? lh.getId() : -1);
}
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.error("[{}] Error creating ledger rc={} {}", name, rc, BKException.getMessage(rc));
ManagedLedgerException status = new ManagedLedgerException(BKException.getMessage(rc));
// Empty the list of pending requests and make all of them fail
clearPendingAddEntries(status);
// Remember the failure time: asyncAddEntry rejects writes for a while after a failed creation
lastLedgerCreationFailureTimestamp = System.currentTimeMillis();
STATE_UPDATER.set(this, State.ClosedLedger);
} else {
log.info("[{}] Created new ledger {}", name, lh.getId());
// Register the new ledger (timestamp 0, no entries yet) and reset the per-ledger counters
ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build());
currentLedger = lh;
currentLedgerEntries = 0;
currentLedgerSize = 0;
// Callback for the metadata-store update of the ledger list. ledgersListMutex is acquired by
// updateLedgersListAfterRollover() before the update is sent.
final MetaStoreCallback<Void> cb = new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void v, Stat stat) {
if (log.isDebugEnabled()) {
log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
}
ledgersStat = stat;
ledgersListMutex.unlock();
// Moves to LedgerOpened and sends the queued add requests
updateLedgersIdsComplete(stat);
synchronized (ManagedLedgerImpl.this) {
mbean.addLedgerSwitchLatencySample(System.nanoTime() - lastLedgerCreationInitiationTimestamp,
TimeUnit.NANOSECONDS);
}
}
@Override
public void operationFailed(MetaStoreException e) {
if (e instanceof BadVersionException) {
// NOTE(review): this path returns without calling ledgersListMutex.unlock() -- presumably acceptable
// because the managed ledger is fenced from here on; confirm
synchronized (ManagedLedgerImpl.this) {
log.error(
"[{}] Failed to udpate ledger list. z-node version mismatch. Closing managed ledger",
name);
STATE_UPDATER.set(ManagedLedgerImpl.this, State.Fenced);
clearPendingAddEntries(e);
return;
}
}
log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage());
// Remove the ledger, since we failed to update the list
ledgers.remove(lh.getId());
mbean.startDataLedgerDeleteOp();
// Best-effort deletion of the unused ledger; a failure is only logged
bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> {
mbean.endDataLedgerDeleteOp();
if (rc1 != BKException.Code.OK) {
log.warn("[{}] Failed to delete ledger {}: {}", name, lh.getId(),
BKException.getMessage(rc1));
}
}, null);
ledgersListMutex.unlock();
synchronized (ManagedLedgerImpl.this) {
lastLedgerCreationFailureTimestamp = System.currentTimeMillis();
STATE_UPDATER.set(ManagedLedgerImpl.this, State.ClosedLedger);
clearPendingAddEntries(e);
}
}
};
updateLedgersListAfterRollover(cb);
}
}
/**
 * Pushes the current list of ledgers to the metadata store after a ledger rollover.
 *
 * <p>If {@code ledgersListMutex} cannot be acquired immediately, the update is retried after 100 ms. Once the
 * mutex is acquired here, it is the responsibility of {@code callback} to release it.
 *
 * @param callback
 *            invoked with the outcome of the metadata store update
 */
private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {
    if (!ledgersListMutex.tryLock()) {
        // Defer update for later. Wrapped in safeRun (as the deferred trimming is) so that an unexpected
        // exception in the retry is not silently swallowed by the scheduled future.
        scheduledExecutor.schedule(safeRun(() -> updateLedgersListAfterRollover(callback)), 100,
                TimeUnit.MILLISECONDS);
        return;
    }
    ManagedLedgerInfo mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values()).build();
    if (log.isDebugEnabled()) {
        log.debug("[{}] Updating ledgers ids with new ledger. version={}", name, ledgersStat);
    }
    store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, callback);
}
/**
 * Marks the freshly created ledger as opened and replays the queued add-entry operations against it.
 *
 * <p>Operations are initiated in queue order. As soon as the current ledger is considered full, the operation
 * that filled it is flagged to close the ledger when done and the replay stops.
 *
 * @param stat
 *            version of the ledgers list in the metadata store (not used here)
 */
public synchronized void updateLedgersIdsComplete(Stat stat) {
    STATE_UPDATER.set(this, State.LedgerOpened);
    lastLedgerCreatedTimestamp = System.currentTimeMillis();
    final boolean debug = log.isDebugEnabled();
    if (debug) {
        log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size());
    }

    // Process all the pending addEntry requests
    for (OpAddEntry pendingOp : pendingAddEntries) {
        pendingOp.setLedger(currentLedger);
        currentLedgerEntries = currentLedgerEntries + 1;
        currentLedgerSize = currentLedgerSize + pendingOp.data.readableBytes();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Sending {}", name, pendingOp);
        }

        final boolean ledgerFull = currentLedgerIsFull();
        if (ledgerFull) {
            // This is the last entry going into the current ledger
            STATE_UPDATER.set(this, State.ClosingLedger);
            pendingOp.setCloseWhenDone(true);
        }
        pendingOp.initiate();
        if (ledgerFull) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Stop writing into ledger {} queue={}", name, currentLedger.getId(),
                        pendingAddEntries.size());
            }
            break;
        }
    }
}
// //////////////////////////////////////////////////////////////////////
// Private helpers
/**
 * Handles the closing of a data ledger: records its final size (or discards it when empty), triggers a
 * background trimming and, if writes are still queued, starts the creation of a new ledger.
 *
 * @param lh
 *            handle of the ledger that has been closed
 */
synchronized void ledgerClosed(final LedgerHandle lh) {
    final State stateBeforeClose = STATE_UPDATER.get(this);
    if (stateBeforeClose != State.ClosingLedger && stateBeforeClose != State.LedgerOpened) {
        // In case we get multiple write errors for different outstanding write request, we should close the
        // ledger just once
        return;
    }
    STATE_UPDATER.set(this, State.ClosedLedger);

    final long entryCount = lh.getLastAddConfirmed() + 1;
    if (log.isDebugEnabled()) {
        log.debug("[{}] Ledger has been closed id={} entries={}", name, lh.getId(), entryCount);
    }

    if (entryCount <= 0) {
        // The last ledger was empty, so we can discard it
        ledgers.remove(lh.getId());
        mbean.startDataLedgerDeleteOp();
        bookKeeper.asyncDeleteLedger(lh.getId(), (rc, ctx) -> {
            mbean.endDataLedgerDeleteOp();
            log.info("[{}] Delete complete for empty ledger {}. rc={}", name, lh.getId(), rc);
        }, null);
    } else {
        LedgerInfo.Builder closedInfo = LedgerInfo.newBuilder();
        closedInfo.setLedgerId(lh.getId());
        closedInfo.setEntries(entryCount);
        closedInfo.setSize(lh.getLength());
        closedInfo.setTimestamp(System.currentTimeMillis());
        ledgers.put(lh.getId(), closedInfo.build());
    }

    trimConsumedLedgersInBackground();

    if (pendingAddEntries.isEmpty()) {
        return;
    }

    // Need to create a new ledger to write pending entries
    if (log.isDebugEnabled()) {
        log.debug("[{}] Creating a new ledger", name);
    }
    STATE_UPDATER.set(this, State.CreatingLedger);
    this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
    mbean.startDataLedgerCreateOp();
    bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
            config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, null);
}
/**
 * Drains the queue of pending add-entry operations, releasing each payload buffer and failing the operation
 * with the given exception.
 *
 * @param e
 *            the exception reported to every drained operation
 */
void clearPendingAddEntries(ManagedLedgerException e) {
    // Loop on the result of poll() rather than on isEmpty(): this method is not synchronized itself, and a
    // separate isEmpty()/poll() pair could observe a null element if the queue is drained concurrently.
    OpAddEntry op;
    while ((op = pendingAddEntries.poll()) != null) {
        op.data.release();
        op.failed(e);
    }
}
/**
 * Dispatches a read operation to the ledger its read position points at.
 *
 * <p>Reads on the ledger currently being written use the write handle directly; reads on other ledgers obtain a
 * handle through {@link #getLedgerHandle(long)}. Positions pointing at an unknown or empty ledger are moved to
 * the following ledger id.
 *
 * @param opReadEntry
 *            the read operation to execute
 */
void asyncReadEntries(OpReadEntry opReadEntry) {
    final State currentState = STATE_UPDATER.get(this);
    if (currentState == State.Fenced || currentState == State.Closed) {
        opReadEntry.readEntriesFailed(new ManagedLedgerFencedException(), opReadEntry.ctx);
        return;
    }

    final long targetLedgerId = opReadEntry.readPosition.getLedgerId();
    final LedgerHandle writeHandle = this.currentLedger;

    if (targetLedgerId == writeHandle.getId()) {
        // Current writing ledger is not in the cache (since we don't want
        // it to be automatically evicted), and we cannot use 2 different
        // ledger handles (read & write)for the same ledger.
        internalReadFromLedger(writeHandle, opReadEntry);
        return;
    }

    final LedgerInfo targetInfo = ledgers.get(targetLedgerId);
    if (targetInfo == null || targetInfo.getEntries() == 0) {
        // Cursor is pointing to a empty ledger, there's no need to try opening it. Skip this ledger and
        // move to the next one
        opReadEntry.updateReadPosition(new PositionImpl(opReadEntry.readPosition.getLedgerId() + 1, 0));
        opReadEntry.checkReadCompletion();
        return;
    }

    // Get a ledger handle to read from
    getLedgerHandle(targetLedgerId).thenAccept(readHandle -> {
        internalReadFromLedger(readHandle, opReadEntry);
    }).exceptionally(failure -> {
        log.error("[{}] Error opening ledger for reading at position {} - {}", name, opReadEntry.readPosition,
                failure.getMessage());
        opReadEntry.readEntriesFailed(new ManagedLedgerException(failure), opReadEntry.ctx);
        return null;
    });
}
/**
 * Returns a future for a read handle on the given ledger, opening the ledger if no future is cached yet.
 *
 * <p>When the open fails, the future is removed from the cache before being completed exceptionally, so that a
 * later call can try again.
 *
 * @param ledgerId
 *            id of the ledger to open for reading
 * @return the cached or newly created future of the ledger handle
 */
CompletableFuture<LedgerHandle> getLedgerHandle(long ledgerId) {
    CompletableFuture<LedgerHandle> cached = ledgerCache.get(ledgerId);
    if (cached == null) {
        // If not present try again and create if necessary
        cached = ledgerCache.computeIfAbsent(ledgerId, lid -> {
            // Open the ledger for reading if it was not already opened
            CompletableFuture<LedgerHandle> opening = new CompletableFuture<>();
            if (log.isDebugEnabled()) {
                log.debug("[{}] Asynchronously opening ledger {} for read", name, ledgerId);
            }
            mbean.startDataLedgerOpenOp();
            bookKeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(),
                    (int rc, LedgerHandle lh, Object ctx) -> {
                        executor.submit(safeRun(() -> {
                            mbean.endDataLedgerOpenOp();
                            if (rc == BKException.Code.OK) {
                                if (log.isDebugEnabled()) {
                                    log.debug("[{}] Successfully opened ledger {} for reading", name, lh.getId());
                                }
                                opening.complete(lh);
                            } else {
                                // Remove the ledger future from cache to give chance to reopen it later
                                ledgerCache.remove(ledgerId, opening);
                                opening.completeExceptionally(
                                        new ManagedLedgerException(BKException.getMessage(rc)));
                            }
                        }));
                    }, null);
            return opening;
        });
    }
    return cached;
}
/**
 * Drops a cached read handle after a read error, unless the handle belongs to the ledger currently being
 * written.
 *
 * @param ledgerHandle
 *            the handle on which the read error occurred
 * @param rc
 *            the error code of the failed read (only used for logging)
 */
void invalidateLedgerHandle(LedgerHandle ledgerHandle, int rc) {
    final long failedLedgerId = ledgerHandle.getId();
    if (failedLedgerId == currentLedger.getId()) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Ledger that encountered read error {} is current ledger", name, rc);
        }
        return;
    }

    // remove handle from ledger cache since we got a (read) error
    ledgerCache.remove(failedLedgerId);
    if (log.isDebugEnabled()) {
        log.debug("[{}] Removed ledger {} from cache (after read error: {})", name, failedLedgerId, rc);
    }
}
/**
 * Reads a single entry at the given position through the entry cache.
 *
 * @param position
 *            position of the entry to read
 * @param callback
 *            receives the entry, or the failure if the ledger cannot be opened
 * @param ctx
 *            opaque context handed back to the callback
 */
void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ctx) {
    final LedgerHandle writeHandle = this.currentLedger;
    if (log.isDebugEnabled()) {
        log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId());
    }

    if (position.getLedgerId() != writeHandle.getId()) {
        // Not the ledger being written: go through the ledger handle cache
        getLedgerHandle(position.getLedgerId()).thenAccept(readHandle -> {
            entryCache.asyncReadEntry(readHandle, position, callback, ctx);
        }).exceptionally(failure -> {
            log.error("[{}] Error opening ledger for reading at position {} - {}", name, position,
                    failure.getMessage());
            callback.readEntryFailed(new ManagedLedgerException(failure), ctx);
            return null;
        });
        return;
    }

    entryCache.asyncReadEntry(writeHandle, position, callback, ctx);
}
/**
 * Performs the actual read of a range of entries from the given ledger on behalf of a read operation.
 *
 * <p>The range starts at the operation's read position and is capped both by the number of entries requested
 * and by the last readable entry of the ledger. When the read position is already past the last readable entry,
 * the operation is moved to the next ledger (unless this is the current ledger) and its completion is checked
 * without issuing a read.
 *
 * @param ledger
 *            handle of the ledger to read from
 * @param opReadEntry
 *            the read operation being served
 */
private void internalReadFromLedger(LedgerHandle ledger, OpReadEntry opReadEntry) {
    // Perform the read
    long firstEntry = opReadEntry.readPosition.getEntryId();
    long lastEntryInLedger;
    final ManagedCursorImpl cursor = opReadEntry.cursor;
    PositionImpl lastPosition = lastConfirmedEntry;
    if (ledger.getId() == lastPosition.getLedgerId()) {
        // For the current ledger, we only give read visibility to the last entry we have received a confirmation
        // in the managed ledger layer
        lastEntryInLedger = lastPosition.getEntryId();
    } else {
        // For other ledgers, already closed the BK lastAddConfirmed is appropriate
        lastEntryInLedger = ledger.getLastAddConfirmed();
    }
    if (firstEntry > lastEntryInLedger) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] No more messages to read from ledger={} lastEntry={} readEntry={}", name,
                    ledger.getId(), lastEntryInLedger, firstEntry);
        }
        if (ledger.getId() != currentLedger.getId()) {
            // Cursor was placed past the end of one ledger, move it to the
            // beginning of the next ledger
            Long nextLedgerId = ledgers.ceilingKey(ledger.getId() + 1);
            if (nextLedgerId != null) {
                opReadEntry.updateReadPosition(new PositionImpl(nextLedgerId, 0));
            } else {
                // ceilingKey() returns null when no later ledger is registered; unboxing it would throw a
                // NullPointerException. Point at the following ledger id instead, as asyncReadEntries() does
                // when it skips an unknown ledger.
                opReadEntry.updateReadPosition(new PositionImpl(ledger.getId() + 1, 0));
            }
        }
        opReadEntry.checkReadCompletion();
        return;
    }
    long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
    if (log.isDebugEnabled()) {
        log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
                lastEntry);
    }
    entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, false, opReadEntry, opReadEntry.ctx);
    // Rate-limited: let an active cursor advance its position so cached entries can be discarded
    if (updateCursorRateLimit.tryAcquire()) {
        if (isCursorActive(cursor)) {
            final PositionImpl lastReadPosition = PositionImpl.get(ledger.getId(), lastEntry);
            discardEntriesFromCache(cursor, lastReadPosition);
        }
    }
}
/**
 * Returns the statistics bean of this managed ledger.
 *
 * @return the {@code mbean} instance held by this managed ledger
 */
@Override
public ManagedLedgerMXBean getStats() {
return mbean;
}
/**
 * Tells whether the given position is at or before the last confirmed entry.
 *
 * @param position
 *            the position to check
 * @return true if {@code position} compares lower than or equal to the last confirmed entry
 */
boolean hasMoreEntries(PositionImpl position) {
    final PositionImpl lastConfirmed = lastConfirmedEntry;
    final boolean entriesAvailable = position.compareTo(lastConfirmed) <= 0;
    if (log.isDebugEnabled()) {
        log.debug("[{}] hasMoreEntries: pos={} lastPos={} res={}", name, position, lastConfirmed,
                entriesAvailable);
    }
    return entriesAvailable;
}
/**
 * Records the new position of an active cursor and, when the container reports an update, invalidates cached
 * entries using the second position of the returned pair.
 *
 * @param cursor
 *            the active cursor that moved
 * @param newPosition
 *            the cursor's new position
 */
void discardEntriesFromCache(ManagedCursorImpl cursor, PositionImpl newPosition) {
    final Pair<PositionImpl, PositionImpl> update = activeCursors.cursorUpdated(cursor, newPosition);
    if (update == null) {
        return;
    }
    entryCache.invalidateEntries(update.second);
}
/**
 * Records the new position of a cursor and triggers a background trimming when appropriate.
 *
 * @param cursor
 *            the cursor that moved
 * @param newPosition
 *            the cursor's new position
 */
void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
    final Pair<PositionImpl, PositionImpl> slowestReaderChange = cursors.cursorUpdated(cursor, newPosition);
    if (slowestReaderChange == null) {
        // Cursor has been removed in the meantime
        trimConsumedLedgersInBackground();
        return;
    }

    final PositionImpl slowestBefore = slowestReaderChange.first;
    final PositionImpl slowestAfter = slowestReaderChange.second;
    final boolean slowestReaderMoved = slowestBefore.compareTo(slowestAfter) != 0;

    // If the slowest consumer has not changed position there is nothing to do right now.
    // Otherwise, only trigger a trimming when switching to the next ledger
    if (slowestReaderMoved && slowestBefore.getLedgerId() != newPosition.getLedgerId()) {
        trimConsumedLedgersInBackground();
    }
}
/**
 * Adjusts a read position so that it points at a ledger that is present in the ledgers map.
 *
 * @param position
 *            the position a read is about to start from
 * @return {@code position} itself when its ledger is present or when no ledger exists at or after it;
 *         otherwise the first entry of the next available ledger
 */
PositionImpl startReadOperationOnLedger(PositionImpl position) {
    // ceilingKey() returns null when there is no ledger at or after the requested id. Keep it boxed so that
    // case does not fail with a NullPointerException on unboxing.
    Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
    if (ledgerId != null && ledgerId.longValue() != position.getLedgerId()) {
        // The ledger pointed by this position does not exist anymore. It was deleted because it was empty. We
        // need to skip on the next available ledger
        position = new PositionImpl(ledgerId, 0);
    }
    return position;
}
/**
 * Drains the queue of waiting cursors, submitting for each one a task that notifies it that entries are
 * available.
 */
void notifyCursors() {
    ManagedCursorImpl polled;
    while ((polled = waitingCursors.poll()) != null) {
        final ManagedCursorImpl cursorToNotify = polled;
        executor.submit(safeRun(() -> cursorToNotify.notifyEntriesAvailable()));
    }
}
/**
 * Submits a trimming of consumed ledgers to the ordered executor, keyed by the managed ledger name.
 */
private void trimConsumedLedgersInBackground() {
    executor.submitOrdered(name, safeRun(() -> internalTrimConsumedLedgers()));
}
/**
 * Schedules a new background trimming attempt 100 ms from now.
 */
private void scheduleDeferredTrimming() {
    scheduledExecutor.schedule(safeRun(() -> {
        trimConsumedLedgersInBackground();
    }), 100, TimeUnit.MILLISECONDS);
}
/**
 * Tells whether a ledger is older than the configured retention time.
 *
 * @param ledgerTimestamp
 *            timestamp of the ledger, in milliseconds
 * @return true if the time elapsed since {@code ledgerTimestamp} exceeds the configured retention time
 */
private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
    final long now = System.currentTimeMillis();
    return (now - ledgerTimestamp) > config.getRetentionTimeMillis();
}
/**
 * Checks whether there are ledgers that have been fully consumed and deletes them.
 *
 * <p>Only one trimming runs at a time (guarded by {@code trimmerMutex}); a concurrent request is re-scheduled.
 * Candidate ledgers are those strictly before the slowest reader's ledger that are either past the retention
 * time or over the retention size quota. The ledgers list in the metadata store is updated first, and the
 * ledgers themselves are deleted from BookKeeper only after that update succeeds.
 */
void internalTrimConsumedLedgers() {
    // Ensure only one trimming operation is active
    if (!trimmerMutex.tryLock()) {
        scheduleDeferredTrimming();
        return;
    }
    List<LedgerInfo> ledgersToDelete = Lists.newArrayList();
    synchronized (this) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(),
                    TOTAL_SIZE_UPDATER.get(this));
        }
        if (STATE_UPDATER.get(this) == State.Closed) {
            log.debug("[{}] Ignoring trimming request since the managed ledger was already closed", name);
            trimmerMutex.unlock();
            return;
        }
        long slowestReaderLedgerId = -1;
        if (cursors.isEmpty()) {
            // At this point the lastLedger will be pointing to the
            // ledger that has just been closed, therefore the +1 to
            // include lastLedger in the trimming.
            slowestReaderLedgerId = currentLedger.getId() + 1;
        } else {
            PositionImpl slowestReaderPosition = cursors.getSlowestReaderPosition();
            if (slowestReaderPosition != null) {
                slowestReaderLedgerId = slowestReaderPosition.getLedgerId();
            } else {
                trimmerMutex.unlock();
                return;
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Slowest consumer ledger id: {}", name, slowestReaderLedgerId);
        }
        // skip ledger if retention constraint met
        for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {
            boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
            boolean overRetentionQuota = TOTAL_SIZE_UPDATER.get(this) > ((long) config.getRetentionSizeInMB())
                    * 1024 * 1024;
            if (ls.getLedgerId() == currentLedger.getId() || (!expired && !overRetentionQuota)) {
                if (log.isDebugEnabled()) {
                    if (!expired) {
                        log.debug("[{}] ledger id skipped for deletion as unexpired: {}", name, ls.getLedgerId());
                    }
                    if (!overRetentionQuota) {
                        log.debug("[{}] ledger id: {} skipped for deletion as size: {} under quota: {} MB", name,
                                ls.getLedgerId(), TOTAL_SIZE_UPDATER.get(this), config.getRetentionSizeInMB());
                    }
                }
                // Ledgers are visited in ascending id order; stop at the first one that must be kept
                break;
            }
            ledgersToDelete.add(ls);
            ledgerCache.remove(ls.getLedgerId());
        }
        if (ledgersToDelete.isEmpty()) {
            trimmerMutex.unlock();
            return;
        }
        if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now and schedule a new trimming
                || !ledgersListMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
            scheduleDeferredTrimming();
            trimmerMutex.unlock();
            return;
        }
        // Update metadata
        for (LedgerInfo ls : ledgersToDelete) {
            ledgers.remove(ls.getLedgerId());
            NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
            TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
            entryCache.invalidateAllEntries(ls.getLedgerId());
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Updating of ledgers list after trimming", name);
        }
        ManagedLedgerInfo mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values()).build();
        store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, new MetaStoreCallback<Void>() {
            @Override
            public void operationComplete(Void result, Stat stat) {
                log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
                        TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
                ledgersStat = stat;
                ledgersListMutex.unlock();
                trimmerMutex.unlock();
                for (LedgerInfo ls : ledgersToDelete) {
                    log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
                    bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx) -> {
                        if (rc == BKException.Code.NoSuchLedgerExistsException) {
                            log.warn("[{}] Ledger was already deleted {}", name, ls.getLedgerId());
                        } else if (rc != BKException.Code.OK) {
                            // Three arguments need three placeholders, otherwise the BookKeeper error
                            // message is silently dropped from the log line.
                            log.error("[{}] Error deleting ledger {}: {}", name, ls.getLedgerId(),
                                    BKException.getMessage(rc));
                        } else {
                            if (log.isDebugEnabled()) {
                                log.debug("[{}] Deleted ledger {}", name, ls.getLedgerId());
                            }
                        }
                    }, null);
                }
            }

            @Override
            public void operationFailed(MetaStoreException e) {
                log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
                ledgersListMutex.unlock();
                trimmerMutex.unlock();
            }
        });
    }
}
/**
 * Deletes this ManagedLedger completely from the system, blocking until the asynchronous delete has finished.
 *
 * @throws InterruptedException
 *             if the calling thread is interrupted while waiting
 * @throws ManagedLedgerException
 *             if the delete fails or does not complete within {@code AsyncOperationTimeoutSeconds}
 */
@Override
public void delete() throws InterruptedException, ManagedLedgerException {
    final CountDownLatch done = new CountDownLatch(1);
    final AtomicReference<ManagedLedgerException> failure = new AtomicReference<>();

    final DeleteLedgerCallback waiter = new DeleteLedgerCallback() {
        @Override
        public void deleteLedgerComplete(Object ctx) {
            done.countDown();
        }

        @Override
        public void deleteLedgerFailed(ManagedLedgerException e, Object ctx) {
            failure.set(e);
            done.countDown();
        }
    };
    asyncDelete(waiter, null);

    final boolean finishedInTime = done.await(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
    if (!finishedInTime) {
        throw new ManagedLedgerException("Timeout during managed ledger delete operation");
    }

    if (failure.get() != null) {
        log.error("[{}] Error deleting managed ledger", name, failure.get());
        throw failure.get();
    }
}
/**
 * Asynchronously deletes this managed ledger: the state is set to {@code Fenced}, then every cursor is deleted,
 * then all ledgers, and finally the managed ledger metadata.
 *
 * @param callback
 *            notified once the whole delete has completed or failed
 * @param ctx
 *            opaque context handed back to {@code callback}
 */
@Override
public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) {
    // Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and
    // ledgers
    STATE_UPDATER.set(this, State.Fenced);
    List<ManagedCursor> cursors = Lists.newArrayList(this.cursors);
    if (cursors.isEmpty()) {
        // No cursors to delete, proceed with next step
        deleteAllLedgers(callback, ctx);
        return;
    }
    AtomicReference<ManagedLedgerException> cursorDeleteException = new AtomicReference<>();
    AtomicInteger cursorsToDelete = new AtomicInteger(cursors.size());
    for (ManagedCursor cursor : cursors) {
        // The per-cursor callback parameters are deliberately NOT named "ctx": they would shadow the caller's
        // context and the null passed to asyncDeleteCursor() would be handed back to the caller instead.
        asyncDeleteCursor(cursor.getName(), new DeleteCursorCallback() {
            @Override
            public void deleteCursorComplete(Object cursorCtx) {
                if (cursorsToDelete.decrementAndGet() == 0) {
                    if (cursorDeleteException.get() != null) {
                        // Some cursor failed to delete
                        callback.deleteLedgerFailed(cursorDeleteException.get(), ctx);
                        return;
                    }
                    // All cursors deleted, continue with deleting all ledgers
                    deleteAllLedgers(callback, ctx);
                }
            }

            @Override
            public void deleteCursorFailed(ManagedLedgerException exception, Object cursorCtx) {
                log.warn("[{}] Failed to delete cursor {}", name, cursor, exception);
                cursorDeleteException.compareAndSet(null, exception);
                if (cursorsToDelete.decrementAndGet() == 0) {
                    // Trigger callback only once
                    callback.deleteLedgerFailed(exception, ctx);
                }
            }
        }, null);
    }
}
/**
 * Deletes every ledger currently listed in the ledgers map and, once all deletions have succeeded, removes the
 * managed ledger metadata.
 *
 * <p>A ledger that no longer exists is treated as successfully deleted. On any other error the failure is
 * reported to the callback exactly once and the metadata is left in place.
 *
 * @param callback
 *            notified of the final outcome
 * @param ctx
 *            opaque context handed back to {@code callback}
 */
private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) {
    List<LedgerInfo> ledgers = Lists.newArrayList(ManagedLedgerImpl.this.ledgers.values());
    if (ledgers.isEmpty()) {
        // No ledgers to delete, proceed with deleting metadata
        deleteMetadata(callback, ctx);
        return;
    }
    // Counts successful deletions still outstanding. Failures never decrement it, so it cannot reach zero
    // (and metadata cannot be removed) once any deletion has failed.
    AtomicInteger ledgersToDelete = new AtomicInteger(ledgers.size());
    // Dedicated flag for "failure already reported". A sentinel value stored in the counter would be
    // overwritten by later successful decrements, letting a second failure fire the callback again.
    AtomicBoolean failureReported = new AtomicBoolean(false);
    for (LedgerInfo ls : ledgers) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Deleting ledger {}", name, ls);
        }
        bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx1) -> {
            switch (rc) {
            case BKException.Code.NoSuchLedgerExistsException:
                log.warn("[{}] Ledger {} not found when deleting it", name, ls.getLedgerId());
                // Continue anyway (intentional fall-through)
            case BKException.Code.OK:
                if (ledgersToDelete.decrementAndGet() == 0) {
                    // All ledgers deleted, now remove ML metadata
                    deleteMetadata(callback, ctx);
                }
                break;
            default:
                // Handle error
                log.warn("[{}] Failed to delete ledger {} -- {}", name, ls.getLedgerId(),
                        BKException.getMessage(rc));
                if (failureReported.compareAndSet(false, true)) {
                    // Trigger callback only once
                    callback.deleteLedgerFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
                }
                break;
            }
        }, null);
    }
}
/**
 * Removes the managed ledger from the metadata store. In both outcomes the managed ledger is closed on the
 * factory before the callback is notified.
 *
 * @param callback
 *            notified of the outcome of the metadata removal
 * @param ctx
 *            opaque context handed back to {@code callback}
 */
private void deleteMetadata(DeleteLedgerCallback callback, Object ctx) {
    final MetaStoreCallback<Void> onMetadataRemoved = new MetaStoreCallback<Void>() {
        @Override
        public void operationComplete(Void result, Stat stat) {
            log.info("[{}] Successfully deleted managed ledger", name);
            factory.close(ManagedLedgerImpl.this);
            callback.deleteLedgerComplete(ctx);
        }

        @Override
        public void operationFailed(MetaStoreException e) {
            log.warn("[{}] Failed to delete managed ledger", name, e);
            factory.close(ManagedLedgerImpl.this);
            callback.deleteLedgerFailed(e, ctx);
        }
    };
    store.removeManagedLedger(name, onMetadataRemoved);
}
/**
 * Counts the entries contained in a contiguous range delimited by two positions.
 *
 * @param range
 *            the position range; each endpoint may be open or closed
 * @return the number of entries in the range, computed from the entry ids and the per-ledger entry counts
 */
long getNumberOfEntries(Range<PositionImpl> range) {
    final PositionImpl from = range.lowerEndpoint();
    final boolean includeFrom = range.lowerBoundType() == BoundType.CLOSED;
    final PositionImpl to = range.upperEndpoint();
    final boolean includeTo = range.upperBoundType() == BoundType.CLOSED;

    if (from.getLedgerId() == to.getLedgerId()) {
        // Both positions are in the same ledger: count the entries strictly in between, then add the
        // included endpoints
        long sameLedgerCount = to.getEntryId() - from.getEntryId() - 1;
        if (includeFrom) {
            sameLedgerCount += 1;
        }
        if (includeTo) {
            sameLedgerCount += 1;
        }
        return sameLedgerCount;
    }

    // The two positions point to different ledgers
    long total = 0;

    // 1. Entries in the ledger pointed by the upper position
    total += to.getEntryId();
    if (includeTo) {
        total += 1;
    }

    // 2. Entries in the ledger pointed by the lower position
    final LedgerInfo fromLedger = ledgers.get(from.getLedgerId());
    if (fromLedger != null) {
        total += fromLedger.getEntries() - (from.getEntryId() + 1);
        if (includeFrom) {
            total += 1;
        }
    }

    // 3. Whole ledgers in between
    for (LedgerInfo between : ledgers.subMap(from.getLedgerId(), false, to.getLedgerId(), false).values()) {
        total += between.getEntries();
    }
    return total;
}
/**
 * Get the entry position at a given distance from a given position.
 *
 * <p>The method walks forward ledger by ledger, consuming {@code n} entries, and finally returns the position
 * that precedes the one reached (see the call to {@code getPreviousPosition} at the end). When the walk reaches
 * the current ledger and runs out of entries, it stops at the end of that ledger.
 *
 * <p>NOTE(review): several lookups below can yield null and are dereferenced or unboxed without a check
 * ({@code getNextValidPosition}, {@code ledgers.get}, {@code ledgers.ceilingKey}); whether callers guarantee
 * these cases cannot happen is not visible from here -- TODO confirm.
 *
 * @param startPosition
 *            starting position
 * @param n
 *            number of entries to skip ahead
 * @param startRange
 *            specifies whether or not to include the start position in calculating the distance
 * @return the new position that is n entries ahead
 */
PositionImpl getPositionAfterN(final PositionImpl startPosition, long n, PositionBound startRange) {
long entriesToSkip = n;
long currentLedgerId;
long currentEntryId;
if (startRange == PositionBound.startIncluded) {
currentLedgerId = startPosition.getLedgerId();
currentEntryId = startPosition.getEntryId();
} else {
// e.g. a mark-delete position
// NOTE(review): getNextValidPosition() may return null when no later ledger exists
PositionImpl nextValidPosition = getNextValidPosition(startPosition);
currentLedgerId = nextValidPosition.getLedgerId();
currentEntryId = nextValidPosition.getEntryId();
}
boolean lastLedger = false;
long totalEntriesInCurrentLedger;
// Every path through the loop body either breaks or moves to the next ledger
while (entriesToSkip >= 0) {
// for the current ledger, the number of entries written is deduced from the lastConfirmedEntry
// for previous ledgers, LedgerInfo in ZK has the number of entries
if (currentLedgerId == currentLedger.getId()) {
lastLedger = true;
totalEntriesInCurrentLedger = lastConfirmedEntry.getEntryId() + 1;
} else {
totalEntriesInCurrentLedger = ledgers.get(currentLedgerId).getEntries();
}
long unreadEntriesInCurrentLedger = totalEntriesInCurrentLedger - currentEntryId;
if (unreadEntriesInCurrentLedger >= entriesToSkip) {
// if the current ledger has more entries than what we need to skip
// then the return position is in the same ledger
currentEntryId += entriesToSkip;
break;
} else {
// skip remaining entry from the next ledger
entriesToSkip -= unreadEntriesInCurrentLedger;
if (lastLedger) {
// there are no more ledgers, return the last position
currentEntryId = totalEntriesInCurrentLedger;
break;
} else {
// NOTE(review): ceilingKey() returns null if no later ledger exists; unboxing would then fail
currentLedgerId = ledgers.ceilingKey(currentLedgerId + 1);
currentEntryId = 0;
}
}
}
// The walk ends one past the target entry, so step back by one position
PositionImpl positionToReturn = getPreviousPosition(PositionImpl.get(currentLedgerId, currentEntryId));
if (log.isDebugEnabled()) {
log.debug("getPositionAfterN: Start position {}:{}, startIncluded: {}, Return position {}:{}",
startPosition.getLedgerId(), startPosition.getEntryId(), startRange, positionToReturn.getLedgerId(),
positionToReturn.getEntryId());
}
return positionToReturn;
}
/**
 * Get the entry position that comes before the specified position in the message stream, using information
 * from the ledger list and each ledger's entries count.
 *
 * @param position
 *            the current position
 * @return the previous position; when there is no earlier non-empty ledger, a position with entry id
 *         {@code -1} is returned
 */
PositionImpl getPreviousPosition(PositionImpl position) {
    if (position.getEntryId() > 0) {
        return PositionImpl.get(position.getLedgerId(), position.getEntryId() - 1);
    }
    // The previous position will be the last position of an earlier ledgers
    NavigableMap<Long, LedgerInfo> headMap = ledgers.headMap(position.getLedgerId(), false);
    if (headMap.isEmpty()) {
        // There is no previous ledger, return an invalid position in the current ledger
        return PositionImpl.get(position.getLedgerId(), -1);
    }
    // We need to find the most recent non-empty ledger. Iterate the values directly rather than looking each
    // key up again: this method is not synchronized, and a ledger removed from the map between the key
    // iteration and the lookup would make the lookup return null.
    for (LedgerInfo li : headMap.descendingMap().values()) {
        if (li.getEntries() > 0) {
            return PositionImpl.get(li.getLedgerId(), li.getEntries() - 1);
        }
    }
    // in case there are only empty ledgers, we return a position in the first one
    Map.Entry<Long, LedgerInfo> oldest = headMap.firstEntry();
    if (oldest == null) {
        // The view was emptied in the meantime: same answer as when there is no previous ledger
        return PositionImpl.get(position.getLedgerId(), -1);
    }
    return PositionImpl.get(oldest.getKey(), -1);
}
/**
 * Validate whether a specified position is valid for the current managed ledger.
 *
 * <p>A position is valid when its entry id is non-negative and either it lies in the ledger of the last
 * confirmed entry at most one entry past it, or it lies in an older ledger that is still listed and within
 * that ledger's entry count.
 *
 * @param position
 *            the position to validate
 * @return true if the position is valid, false otherwise
 */
boolean isValidPosition(PositionImpl position) {
    PositionImpl last = lastConfirmedEntry;
    if (log.isDebugEnabled()) {
        log.debug("IsValid position: {} -- last: {}", position, last);
    }
    if (position.getEntryId() < 0) {
        return false;
    } else if (position.getLedgerId() > last.getLedgerId()) {
        return false;
    } else if (position.getLedgerId() == last.getLedgerId()) {
        return position.getEntryId() <= (last.getEntryId() + 1);
    } else {
        // Look in the ledgers map. At this point the ledger id is strictly lower than the last ledger id, so
        // a missing ledger is always invalid (the former "entry id == 0" alternative could never be reached).
        LedgerInfo ls = ledgers.get(position.getLedgerId());
        if (ls == null) {
            // Pointing to a non existing ledger that is older than the current ledger is invalid
            return false;
        }
        return position.getEntryId() < ls.getEntries();
    }
}
/**
 * Tells whether the ledgers map holds an entry for the given ledger id.
 *
 * @param ledgerId
 *            the ledger id to look up
 * @return true if a non-null {@code LedgerInfo} is mapped to {@code ledgerId}
 */
boolean ledgerExists(long ledgerId) {
    final LedgerInfo info = ledgers.get(ledgerId);
    return info != null;
}
/**
 * Returns the id of the first ledger in the ledgers map whose id is strictly greater than the given one.
 *
 * <p>NOTE(review): the lookup yields null when there is no such ledger, and unboxing it to {@code long} then
 * throws a NullPointerException -- callers presumably only ask when a later ledger exists; confirm.
 *
 * @param ledgerId
 *            the reference ledger id
 * @return the next ledger id present in the map
 */
long getNextValidLedger(long ledgerId) {
    final Long nextLedgerId = ledgers.ceilingKey(ledgerId + 1);
    return nextLedgerId;
}
/**
 * Finds the first valid position following the given one, skipping to the first entry of later ledgers as
 * needed.
 *
 * @param position
 *            the position to start from
 * @return the next valid position, or null when a later ledger is needed but none is present in the map
 */
PositionImpl getNextValidPosition(final PositionImpl position) {
    PositionImpl candidate = position.getNext();
    for (;;) {
        if (isValidPosition(candidate)) {
            return candidate;
        }
        final Long followingLedgerId = ledgers.ceilingKey(candidate.getLedgerId() + 1);
        if (followingLedgerId == null) {
            return null;
        }
        candidate = PositionImpl.get(followingLedgerId.longValue(), 0);
    }
}
/**
 * Returns the position just before the first entry of the oldest ledger in the ledgers map.
 *
 * @return a position with entry id {@code -1} in the first ledger, or null when the map is empty
 */
PositionImpl getFirstPosition() {
    // firstKey() throws NoSuchElementException on an empty map and never returns null, which made the null
    // check ineffective; firstEntry() returns null in that case.
    Map.Entry<Long, LedgerInfo> firstLedger = ledgers.firstEntry();
    return firstLedger == null ? null : new PositionImpl(firstLedger.getKey(), -1);
}
/**
 * Returns the last confirmed entry position.
 *
 * @return the current value of {@code lastConfirmedEntry}
 */
PositionImpl getLastPosition() {
return lastConfirmedEntry;
}
/**
 * Returns the slowest reader as reported by the cursors container.
 *
 * @return the result of {@code cursors.getSlowestReader()}
 */
@Override
public ManagedCursor getSlowestConsumer() {
return cursors.getSlowestReader();
}
/**
 * Returns the mark-delete position of the slowest consumer.
 *
 * @return the mark-deleted position of the slowest cursor, or null when there is no slowest cursor
 */
PositionImpl getMarkDeletePositionOfSlowestConsumer() {
    final ManagedCursor slowest = getSlowestConsumer();
    if (slowest == null) {
        return null;
    }
    return (PositionImpl) slowest.getMarkDeletedPosition();
}
/**
 * Get the last position written in the managed ledger, alongside with the associated counter.
 *
 * <p>Both values are re-read until the last confirmed entry is the same before and after reading the counter.
 *
 * @return a pair holding the last confirmed position and the entries-added counter
 */
Pair<PositionImpl, Long> getLastPositionAndCounter() {
    while (true) {
        final PositionImpl snapshot = lastConfirmedEntry;
        final long counter = ENTRIES_ADDED_COUNTER_UPDATER.get(this);
        // Ensure no entry was written while reading the two values
        if (snapshot.compareTo(lastConfirmedEntry) == 0) {
            return Pair.create(snapshot, counter);
        }
    }
}
/**
 * Adds the cursor to the set of active cursors, unless a cursor with the same name is already there.
 *
 * @param cursor
 *            the cursor to activate
 */
public void activateCursor(ManagedCursor cursor) {
    final boolean alreadyActive = activeCursors.get(cursor.getName()) != null;
    if (!alreadyActive) {
        activeCursors.add(cursor);
    }
}
/**
 * Removes the cursor from the set of active cursors and adjusts the entry cache accordingly: the cache is
 * cleared when no active cursor is left, otherwise entries are discarded based on the slowest remaining active
 * cursor.
 *
 * @param cursor
 *            the cursor to deactivate
 */
public void deactivateCursor(ManagedCursor cursor) {
    if (activeCursors.get(cursor.getName()) == null) {
        // Not an active cursor, nothing to do
        return;
    }

    activeCursors.removeCursor(cursor.getName());
    if (activeCursors.isEmpty()) {
        // cleanup cache if there is no active subscription
        entryCache.clear();
        return;
    }

    // if removed subscription was the slowest subscription : update cursor and let it clear cache: till
    // new slowest-cursor's read-position
    final ManagedCursorImpl slowestActive = (ManagedCursorImpl) activeCursors.getSlowestReader();
    final PositionImpl slowestReadPosition = (PositionImpl) activeCursors.getSlowestReader().getReadPosition();
    discardEntriesFromCache(slowestActive, getPreviousPosition(slowestReadPosition));
}
/**
 * Tells whether a cursor with the same name is registered among the active cursors.
 *
 * @param cursor
 *            the cursor to check
 * @return true if the active cursors container holds a cursor with that name
 */
public boolean isCursorActive(ManagedCursor cursor) {
    final String cursorName = cursor.getName();
    return activeCursors.get(cursorName) != null;
}
/**
 * Decides whether the current ledger should be rolled over.
 *
 * <p>A rollover is requested when the entry-count or size quota is reached, or when the maximum rollover time
 * has elapsed -- but, if a minimum rollover time is configured, only once that minimum has elapsed too.
 *
 * @return true if the current ledger is to be closed
 */
private boolean currentLedgerIsFull() {
    final boolean spaceQuotaReached = currentLedgerEntries >= config.getMaxEntriesPerLedger()
            || currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte);

    final long ledgerAgeMs = System.currentTimeMillis() - lastLedgerCreatedTimestamp;
    final boolean maxLedgerTimeReached = ledgerAgeMs >= maximumRolloverTimeMs;

    if (!spaceQuotaReached && !maxLedgerTimeReached) {
        return false;
    }

    if (config.getMinimumRolloverTimeMs() <= 0) {
        // No minimum rollover time configured
        return true;
    }

    final boolean switchLedger = ledgerAgeMs > config.getMinimumRolloverTimeMs();
    if (log.isDebugEnabled()) {
        log.debug("Diff: {}, threshold: {} -- switch: {}",
                System.currentTimeMillis() - lastLedgerCreatedTimestamp, config.getMinimumRolloverTimeMs(),
                switchLedger);
    }
    return switchLedger;
}
/**
 * Returns a new list holding the {@code LedgerInfo} of every ledger currently in the ledgers map.
 *
 * @return a newly created list with the map's current values
 */
public List<LedgerInfo> getLedgersInfoAsList() {
    final List<LedgerInfo> snapshot = Lists.newArrayList(ledgers.values());
    return snapshot;
}
/**
 * Returns the ledgers map itself (not a copy), keyed by ledger id.
 *
 * @return the {@code ledgers} field
 */
public NavigableMap<Long, LedgerInfo> getLedgersInfo() {
return ledgers;
}
/**
 * @return the scheduled executor used by this managed ledger for deferred tasks
 */
ScheduledExecutorService getScheduledExecutor() {
return scheduledExecutor;
}
/**
 * @return the ordered executor used by this managed ledger
 */
OrderedSafeExecutor getExecutor() {
return executor;
}
/**
 * Throws an exception if the managed ledger has been previously fenced.
 *
 * @throws ManagedLedgerException
 *             (a {@code ManagedLedgerFencedException}) when the current state is {@code Fenced}
 */
private void checkFenced() throws ManagedLedgerException {
    final boolean fenced = STATE_UPDATER.get(this) == State.Fenced;
    if (!fenced) {
        return;
    }
    log.error("[{}] Attempted to use a fenced managed ledger", name);
    throw new ManagedLedgerFencedException();
}
/**
 * Throws an exception if the managed ledger has already been closed.
 *
 * @throws ManagedLedgerException
 *             when the current state is {@code Closed}
 */
private void checkManagedLedgerIsOpen() throws ManagedLedgerException {
    final State currentState = STATE_UPDATER.get(this);
    if (currentState != State.Closed) {
        return;
    }
    throw new ManagedLedgerException("ManagedLedger " + name + " has already been closed");
}
/**
 * Moves this managed ledger to the {@code Fenced} state.
 */
synchronized void setFenced() {
STATE_UPDATER.set(this, State.Fenced);
}
/**
 * @return the metadata store used by this managed ledger
 */
MetaStore getStore() {
return store;
}
/**
 * @return the configuration of this managed ledger
 */
ManagedLedgerConfig getConfig() {
return config;
}
static interface ManagedLedgerInitializeLedgerCallback {
public void initializeComplete();
public void initializeFailed(ManagedLedgerException e);
}
// Expose internal values for debugging purposes
/**
 * @return the current value of the entries-added counter
 */
public long getEntriesAddedCounter() {
return ENTRIES_ADDED_COUNTER_UPDATER.get(this);
}
/**
 * @return the value of {@code currentLedgerEntries}, the entry count tracked for the current ledger
 */
public long getCurrentLedgerEntries() {
return currentLedgerEntries;
}
/**
 * @return the value of {@code currentLedgerSize}, the size tracked for the current ledger
 */
public long getCurrentLedgerSize() {
return currentLedgerSize;
}
/**
 * @return the timestamp (from {@code System.currentTimeMillis()}) recorded when the last ledger was opened
 */
public long getLastLedgerCreatedTimestamp() {
return lastLedgerCreatedTimestamp;
}
/**
 * @return the timestamp (from {@code System.currentTimeMillis()}) recorded at the last ledger creation failure
 */
public long getLastLedgerCreationFailureTimestamp() {
return lastLedgerCreationFailureTimestamp;
}
/**
 * @return the number of cursors currently in the waiting-cursors queue
 */
public int getWaitingCursorsCount() {
return waitingCursors.size();
}
/**
 * @return the number of add-entry operations currently queued
 */
public int getPendingAddEntriesCount() {
return pendingAddEntries.size();
}
/**
 * @return the position of the last confirmed entry
 */
public PositionImpl getLastConfirmedEntry() {
return lastConfirmedEntry;
}
/**
 * @return the string form of the current state of this managed ledger
 */
public String getState() {
return STATE_UPDATER.get(this).toString();
}
/**
 * @return the statistics bean, typed as its implementation class
 */
public ManagedLedgerMBeanImpl getMBean() {
return mbean;
}
/**
 * @return the size reported by the entry cache of this managed ledger
 */
public long getCacheSize() {
return entryCache.getSize();
}
// SLF4J logger shared by all instances of this class
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
}