blob: 4b65d62f0eee8d5105fd64f0b290ea0a648d3608 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.bookkeeper.mledger.impl;
import static;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.BKException.Code;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ScanOutcome;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.RangeBoundConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ManagedCursorImpl implements ManagedCursor {
private static final Comparator<Entry> ENTRY_COMPARATOR = (e1, e2) -> {
if (e1.getLedgerId() != e2.getLedgerId()) {
return e1.getLedgerId() < e2.getLedgerId() ? -1 : 1;
if (e1.getEntryId() != e2.getEntryId()) {
return e1.getEntryId() < e2.getEntryId() ? -1 : 1;
return 0;
// BookKeeper client, managed-ledger configuration and managed ledger handed to the constructor.
protected final BookKeeper bookkeeper;
protected final ManagedLedgerConfig config;
protected final ManagedLedgerImpl ledger;
// Cursor name; used in log messages and when addressing the cursor in the metadata store.
private final String name;
// Prefix of the property keys that setCursorProperties refuses to accept from callers.
public static final String CURSOR_INTERNAL_PROPERTY_PREFIX = "#pulsar.internal.";
// String properties attached to the cursor; replaced as a whole map, never mutated in place in the visible code.
private volatile Map<String, String> cursorProperties;
// Digest type passed to BookKeeper when opening the cursor ledger.
private final BookKeeper.DigestType digestType;
protected volatile PositionImpl markDeletePosition;
// This is the mark-delete position that has already been persisted
protected volatile PositionImpl persistentMarkDeletePosition;
// NOTE(review): the updater declaration below looks truncated (constant name and last argument are missing);
// presumably it targets inProgressMarkDeletePersistPosition - confirm against the upstream file.
protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, PositionImpl>
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, PositionImpl.class,
protected volatile PositionImpl inProgressMarkDeletePersistPosition;
// Atomic updater for the volatile readPosition field.
protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, PositionImpl> READ_POSITION_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, PositionImpl.class, "readPosition");
protected volatile PositionImpl readPosition;
// keeps sample of last read-position for validation and monitoring if read-position is not moving forward.
protected volatile PositionImpl statsLastReadPosition;
// Atomic updater for lastMarkDeleteEntry; putProperty/removeProperty go through it.
protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, MarkDeleteEntry>
LAST_MARK_DELETE_ENTRY_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class,
MarkDeleteEntry.class, "lastMarkDeleteEntry");
protected volatile MarkDeleteEntry lastMarkDeleteEntry;
// Atomic updater for waitingReadOp; a compare-and-set from null registers the single waiting read.
protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry> WAITING_READ_OP_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOp");
private volatile OpReadEntry waitingReadOp = null;
// Integer encodings of a boolean, for use with the int field updaters below.
public static final int FALSE = 0;
public static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> RESET_CURSOR_IN_PROGRESS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "resetCursorInProgress");
private volatile int resetCursorInProgress = FALSE;
private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> PENDING_READ_OPS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingReadOps");
private volatile int pendingReadOps = 0;
// Atomic updater for messagesConsumedCounter.
private static final AtomicLongFieldUpdater<ManagedCursorImpl> MSG_CONSUMED_COUNTER_UPDATER =
AtomicLongFieldUpdater.newUpdater(ManagedCursorImpl.class, "messagesConsumedCounter");
// These counters are used to compute the numberOfEntries and numberOfEntriesInBacklog values, without having to
// look at the list of ledgers in the ml. They are initialized to (-backlog) at opening, and will be incremented
// each time a message is read or deleted.
protected volatile long messagesConsumedCounter;
// Current ledger used to append the mark-delete position
private volatile LedgerHandle cursorLedger;
// Whether the current cursorLedger is read-only or writable
private boolean isCursorLedgerReadOnly = true;
// Stat of the cursor z-node
// NOTE: Don't update cursorLedgerStat alone,
// please use updateCursorLedgerStat method to update cursorLedgerStat and managedCursorInfo at the same time.
private volatile Stat cursorLedgerStat;
private volatile ManagedCursorInfo managedCursorInfo;
// Converters between (ledgerId, entryId) pairs and position objects. The first two are handed to the
// RangeSetWrapper created in the constructor.
private static final LongPairConsumer<PositionImpl> positionRangeConverter = PositionImpl::new;
private static final RangeBoundConsumer<PositionImpl> positionRangeReverseConverter =
(position) -> new LongPairRangeSet.LongPair(position.ledgerId, position.entryId);
// Same conversion, but into a PositionImplRecyclable obtained from create(), with its ackSet cleared.
private static final LongPairConsumer<PositionImplRecyclable> recyclePositionRangeConverter = (key, value) -> {
PositionImplRecyclable position = PositionImplRecyclable.create();
position.ledgerId = key;
position.entryId = value;
position.ackSet = null;
return position;
// Ranges of individually deleted messages, as restored by recoverIndividualDeletedMessages.
private final RangeSetWrapper<PositionImpl> individualDeletedMessages;
// Maintain the deletion status for batch messages
// (ledgerId, entryId) -> deletion indexes
// Left null when deletion at batch-index level is disabled in the config (see the constructor).
private final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable> batchDeletedIndexes;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// Null when mark-delete throttling is disabled (see the constructor).
private RateLimiter markDeleteLimiter;
// The cursor is considered "dirty" when there are mark-delete updates that are only applied in memory,
// because of the rate limiting.
private volatile boolean isDirty = false;
private boolean alwaysInactive = false;
// Sentinel passed as maxSizeBytes by the overloads that do not limit the read size.
private static final long NO_MAX_SIZE_LIMIT = -1L;
private long entriesReadCount;
private long entriesReadSize;
private int individualDeletedMessagesSerializedSize;
private static final String COMPACTION_CURSOR_NAME = "__compaction";
private volatile boolean cacheReadEntry = false;
// active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger.
private volatile boolean isActive = false;
class MarkDeleteEntry {
final PositionImpl newPosition;
final MarkDeleteCallback callback;
final Object ctx;
final Map<String, Long> properties;
// If the callbackGroup is set, it means this mark-delete request was done on behalf of a group of request (just
// persist the last one in the chain). In this case we need to trigger the callbacks for every request in the
// group.
List<MarkDeleteEntry> callbackGroup;
public MarkDeleteEntry(PositionImpl newPosition, Map<String, Long> properties,
MarkDeleteCallback callback, Object ctx) {
this.newPosition = newPosition; = properties;
this.callback = callback;
this.ctx = ctx;
public void triggerComplete() {
// Trigger the final callback after having (eventually) triggered the switchin-ledger operation. This
// will ensure that no race condition will happen between the next mark-delete and the switching
// operation.
if (callbackGroup != null) {
// Trigger the callback for every request in the group
for (MarkDeleteEntry e : callbackGroup) {
} else if (callback != null) {
// Only trigger the callback for the current request
public void triggerFailed(ManagedLedgerException exception) {
if (callbackGroup != null) {
for (MarkDeleteEntry e : callbackGroup) {
e.callback.markDeleteFailed(exception, e.ctx);
} else if (callback != null) {
callback.markDeleteFailed(exception, ctx);
// Queue of mark-delete requests; presumably those that cannot be persisted right away - TODO confirm against
// the code that enqueues them (not visible in this part of the file).
protected final ArrayDeque<MarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>();
// Atomic updater for pendingMarkDeletedSubmittedCount.
private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount");
private volatile int pendingMarkDeletedSubmittedCount = 0;
// Initialised from clock.millis() in the constructor.
private volatile long lastLedgerSwitchTimestamp;
// Time source taken from the config.
private final Clock clock;
// The last active time (Unix time, milliseconds) of the cursor
private volatile long lastActive;
/**
 * Lifecycle states of the cursor; the current value is kept in the {@code state} field.
 */
public enum State {
Uninitialized, // Cursor is being initialized
NoLedger, // There is no metadata ledger open for writing
Open, // Metadata ledger is ready
SwitchingLedger, // The metadata ledger is being switched
Closing, // The managed cursor is closing
Closed // The managed cursor has been closed
// Atomic updater for the volatile state field.
protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, State.class, "state");
// Starts as null; the constructor sets it to State.Uninitialized through STATE_UPDATER.
protected volatile State state = null;
// Created in the constructor as a ManagedCursorMXBeanImpl wrapping this cursor.
protected final ManagedCursorMXBean mbean;
/**
 * Callback for asynchronous operations that produce no result, such as recover and initialize.
 */
public interface VoidCallback {
// Invoked when the operation finished successfully.
void operationComplete();
// Invoked when the operation failed, with the cause.
void operationFailed(ManagedLedgerException exception);
/**
 * Creates a cursor in the {@link State#Uninitialized} state.
 *
 * @param bookkeeper BookKeeper client used for the cursor ledger
 * @param config     managed-ledger configuration (digest type, clock, batch-index deletion, mark-delete throttle)
 * @param ledger     managed ledger this cursor reads from
 * @param cursorName name of the cursor
 */
ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName) {
    this.bookkeeper = bookkeeper;
    this.cursorProperties = Collections.emptyMap();
    this.config = config;
    this.ledger = ledger;
    this.name = cursorName;
    this.individualDeletedMessages = new RangeSetWrapper<>(positionRangeConverter,
            positionRangeReverseConverter, this);
    // The per-batch deletion index is only kept when the feature is enabled; null means "disabled".
    if (config.isDeletionAtBatchIndexLevelEnabled()) {
        this.batchDeletedIndexes = new ConcurrentSkipListMap<>();
    } else {
        this.batchDeletedIndexes = null;
    }
    this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
    STATE_UPDATER.set(this, State.Uninitialized);
    WAITING_READ_OP_UPDATER.set(this, null);
    this.clock = config.getClock();
    this.lastActive = this.clock.millis();
    this.lastLedgerSwitchTimestamp = this.clock.millis();

    if (config.getThrottleMarkDelete() > 0.0) {
        markDeleteLimiter = RateLimiter.create(config.getThrottleMarkDelete());
    } else {
        // Disable mark-delete rate limiter
        markDeleteLimiter = null;
    }
    this.mbean = new ManagedCursorMXBeanImpl(this);
}
/**
 * Stores the latest cursor info together with the metadata-store stat it was read or written with,
 * so the two fields are always updated as a pair.
 */
private void updateCursorLedgerStat(ManagedCursorInfo cursorInfo, Stat stat) {
this.managedCursorInfo = cursorInfo;
this.cursorLedgerStat = stat;
public Map<String, Long> getProperties() {
return lastMarkDeleteEntry != null ? : Collections.emptyMap();
/**
 * Returns the current cursor properties map (the field itself, not a copy).
 */
public Map<String, String> getCursorProperties() {
return cursorProperties;
/**
 * Derives a new cursor-properties map by applying {@code updateFunction} to the current one and
 * returns a future for the outcome of the update.
 *
 * NOTE(review): this block looks truncated - the construction of {@code copy}, the metadata-store call that
 * receives the callback, and the completion of the returned future are not visible here. Confirm against the
 * upstream file before relying on this description.
 */
private CompletableFuture<Void> computeCursorProperties(
final Function<Map<String, String>, Map<String, String>> updateFunction) {
CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
// Snapshot the stat so the update is issued against the version that was current when the call started.
final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
ManagedCursorInfo copy = ManagedCursorInfo
name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
public void operationComplete(Void result, Stat stat) {"[{}] Updated ledger cursor: {}", ledger.getName(), name);
// On success, publish the new properties as an unmodifiable map and remember the new info/stat pair.
ManagedCursorImpl.this.cursorProperties = Collections.unmodifiableMap(newProperties);
updateCursorLedgerStat(copy, stat);
public void operationFailed(MetaStoreException e) {
log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
name, newProperties, e);
return updateCursorPropertiesResult;
public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
Map<String, String> newProperties =
cursorProperties == null ? new HashMap<>() : new HashMap<>(cursorProperties);
// Prohibit setting of internal properties
Set<String> keys = newProperties.keySet();
for (String key : keys) {
return FutureUtil.failedFuture(new IllegalArgumentException(
"The property key can't start with " + CURSOR_INTERNAL_PROPERTY_PREFIX));
return computeCursorProperties(lastRead -> {
if (lastRead != null) {
lastRead.forEach((k, v) -> {
newProperties.put(k, v);
return newProperties;
/**
 * Adds or overwrites a single cursor property, keeping all other properties.
 *
 * @return the future produced by computeCursorProperties for this update
 */
public CompletableFuture<Void> putCursorProperty(String key, String value) {
return computeCursorProperties(lastRead -> {
// Work on a copy so the currently published map is never mutated.
Map<String, String> newProperties = lastRead == null ? new HashMap<>() : new HashMap<>(lastRead);
newProperties.put(key, value);
return newProperties;
public CompletableFuture<Void> removeCursorProperty(String key) {
return computeCursorProperties(lastRead -> {
Map<String, String> newProperties = lastRead == null ? new HashMap<>() : new HashMap<>(lastRead);
return newProperties;
public boolean putProperty(String key, Long value) {
if (lastMarkDeleteEntry != null) {
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
Map<String, Long> properties =;
Map<String, Long> newProperties = properties == null ? new HashMap<>() : new HashMap<>(properties);
newProperties.put(key, value);
MarkDeleteEntry newLastMarkDeleteEntry = new MarkDeleteEntry(last.newPosition, newProperties,
last.callback, last.ctx);
newLastMarkDeleteEntry.callbackGroup = last.callbackGroup;
return newLastMarkDeleteEntry;
return true;
return false;
public boolean removeProperty(String key) {
if (lastMarkDeleteEntry != null) {
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
Map<String, Long> properties =;
if (properties != null && properties.containsKey(key)) {
return last;
return true;
return false;
/**
 * Performs the initial recovery, reading the mark-deleted position from the ledger and then calling initialize to
 * have a new opened ledger.
 */
void recover(final VoidCallback callback) {
// Read the meta-data ledgerId from the store"[{}] Recovering from bookkeeper ledger cursor: {}", ledger.getName(), name);
// Asynchronously fetch the stored cursor info for this cursor name from the managed ledger's store.
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<ManagedCursorInfo>() {
public void operationComplete(ManagedCursorInfo info, Stat stat) {
updateCursorLedgerStat(info, stat);
// A stored value of 0 is treated as "not set": keep the value assigned at construction time.
lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Recover cursor last active to [{}]", ledger.getName(), name, lastActive);
Map<String, String> recoveredCursorProperties = Collections.emptyMap();
if (info.getCursorPropertiesCount() > 0) {
// Recover properties map
recoveredCursorProperties = new HashMap<>();
for (int i = 0; i < info.getCursorPropertiesCount(); i++) {
StringProperty property = info.getCursorProperties(i);
recoveredCursorProperties.put(property.getName(), property.getValue());
cursorProperties = recoveredCursorProperties;
if (info.getCursorsLedgerId() == -1L) {
// There is no cursor ledger to read the last position from. It means the cursor has been properly
// closed and the last mark-delete position is stored in the ManagedCursorInfo itself.
// NOTE(review): the second constructor argument and the body of the following "if" are not visible here.
PositionImpl recoveredPosition = new PositionImpl(info.getMarkDeleteLedgerId(),
if (info.getIndividualDeletedMessagesCount() > 0) {
Map<String, Long> recoveredProperties = Collections.emptyMap();
if (info.getPropertiesCount() > 0) {
// Recover properties map
recoveredProperties = new HashMap<>();
for (int i = 0; i < info.getPropertiesCount(); i++) {
LongProperty property = info.getProperties(i);
recoveredProperties.put(property.getName(), property.getValue());
// No cursor ledger was involved, hence the null ledger handle.
recoveredCursor(recoveredPosition, recoveredProperties, recoveredCursorProperties, null);
} else {
// Need to proceed and read the last entry in the specified ledger to find out the last position"[{}] Cursor {} meta-data recover from ledger {}", ledger.getName(), name,
recoverFromLedger(info, callback);
// NOTE(review): the body of the failure handler is not visible here.
public void operationFailed(MetaStoreException e) {
/**
 * Recovers the cursor from the last entry of the cursor ledger referenced by {@code info}.
 *
 * Opens the ledger asynchronously; on a non-recoverable BookKeeper error, or when the ledger has no entries,
 * the cursor is initialized at the rollback position computed from {@code info}. Otherwise the last entry is
 * read and parsed as a PositionInfo, from which the position and its properties are restored.
 *
 * NOTE(review): several statements (early returns, some callback invocations, trailing log arguments) are not
 * visible in this copy of the block.
 */
protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallback callback) {
// Read the acknowledged position from the metadata ledger, then create
// a new ledger and write the position into it
long ledgerId = info.getCursorsLedgerId();
OpenCallback openCallback = (rc, lh, ctx) -> {
if (log.isInfoEnabled()) {"[{}] Opened ledger {} for cursor {}. rc={}", ledger.getName(), ledgerId, name, rc);
if (isBkErrorNotRecoverable(rc)) {
log.error("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name,
// Rewind to oldest entry available
initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback);
} else if (rc != BKException.Code.OK) {
// Any other error is reported to the caller instead of rewinding.
log.warn("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name,
callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
// Read the last entry in the ledger
long lastEntryInLedger = lh.getLastAddConfirmed();
if (lastEntryInLedger < 0) {
log.warn("[{}] Error reading from metadata ledger {} for cursor {}: No entries in ledger",
ledger.getName(), ledgerId, name);
// Rewind to last cursor snapshot available
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
// Read exactly one entry: the last confirmed one.
lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, lh1, seq, ctx1) -> {
if (log.isDebugEnabled()) {
// NOTE(review): the format string below starts with "[{}}" - the second brace looks like a typo for "]".
log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed());
if (isBkErrorNotRecoverable(rc1)) {
log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
// Rewind to oldest entry available
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
} else if (rc1 != BKException.Code.OK) {
log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
LedgerEntry entry = seq.nextElement();
PositionInfo positionInfo;
try {
positionInfo = PositionInfo.parseFrom(entry.getEntry());
} catch (InvalidProtocolBufferException e) {
// An unparsable entry is reported as a failure.
callback.operationFailed(new ManagedLedgerException(e));
Map<String, Long> recoveredProperties = Collections.emptyMap();
if (positionInfo.getPropertiesCount() > 0) {
// Recover properties map
recoveredProperties = new HashMap<>();
for (int i = 0; i < positionInfo.getPropertiesCount(); i++) {
LongProperty property = positionInfo.getProperties(i);
recoveredProperties.put(property.getName(), property.getValue());
PositionImpl position = new PositionImpl(positionInfo);
// NOTE(review): the bodies of the next two conditions are not visible; presumably they restore the
// individually-deleted ranges and the batch deletion indexes - confirm against the upstream file.
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
if (config.isDeletionAtBatchIndexLevelEnabled()
&& positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
// Keep the handle of the ledger the cursor was recovered from.
recoveredCursor(position, recoveredProperties, cursorProperties, lh);
}, null);
try {
bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), openCallback, null);
} catch (Throwable t) {
// A synchronous failure is routed through the same callback, with an "unexpected condition" result code.
log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}",
ledger.getName(), ledgerId, name, t);
openCallback.openComplete(BKException.Code.UnexpectedConditionException, null, null);
/**
 * Restores the individually-deleted ranges from their serialized form into individualDeletedMessages.
 *
 * A range whose endpoints are in the same ledger is added as is. A range spanning several ledgers is split
 * into one open-closed range per ledger: the rest of the lower ledger, every ledger strictly in between, and
 * the upper ledger up to the upper endpoint.
 *
 * NOTE(review): the statements before "try" and inside "finally" are not visible in this copy.
 */
private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> individualDeletedMessagesList) {
try {
individualDeletedMessagesList.forEach(messageRange -> {
MLDataFormats.NestedPositionInfo lowerEndpoint = messageRange.getLowerEndpoint();
MLDataFormats.NestedPositionInfo upperEndpoint = messageRange.getUpperEndpoint();
if (lowerEndpoint.getLedgerId() == upperEndpoint.getLedgerId()) {
individualDeletedMessages.addOpenClosed(lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId(),
upperEndpoint.getLedgerId(), upperEndpoint.getEntryId());
} else {
// Store message ranges after splitting them by ledger ID
LedgerInfo lowerEndpointLedgerInfo = ledger.getLedgersInfo().get(lowerEndpoint.getLedgerId());
if (lowerEndpointLedgerInfo != null) {
// Lower ledger: from the lower endpoint (exclusive) to its last entry (entries - 1).
individualDeletedMessages.addOpenClosed(lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId(),
lowerEndpoint.getLedgerId(), lowerEndpointLedgerInfo.getEntries() - 1);
} else {
log.warn("[{}][{}] No ledger info of lower endpoint {}:{}", ledger.getName(), name,
lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId());
// Ledgers strictly between the two endpoints are covered entirely: (-1, entries - 1].
for (LedgerInfo li : ledger.getLedgersInfo()
.subMap(lowerEndpoint.getLedgerId(), false, upperEndpoint.getLedgerId(), false).values()) {
individualDeletedMessages.addOpenClosed(li.getLedgerId(), -1, li.getLedgerId(),
li.getEntries() - 1);
// Upper ledger: from its beginning up to and including the upper endpoint.
individualDeletedMessages.addOpenClosed(upperEndpoint.getLedgerId(), -1,
upperEndpoint.getLedgerId(), upperEndpoint.getEntryId());
} finally {
/**
 * Restores the per-batch deletion indexes from their serialized form.
 *
 * For every serialized item that carries a non-empty delete set, the words of the set are copied into a
 * long[] and turned into a BitSetRecyclable.
 *
 * NOTE(review): the start of the statement that stores the bit set (presumably a put into
 * batchDeletedIndexes keyed by position), and the statements before "try" and inside "finally", are not
 * visible in this copy.
 */
private void recoverBatchDeletedIndexes (
List<MLDataFormats.BatchedEntryDeletionIndexInfo> batchDeletedIndexInfoList) {
try {
batchDeletedIndexInfoList.forEach(batchDeletedIndexInfo -> {
if (batchDeletedIndexInfo.getDeleteSetCount() > 0) {
long[] array = new long[batchDeletedIndexInfo.getDeleteSetCount()];
for (int i = 0; i < batchDeletedIndexInfo.getDeleteSetList().size(); i++) {
array[i] = batchDeletedIndexInfo.getDeleteSetList().get(i);
batchDeletedIndexInfo.getPosition().getEntryId()), BitSetRecyclable.create().resetWords(array));
} finally {
/**
 * Applies a recovered position to the in-memory cursor state.
 *
 * Adjusts the position when its ledger no longer exists or when it is past the last position of the managed
 * ledger, then sets the consumed counter, the mark-delete / persistent mark-delete / read positions and the
 * last mark-delete entry, records the ledger the cursor was recovered from (read-only) and moves the cursor
 * to State.NoLedger.
 *
 * @param position                  recovered mark-delete position
 * @param properties                properties recovered with the position
 * @param cursorProperties          recovered cursor properties; null is replaced by an empty map
 * @param recoveredFromCursorLedger handle of the cursor ledger used for recovery, or null
 */
private void recoveredCursor(PositionImpl position, Map<String, Long> properties,
Map<String, String> cursorProperties,
LedgerHandle recoveredFromCursorLedger) {
// if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
// we need to move to the next existing ledger
if (!ledger.ledgerExists(position.getLedgerId())) {
Long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId());
if (nextExistingLedger == null) {"[{}] [{}] Couldn't find next next valid ledger for recovery {}", ledger.getName(), name,
// Entry id -1 places the position right before the first entry of the next existing ledger.
position = nextExistingLedger != null ? PositionImpl.get(nextExistingLedger, -1) : position;
if (position.compareTo(ledger.getLastPosition()) > 0) {
log.warn("[{}] [{}] Current position {} is ahead of last position {}", ledger.getName(), name, position,
position = ledger.getLastPosition();
}"[{}] Cursor {} recovered to position {}", ledger.getName(), name, position);
this.cursorProperties = cursorProperties == null ? Collections.emptyMap() : cursorProperties;
// Start the counter at minus the number of entries after the position (see the field's comment).
messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));
markDeletePosition = position;
persistentMarkDeletePosition = position;
inProgressMarkDeletePersistPosition = null;
readPosition = ledger.getNextValidPosition(position);
ledger.onCursorReadPositionUpdated(this, readPosition);
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, properties, null, null);
// assign cursor-ledger so, it can be deleted when new ledger will be switched
this.cursorLedger = recoveredFromCursorLedger;
this.isCursorLedgerReadOnly = true;
STATE_UPDATER.set(this, State.NoLedger);
/**
 * Initializes the cursor at the given position and persists that position in the metadata store.
 *
 * NOTE(review): the invocations of {@code callback} inside the two handlers are not visible in this copy.
 */
void initialize(PositionImpl position, Map<String, Long> properties, Map<String, String> cursorProperties,
final VoidCallback callback) {
// Set the in-memory state first; no cursor ledger is involved, hence the null handle.
recoveredCursor(position, properties, cursorProperties, null);
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
// -1 is passed as the cursor ledger id when there is no cursor ledger.
persistPositionMetaStore(cursorLedger != null ? cursorLedger.getId() : -1L, position, properties,
new MetaStoreCallback<>() {
public void operationComplete(Void result, Stat stat) {
STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
public void operationFailed(MetaStoreException e) {
}, false);
public List<Entry> readEntries(int numberOfEntriesToRead) throws InterruptedException, ManagedLedgerException {
checkArgument(numberOfEntriesToRead > 0);
final CountDownLatch counter = new CountDownLatch(1);
class Result {
ManagedLedgerException exception = null;
List<Entry> entries = null;
final Result result = new Result();
asyncReadEntries(numberOfEntriesToRead, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
result.entries = entries;
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
result.exception = exception;
}, null, PositionImpl.LATEST);
if (result.exception != null) {
throw result.exception;
return result.entries;
/**
 * Asynchronous read without a size limit; delegates to the overload taking maxSizeBytes with
 * NO_MAX_SIZE_LIMIT.
 */
public void asyncReadEntries(final int numberOfEntriesToRead, final ReadEntriesCallback callback,
final Object ctx, PositionImpl maxPosition) {
asyncReadEntries(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx, maxPosition);
/**
 * Asynchronous read with a size limit and no skip condition; delegates to asyncReadEntriesWithSkip with a
 * null predicate.
 */
public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition) {
asyncReadEntriesWithSkip(numberOfEntriesToRead, maxSizeBytes, callback, ctx, maxPosition, null);
/**
 * Asynchronous read starting at the current read position, optionally skipping positions matched by
 * {@code skipCondition}.
 *
 * Fails the callback with CursorAlreadyClosedException when the cursor is closed. Otherwise caps the number
 * of entries through applyMaxSizeCap and creates the read operation.
 *
 * NOTE(review): the early return after the failure callback and the statement that submits {@code op} are
 * not visible in this copy.
 */
public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
checkArgument(numberOfEntriesToRead > 0);
if (isClosed()) {
callback.readEntriesFailed(new ManagedLedgerException
.CursorAlreadyClosedException("Cursor was already closed"), ctx);
int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, maxSizeBytes);
OpReadEntry op =
OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition);
public Entry getNthEntry(int n, IndividualDeletedEntries deletedEntries)
throws InterruptedException, ManagedLedgerException {
final CountDownLatch counter = new CountDownLatch(1);
class Result {
ManagedLedgerException exception = null;
Entry entry = null;
final Result result = new Result();
asyncGetNthEntry(n, deletedEntries, new ReadEntryCallback() {
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
result.exception = exception;
public void readEntryComplete(Entry entry, Object ctx) {
result.entry = entry;
public String toString() {
return String.format("Cursor [{}] get Nth entry", ManagedCursorImpl.this);
}, null);
counter.await(ledger.getConfig().getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
if (result.exception != null) {
throw result.exception;
return result.entry;
/**
 * Asynchronously reads the n-th entry after the mark-delete position.
 *
 * Fails the callback with CursorAlreadyClosedException when the cursor is closed. Completes the callback
 * with a null entry when fewer than {@code n} entries exist between the first valid position after the
 * mark-delete position and the last position of the managed ledger.
 *
 * NOTE(review): the early return after the failure callback and the last argument of getPositionAfterN are
 * not visible in this copy.
 */
public void asyncGetNthEntry(int n, IndividualDeletedEntries deletedEntries, ReadEntryCallback callback,
Object ctx) {
checkArgument(n > 0);
if (isClosed()) {
callback.readEntryFailed(new ManagedLedgerException
.CursorAlreadyClosedException("Cursor was already closed"), ctx);
PositionImpl startPosition = ledger.getNextValidPosition(markDeletePosition);
PositionImpl endPosition = ledger.getLastPosition();
if (startPosition.compareTo(endPosition) <= 0) {
long numOfEntries = getNumberOfEntries(Range.closed(startPosition, endPosition));
if (numOfEntries >= n) {
long deletedMessages = 0;
if (deletedEntries == IndividualDeletedEntries.Exclude) {
// When deleted entries are excluded, advance further by the number of deleted entries to skip.
deletedMessages = getNumIndividualDeletedEntriesToSkip(n);
PositionImpl positionAfterN = ledger.getPositionAfterN(markDeletePosition, n + deletedMessages,
ledger.asyncReadEntry(positionAfterN, callback, ctx);
} else {
// Not enough entries: report "no entry" rather than an error.
callback.readEntryComplete(null, ctx);
} else {
// Nothing to read after the mark-delete position.
callback.readEntryComplete(null, ctx);
/**
 * Blocking read-or-wait without a size limit; delegates to the overload taking maxSizeBytes with
 * NO_MAX_SIZE_LIMIT.
 */
public List<Entry> readEntriesOrWait(int numberOfEntriesToRead)
throws InterruptedException, ManagedLedgerException {
return readEntriesOrWait(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT);
public List<Entry> readEntriesOrWait(int numberOfEntriesToRead, long maxSizeBytes)
throws InterruptedException, ManagedLedgerException {
checkArgument(numberOfEntriesToRead > 0);
final CountDownLatch counter = new CountDownLatch(1);
class Result {
ManagedLedgerException exception = null;
List<Entry> entries = null;
final Result result = new Result();
asyncReadEntriesOrWait(numberOfEntriesToRead, maxSizeBytes, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
result.entries = entries;
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
result.exception = exception;
}, null, PositionImpl.LATEST);
if (result.exception != null) {
throw result.exception;
return result.entries;
/**
 * Asynchronous read-or-wait without a size limit; delegates to the overload taking maxSizeBytes with
 * NO_MAX_SIZE_LIMIT.
 */
public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition) {
asyncReadEntriesOrWait(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx, maxPosition);
/**
 * Asynchronous read-or-wait with a size limit and no skip condition; delegates to
 * asyncReadEntriesWithSkipOrWait with a null predicate.
 */
public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition) {
asyncReadEntriesWithSkipOrWait(maxEntries, maxSizeBytes, callback, ctx, maxPosition, null);
/**
 * Asynchronous read-or-wait with a skip condition and no size limit; delegates to the overload taking
 * maxSizeBytes with NO_MAX_SIZE_LIMIT.
 */
public void asyncReadEntriesWithSkipOrWait(int maxEntries, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition,
Predicate<PositionImpl> skipCondition) {
asyncReadEntriesWithSkipOrWait(maxEntries, NO_MAX_SIZE_LIMIT, callback, ctx, maxPosition, skipCondition);
/**
 * Reads entries immediately when some are available; otherwise parks the request as the cursor's single
 * waiting read operation and re-checks for new entries, either after the configured delay or right away on
 * the calling thread.
 *
 * <p>Failure paths visible here: a closed cursor fails the callback with
 * {@code CursorAlreadyClosedException}; if another waiting read is already registered, the callback fails
 * with {@code ConcurrentWaitCallbackException}.
 *
 * @param maxEntries maximum number of entries to read; must be greater than zero
 * @param maxSizeBytes byte-size limit, folded into the entry count by {@code applyMaxSizeCap}
 * @param callback receives the outcome of the read
 * @param ctx opaque context passed back to the callback
 * @param maxPosition maximum position forwarded to the read
 * @param skipCondition predicate forwarded to the read
 */
public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition,
Predicate<PositionImpl> skipCondition) {
checkArgument(maxEntries > 0);
// NOTE(review): the failure branches in this method show no `return` in this excerpt; presumably each one
// returns right after notifying the callback -- confirm against the full file.
if (isClosed()) {
callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
int numberOfEntriesToRead = applyMaxSizeCap(maxEntries, maxSizeBytes);
if (hasMoreEntries()) {
// If we have available entries, we can read them immediately
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Read entries immediately", ledger.getName(), name);
// The size limit was already applied to numberOfEntriesToRead above, so NO_MAX_SIZE_LIMIT is passed here.
asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx,
maxPosition, skipCondition);
} else {
OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
ctx, maxPosition, skipCondition);
// Only one waiting read operation may be registered at a time: the CAS succeeds only if none is set.
if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Deferring retry of read at position {}", ledger.getName(), name, op.readPosition);
// Check again for new entries after the configured time, then if still no entries are available register
// to be notified
if (config.getNewEntriesCheckDelayInMillis() > 0) {
// NOTE(review): the receiver of this schedule(...) call is not visible in this excerpt.
.schedule(() -> checkForNewEntries(op, callback, ctx),
config.getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
} else {
// If there's no delay, check directly from the same thread
checkForNewEntries(op, callback, ctx);
/**
 * Re-checks for readable entries on behalf of a parked read operation. It is invoked from
 * {@code asyncReadEntriesWithSkipOrWait}, either via a scheduled task or directly.
 *
 * <p>If entries are found, it tries to clear the waiting read operation with a compare-and-set so that the
 * read can proceed. If none are found and the ledger reports itself terminated, the callback fails with
 * {@code NoMoreEntriesToReadException}. Any {@code Throwable} raised along the way is wrapped in a
 * {@code ManagedLedgerException} and delivered through {@code callback.readEntriesFailed}.
 *
 * @param op the parked read operation
 * @param callback receives failures detected here
 * @param ctx opaque context passed back to the callback
 */
private void checkForNewEntries(OpReadEntry op, ReadEntriesCallback callback, Object ctx) {
try {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Re-trying the read at position {}", ledger.getName(), name, op.readPosition);
if (!hasMoreEntries()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Still no entries available. Register for notification", ledger.getName(),
// Let the managed ledger know we want to be notified whenever a new entry is published
// NOTE(review): the registration call itself is not visible in this excerpt.
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skip notification registering since we do have entries available",
ledger.getName(), name);
// Check again the entries count, since an entry could have been written between the time we
// checked and the time we've asked to be notified by managed ledger
if (hasMoreEntries()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Found more entries", ledger.getName(), name);
// Try to cancel the notification request
// The CAS succeeds only if `op` is still the registered waiting read operation.
if (WAITING_READ_OP_UPDATER.compareAndSet(this, op, null)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Cancelled notification and scheduled read at {}", ledger.getName(),
name, op.readPosition);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] notification was already cancelled", ledger.getName(), name);
} else if (ledger.isTerminated()) {
// At this point we registered for notification and still there were no more available
// entries.
// If the managed ledger was indeed terminated, we need to notify the cursor
callback.readEntriesFailed(new NoMoreEntriesToReadException("Topic was terminated"), ctx);
} catch (Throwable t) {
callback.readEntriesFailed(new ManagedLedgerException(t), ctx);
/**
 * @return {@code true} when the cursor state is either {@code State.Closed} or {@code State.Closing}
 */
public boolean isClosed() {
return state == State.Closed || state == State.Closing;
/**
 * Atomically clears the cursor's waiting read operation, if any.
 *
 * @return {@code true} if a waiting read operation was registered at the time of the call
 */
public boolean cancelPendingReadRequest() {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Cancel pending read request", ledger.getName(), name);
// getAndSet swaps in null and returns the previous value in one atomic step.
final OpReadEntry op = WAITING_READ_OP_UPDATER.getAndSet(this, null);
// NOTE(review): the body of this branch is not visible in this excerpt.
if (op != null) {
return op != null;
/**
 * @return {@code true} if a waiting read operation is currently registered on this cursor
 */
public boolean hasPendingReadRequest() {
return WAITING_READ_OP_UPDATER.get(this) != null;
/**
 * Tells whether there is at least one entry available at or after the current read position.
 *
 * @return {@code true} if the read position is not past the ledger's last position or, when the last
 *         position has entry id -1, if {@link #getNumberOfEntries()} is positive
 */
public boolean hasMoreEntries() {
// If writer and reader are on the same ledger, we just need to compare the entry id to know if we have more
// entries.
// If they are on different ledgers we have 2 cases:
// * Writer pointing to valid entry --> should return true since we have available entries
// * Writer pointing to "invalid" entry -1 (meaning no entries in that ledger) --> need to check whether the
//   reader is at the last entry in the previous ledger
PositionImpl writerPosition = ledger.getLastPosition();
if (writerPosition.getEntryId() != -1) {
return readPosition.compareTo(writerPosition) <= 0;
} else {
// Fall back to checking the number of entries to ensure we are at the last entry in ledger and no ledgers
// are in the middle
return getNumberOfEntries() > 0;
/**
 * Counts the entries from the read position (inclusive) up to the position following the ledger's last
 * position (exclusive), using {@code getNumberOfEntries(Range)}.
 *
 * @return 0 when the read position is already beyond the position following the last one; otherwise the
 *         count for that range
 */
public long getNumberOfEntries() {
if (readPosition.compareTo(ledger.getLastPosition().getNext()) > 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Read position {} is ahead of last position {}. There are no entries to read",
ledger.getName(), name, readPosition, ledger.getLastPosition());
return 0;
} else {
return getNumberOfEntries(Range.closedOpen(readPosition, ledger.getLastPosition().getNext()));
/**
 * Counts the ledger entries in the range (markDeletePosition, readPosition].
 *
 * @return the ledger's entry count for that range, or 0 when either position is {@code null} or the
 *         mark-delete position is not strictly before the read position
 */
public long getNumberOfEntriesSinceFirstNotAckedMessage() {
// For a consumer that has already caught up, a race condition can leave markDeletePosition ahead of
// readPosition, so the ordering is validated before the range is built.
// Both fields are copied into locals so the check and the range use the same values.
PositionImpl markDeletePosition = this.markDeletePosition;
PositionImpl readPosition = this.readPosition;
return (markDeletePosition != null && readPosition != null && markDeletePosition.compareTo(readPosition) < 0)
? ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, readPosition))
: 0;
/**
 * @return the value of {@code individualDeletedMessages.size()}
 */
public int getTotalNonContiguousDeletedMessagesRange() {
return individualDeletedMessages.size();
/**
 * @return the current value of the {@code individualDeletedMessagesSerializedSize} field
 */
public int getNonContiguousDeletedMessagesRangeSerializedSize() {
return this.individualDeletedMessagesSerializedSize;
/**
 * @return the result of {@code ledger.estimateBacklogFromPosition} for the current mark-delete position
 */
public long getEstimatedSizeSinceMarkDeletePosition() {
return ledger.estimateBacklogFromPosition(markDeletePosition);
/**
 * Returns the number of entries in this cursor's backlog.
 *
 * @param isPrecise when {@code true}, count the entries in (markDeletePosition, lastPosition] with
 *        {@code getNumberOfEntries(Range)}; when {@code false}, use the difference between the ledger's
 *        entries-added counter and this cursor's consumed counter, falling back to the range count if that
 *        difference is negative
 * @return the backlog size; 0 in precise mode when the mark-delete position is at or past the last position
 */
public long getNumberOfEntriesInBacklog(boolean isPrecise) {
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer {} cursor ml-entries: {} -- deleted-counter: {} other counters: mdPos {} rdPos {}",
ledger.getName(), name, ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger),
messagesConsumedCounter, markDeletePosition, readPosition);
if (isPrecise) {
if (markDeletePosition.compareTo(ledger.getLastPosition()) >= 0) {
return 0;
return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
// Counter-based estimate: entries added to the ledger minus entries consumed by this cursor.
long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter;
if (backlog < 0) {
// In some case the counters get incorrect values, fall back to the precise backlog count
backlog = getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
return backlog;
/**
 * Counts the ledger entries in (markDeletePosition, lastPosition]. The count comes straight from
 * {@code ledger.getNumberOfEntries}, not from this class's {@code getNumberOfEntries(Range)}.
 *
 * @return the ledger's entry count for that range
 */
public long getNumberOfEntriesInStorage() {
return ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
/**
 * Convenience overload that delegates to {@link #findNewestMatching(FindPositionConstraint, Predicate)} with
 * {@code FindPositionConstraint.SearchActiveEntries}.
 *
 * @param condition predicate forwarded to the delegate
 * @return the position returned by the delegate
 * @throws InterruptedException declared by the delegate
 * @throws ManagedLedgerException declared by the delegate
 */
public Position findNewestMatching(Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException {
return findNewestMatching(FindPositionConstraint.SearchActiveEntries, condition);
/**
 * Builds an {@code OpScan} over this cursor and returns a future for its outcome.
 *
 * @param position where to start; when empty, the next valid position after the mark-delete position is used
 * @param condition predicate handed to the scan operation
 * @param batchSize batch size handed to the scan operation
 * @param maxEntries entry limit handed to the scan operation
 * @param timeOutMs timeout in milliseconds handed to the scan operation
 * @return the future created by this method
 */
public CompletableFuture<ScanOutcome> scan(Optional<Position> position,
Predicate<Entry> condition,
int batchSize, long maxEntries, long timeOutMs) {
PositionImpl startPosition = (PositionImpl) position.orElseGet(
() -> ledger.getNextValidPosition(markDeletePosition));
CompletableFuture<ScanOutcome> future = new CompletableFuture<>();
// NOTE(review): the callback bodies (presumably completing `future`) and the call that starts `op` are not
// visible in this excerpt -- confirm against the full file.
OpScan op = new OpScan(this, batchSize, startPosition, condition, new ScanCallback() {
public void scanComplete(Position position, ScanOutcome scanOutcome, Object ctx) {
public void scanFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
}, null, maxEntries, timeOutMs);
return future;
/**
 * Synchronous wrapper around {@code asyncFindNewestMatching}: captures the outcome in a local holder, then
 * either rethrows the captured exception or returns the captured position.
 *
 * @param constraint which set of entries to search
 * @param condition predicate forwarded to the asynchronous search
 * @return the position handed to {@code findEntryComplete}
 * @throws ManagedLedgerException the exception handed to {@code findEntryFailed}, if any
 * @throws InterruptedException declared by the signature
 */
public Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition)
throws InterruptedException, ManagedLedgerException {
// NOTE(review): no countDown()/await() call on this latch is visible in this excerpt -- confirm against
// the full file.
final CountDownLatch counter = new CountDownLatch(1);
// Mutable holder so the anonymous callback can hand its outcome back to this method.
class Result {
ManagedLedgerException exception = null;
Position position = null;
final Result result = new Result();
asyncFindNewestMatching(constraint, condition, new FindEntryCallback() {
public void findEntryComplete(Position position, Object ctx) {
result.position = position;
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
Object ctx) {
result.exception = exception;
}, null);
if (result.exception != null) {
throw result.exception;
return result.position;
/**
 * Convenience overload that delegates to the five-argument {@code asyncFindNewestMatching} with
 * {@code isFindFromLedger} set to {@code false}.
 *
 * @param constraint which set of entries to search
 * @param condition predicate forwarded to the delegate
 * @param callback receives the outcome of the search
 * @param ctx opaque context passed back to the callback
 */
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
FindEntryCallback callback, Object ctx) {
asyncFindNewestMatching(constraint, condition, callback, ctx, false);
/**
 * Chooses a start position and an entry budget from {@code constraint}, then builds an
 * {@code OpFindNewest} either over the ledger or over this cursor.
 *
 * <ul>
 * <li>{@code SearchAllAvailableEntries}: start at {@link #getFirstPosition()}, budget is the ledger's entry
 * count minus one.</li>
 * <li>{@code SearchActiveEntries}: start at the next valid position after the mark-delete position, budget
 * is {@link #getNumberOfEntriesInStorage()}.</li>
 * </ul>
 *
 * <p>The callback is failed with "Unknown position constraint" or "Couldn't find start position" in the
 * corresponding error cases.
 *
 * @param constraint which set of entries to search
 * @param condition predicate handed to the find operation
 * @param callback receives the outcome of the search
 * @param ctx opaque context passed back to the callback
 * @param isFindFromLedger when {@code true} the operation is built on {@code this.ledger}, otherwise on
 *        this cursor
 */
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
OpFindNewest op;
PositionImpl startPosition = null;
long max = 0;
// NOTE(review): the `break`, `default:` and `return` lines of this switch, and the call that starts `op`,
// are not visible in this excerpt -- confirm against the full file.
switch (constraint) {
case SearchAllAvailableEntries:
startPosition = (PositionImpl) getFirstPosition();
max = ledger.getNumberOfEntries() - 1;
case SearchActiveEntries:
startPosition = ledger.getNextValidPosition(markDeletePosition);
max = getNumberOfEntriesInStorage();
callback.findEntryFailed(new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx);
if (startPosition == null) {
callback.findEntryFailed(new ManagedLedgerException("Couldn't find start position"),
Optional.empty(), ctx);
if (isFindFromLedger) {
op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx);
} else {
op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
/**
 * Marks the cursor active, but only if it is currently inactive and has not been flagged as
 * always-inactive.
 */
public void setActive() {
if (!isActive && !alwaysInactive) {
isActive = true;
/**
 * @return the current value of the {@code isActive} flag
 */
public boolean isActive() {
return isActive;
/**
 * Marks the cursor inactive if it is currently active.
 */
public void setInactive() {
if (isActive) {
isActive = false;
/**
 * Sets the {@code alwaysInactive} flag; once set, {@code setActive()} no longer activates the cursor.
 */
public void setAlwaysInactive() {
this.alwaysInactive = true;
/**
 * @return a position at entry 0 of the ledger with the first key in {@code ledger.getLedgersInfo()}, or
 *         {@code null} if that key is {@code null}
 */
public Position getFirstPosition() {
// NOTE(review): if getLedgersInfo() returns a java.util.SortedMap, firstKey() throws
// NoSuchElementException on an empty map instead of returning null, so the null check below would never
// fire -- confirm the map type and whether it can be empty here.
Long firstLedgerId = ledger.getLedgersInfo().firstKey();
return firstLedgerId == null ? null : new PositionImpl(firstLedgerId, 0);
/**
 * Moves the cursor to a new read position and persists the corresponding mark-delete position.
 *
 * <p>{@code PositionImpl.EARLIEST} resolves to the ledger's first position and {@code PositionImpl.LATEST}
 * to the position after the ledger's last position; any other value is used as given. The new mark-delete
 * position is the position preceding the new read position. In-memory state (consumed counter, mark-delete
 * position, read position, optional batch ack set) is only updated once the new mark-delete position has
 * been persisted.
 *
 * @param proposedReadPosition requested read position, or one of the EARLIEST/LATEST sentinels
 * @param resetCursorCallback notified of the outcome
 */
protected void internalResetCursor(PositionImpl proposedReadPosition,
AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
final PositionImpl newReadPosition;
if (proposedReadPosition.equals(PositionImpl.EARLIEST)) {
newReadPosition = ledger.getFirstPosition();
} else if (proposedReadPosition.equals(PositionImpl.LATEST)) {
newReadPosition = ledger.getLastPosition().getNext();
} else {
newReadPosition = proposedReadPosition;
// NOTE(review): the next line looks truncated by extraction (a logging call appears to have lost its
// prefix); the same applies to the two similar lines further down -- confirm against the full file.
}"[{}] Initiate reset readPosition from {} to {} on cursor {}", ledger.getName(), readPosition,
newReadPosition, name);
// NOTE(review): the condition guarding the "reset already in progress" branch is not visible in this
// excerpt.
synchronized (pendingMarkDeleteOps) {
log.error("[{}] reset requested - readPosition [{}], previous reset in progress - cursor {}",
ledger.getName(), newReadPosition, name);
new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"),
final AsyncCallbacks.ResetCursorCallback callback = resetCursorCallback;
final PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newReadPosition);
VoidCallback finalCallback = new VoidCallback() {
public void operationComplete() {
// modify mark delete and read position since we are able to persist new position for cursor
try {
// Adjust the consumed counter by the number of entries between the old and new mark-delete
// positions: subtract when moving backwards, add when moving forwards.
if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) {
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries(
Range.closedOpen(newMarkDeletePosition, markDeletePosition)));
} else {
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), getNumberOfEntries(
Range.closedOpen(markDeletePosition, newMarkDeletePosition)));
markDeletePosition = newMarkDeletePosition;
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()
? getProperties() : Collections.emptyMap(), null, null);
// When batch-index-level deletion is enabled and the target position carries an ack set, record it
// for the new read position.
if (config.isDeletionAtBatchIndexLevelEnabled()) {
long[] resetWords = newReadPosition.ackSet;
if (resetWords != null) {
BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(resetWords);
batchDeletedIndexes.put(newReadPosition, ackSet);
PositionImpl oldReadPosition = readPosition;
if (oldReadPosition.compareTo(newReadPosition) >= 0) {"[{}] reset readPosition to {} before current read readPosition {} on cursor {}",
ledger.getName(), newReadPosition, oldReadPosition, name);
} else {"[{}] reset readPosition to {} skipping from current read readPosition {} on "
+ "cursor {}", ledger.getName(), newReadPosition, oldReadPosition, name);
readPosition = newReadPosition;
ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
} finally {
// Clear the reset-in-progress flag; a failed CAS means the flag was not in the expected state.
synchronized (pendingMarkDeleteOps) {
if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) {
log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}",
ledger.getName(), newReadPosition, name);
public void operationFailed(ManagedLedgerException exception) {
synchronized (pendingMarkDeleteOps) {
if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) {
log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}",
ledger.getName(), newReadPosition, name);
callback.resetFailed(new ManagedLedgerException.InvalidCursorPositionException(
"unable to persist readPosition for cursor reset " + newReadPosition), newReadPosition);
// Discard what is known about previously persisted / in-flight mark-delete positions before persisting
// the new one.
persistentMarkDeletePosition = null;
inProgressMarkDeletePersistPosition = null;
// NOTE(review): this entry always carries getProperties(), whereas the entry built on completion above and
// the call below use an empty map for non-compaction cursors -- confirm the asymmetry is intended.
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, getProperties(), null, null);
internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
new MarkDeleteCallback() {
public void markDeleteComplete(Object ctx) {
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}, null);
/**
 * Schedules a cursor reset on the ledger's executor.
 *
 * <p>Unless {@code forceReset} is set, a position that is neither valid in the ledger nor one of the
 * EARLIEST/LATEST sentinels is replaced by the next valid position, or by {@code PositionImpl.LATEST} when
 * there is none. The resulting position is then handed to {@code internalResetCursor}.
 *
 * @param newPos target position; must be a {@code PositionImpl}
 * @param forceReset when {@code true}, the position is used as given without validation
 * @param callback notified of the outcome
 */
public void asyncResetCursor(Position newPos, boolean forceReset, AsyncCallbacks.ResetCursorCallback callback) {
checkArgument(newPos instanceof PositionImpl);
final PositionImpl newPosition = (PositionImpl) newPos;
// order trim and reset operations on a ledger
ledger.getExecutor().execute(() -> {
PositionImpl actualPosition = newPosition;
if (!ledger.isValidPosition(actualPosition)
&& !actualPosition.equals(PositionImpl.EARLIEST)
&& !actualPosition.equals(PositionImpl.LATEST)
&& !forceReset) {
actualPosition = ledger.getNextValidPosition(actualPosition);
if (actualPosition == null) {
// next valid position would only return null when newPos
// is larger than all available positions, then it's latest in effect.
actualPosition = PositionImpl.LATEST;
internalResetCursor(actualPosition, callback);
/**
 * Synchronous wrapper around {@code asyncResetCursor} (with {@code forceReset} set to {@code false}) that
 * waits up to {@code ManagedLedgerImpl.AsyncOperationTimeoutSeconds} seconds for the reset to finish.
 *
 * @param newPos target position
 * @throws ManagedLedgerException "Timeout during reset cursor" when the wait times out, or the exception
 *         reported to {@code resetFailed}
 * @throws InterruptedException if interrupted while waiting on the latch
 */
public void resetCursor(Position newPos) throws ManagedLedgerException, InterruptedException {
// Mutable holder so the anonymous callback can hand a failure back to this method.
class Result {
ManagedLedgerException exception = null;
final Result result = new Result();
final CountDownLatch counter = new CountDownLatch(1);
// NOTE(review): the countDown() calls inside the callbacks are not visible in this excerpt.
asyncResetCursor(newPos, false, new AsyncCallbacks.ResetCursorCallback() {
public void resetComplete(Object ctx) {
public void resetFailed(ManagedLedgerException exception, Object ctx) {
result.exception = exception;
// await(...) returns false when the timeout elapses before the latch reaches zero.
if (!counter.await(ManagedLedgerImpl.AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
if (result.exception != null) {
log.warn("[{}] Reset cursor to {} on cursor {} timed out with exception {}", ledger.getName(), newPos,
name, result.exception);
throw new ManagedLedgerException("Timeout during reset cursor");
if (result.exception != null) {
throw result.exception;
/**
 * Synchronous wrapper around {@code asyncReplayEntries}: captures the outcome in a local holder, then
 * either rethrows the captured exception or returns the captured entries.
 *
 * @param positions positions to replay
 * @return the entries handed to {@code readEntriesComplete}
 * @throws ManagedLedgerException the exception handed to {@code readEntriesFailed}, if any
 * @throws InterruptedException declared by the signature
 */
public List<Entry> replayEntries(Set<? extends Position> positions)
throws InterruptedException, ManagedLedgerException {
// NOTE(review): no countDown()/await() call on this latch is visible in this excerpt -- confirm against
// the full file.
final CountDownLatch counter = new CountDownLatch(1);
// Mutable holder so the anonymous callback can hand its outcome back to this method.
class Result {
ManagedLedgerException exception = null;
List<Entry> entries = null;
final Result result = new Result();
asyncReplayEntries(positions, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
result.entries = entries;
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
result.exception = exception;
}, null);
if (result.exception != null) {
throw result.exception;
return result.entries;
* Asynchronously replays the given positions:
* (a) before reading, filters out the positions that are already acknowledged;
* (b) reads the remaining entries asynchronously and passes them to the given ReadEntriesCallback;
* (c) returns the already-acknowledged positions that were not replayed, so that the caller (the Dispatcher)
* can remove them from its replay list and not try to replay them again.
// Convenience overload: delegates to the four-argument asyncReplayEntries with sortEntries = false and
// returns its result unchanged.
public Set<? extends Position> asyncReplayEntries(final Set<? extends Position> positions,
ReadEntriesCallback callback, Object ctx) {
return asyncReplayEntries(positions, callback, ctx, false);
/**
 * Reads the given positions one by one through {@code ledger.asyncReadEntry} and reports the aggregate
 * outcome to {@code callback} once every outstanding read has answered.
 *
 * <p>Positions at or before the mark-delete position, or contained in {@code individualDeletedMessages},
 * are treated as already acknowledged: they are not read and are returned to the caller instead. An empty
 * input completes the callback immediately with an empty list. If any read fails, the first failure is
 * reported through {@code readEntriesFailed} after the last pending read has answered.
 *
 * @param positions positions to replay
 * @param callback receives the collected entries or the first failure
 * @param ctx opaque context passed to the reads and back to the callback
 * @param sortEntries whether to sort the collected entries before completing (the sort call itself is not
 *        visible in this excerpt)
 * @return the positions that were skipped because they are already acknowledged; an empty set for empty
 *         input
 */
public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions,
ReadEntriesCallback callback, Object ctx, boolean sortEntries) {
List<Entry> entries = Lists.newArrayListWithExpectedSize(positions.size());
if (positions.isEmpty()) {
callback.readEntriesComplete(entries, ctx);
return Collections.emptySet();
// filters out messages which are already acknowledged
Set<Position> alreadyAcknowledgedPositions = new HashSet<>();
// NOTE(review): the head and tail of this stream pipeline, and the body of the finally block, are not
// visible in this excerpt.
try {
.filter(position -> ((PositionImpl) position).compareTo(markDeletePosition) <= 0
|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId()))
} finally {
final int totalValidPositions = positions.size() - alreadyAcknowledgedPositions.size();
final AtomicReference<ManagedLedgerException> exception = new AtomicReference<>();
ReadEntryCallback cb = new ReadEntryCallback() {
// Number of reads still outstanding; guarded by the synchronized callback methods below.
int pendingCallbacks = totalValidPositions;
public synchronized void readEntryComplete(Entry entry, Object ctx) {
if (exception.get() != null) {
// if there is already a failure for a different position, we should release the entry straight away
// and not add it to the list
if (--pendingCallbacks == 0) {
callback.readEntriesFailed(exception.get(), ctx);
} else {
if (--pendingCallbacks == 0) {
if (sortEntries) {
callback.readEntriesComplete(entries, ctx);
public synchronized void readEntryFailed(ManagedLedgerException mle, Object ctx) {
log.warn("[{}][{}] Error while replaying entries", ledger.getName(), name, mle);
// Only the first failure is recorded.
if (exception.compareAndSet(null, mle)) {
// release the entries just once, any further read success will release the entry straight away
if (--pendingCallbacks == 0) {
callback.readEntriesFailed(exception.get(), ctx);
public String toString() {
// NOTE(review): String.format uses %-style specifiers, not SLF4J-style "{}", so the cursor argument
// is ignored and the literal text "Cursor [{}] async replay entries" is returned; "%s" was
// probably intended.
return String.format("Cursor [{}] async replay entries", ManagedCursorImpl.this);
}; -> !alreadyAcknowledgedPositions.contains(position))
.forEach(p ->{
if (((PositionImpl) p).compareTo(this.readPosition) == 0) {
log.warn("[{}][{}] replayPosition{} equals readPosition{}," + " need set next readPosition",
ledger.getName(), name, p, this.readPosition);
ledger.asyncReadEntry((PositionImpl) p, cb, ctx);
return alreadyAcknowledgedPositions;
/**
 * Counts the entries in {@code range} that have not been individually deleted: the ledger's entry count for
 * the range minus the individually deleted entries that fall inside it.
 *
 * <p>The deleted count is obtained either from {@code individualDeletedMessages.cardinality(...)} (when
 * {@code config.isUnackedRangesOpenCacheSetEnabled()}) or by iterating over the deleted ranges and counting
 * each one's intersection with {@code range}.
 *
 * @param range the range of positions to count
 * @return {@code allEntries - deletedEntries}
 */
protected long getNumberOfEntries(Range<PositionImpl> range) {
long allEntries = ledger.getNumberOfEntries(range);
if (log.isDebugEnabled()) {
log.debug("[{}] getNumberOfEntries. {} allEntries: {}", ledger.getName(), range, allEntries);
// Mutable accumulator, since the count is updated from inside the forEach lambda below.
AtomicLong deletedEntries = new AtomicLong(0);
// NOTE(review): the statements that add `cardinality` / `commonCount` to deletedEntries are not visible in
// this excerpt.
try {
if (config.isUnackedRangesOpenCacheSetEnabled()) {
int cardinality = individualDeletedMessages.cardinality(
range.lowerEndpoint().ledgerId, range.lowerEndpoint().entryId,
range.upperEndpoint().ledgerId, range.upperEndpoint().entryId);
} else {
individualDeletedMessages.forEach((r) -> {
try {
if (r.isConnected(range)) {
Range<PositionImpl> commonEntries = r.intersection(range);
long commonCount = ledger.getNumberOfEntries(commonEntries);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Discounting {} entries for already deleted range {}",
ledger.getName(), name, commonCount, commonEntries);
return true;
} finally {
// Hand recyclable endpoint objects back once this range has been processed.
if (r.lowerEndpoint() instanceof PositionImplRecyclable) {
((PositionImplRecyclable) r.lowerEndpoint()).recycle();
((PositionImplRecyclable) r.upperEndpoint()).recycle();
}, recyclePositionRangeConverter);
} finally {
if (log.isDebugEnabled()) {
log.debug("[{}] Found {} entries - deleted: {}", ledger.getName(), allEntries - deletedEntries.get(),
return allEntries - deletedEntries.get();
/**
 * Convenience overload that delegates to {@link #markDelete(Position, Map)} with an empty properties map.
 *
 * @param position the position to mark-delete up to
 * @throws InterruptedException declared by the delegate
 * @throws ManagedLedgerException declared by the delegate
 */
public void markDelete(Position position) throws InterruptedException, ManagedLedgerException {
markDelete(position, Collections.emptyMap());
/**
 * Synchronous wrapper around {@code asyncMarkDelete} that waits up to
 * {@code ManagedLedgerImpl.AsyncOperationTimeoutSeconds} seconds for the operation to finish.
 *
 * @param position the position to mark-delete up to; must be a {@code PositionImpl}
 * @param properties properties forwarded to the asynchronous operation
 * @throws ManagedLedgerException "Timeout during mark-delete operation" when the wait times out, or the
 *         exception reported to {@code markDeleteFailed}
 * @throws InterruptedException if interrupted while waiting on the latch
 */
public void markDelete(Position position, Map<String, Long> properties)
throws InterruptedException, ManagedLedgerException {
checkArgument(position instanceof PositionImpl);
// Mutable holder so the anonymous callback can hand a failure back to this method.
class Result {
ManagedLedgerException exception = null;
final Result result = new Result();
final CountDownLatch counter = new CountDownLatch(1);
// NOTE(review): the countDown() calls inside the callbacks are not visible in this excerpt.
asyncMarkDelete(position, properties, new MarkDeleteCallback() {
public void markDeleteComplete(Object ctx) {
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
result.exception = exception;
}, null);
// await(...) returns false when the timeout elapses before the latch reaches zero.
if (!counter.await(ManagedLedgerImpl.AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
throw new ManagedLedgerException("Timeout during mark-delete operation");
if (result.exception != null) {
throw result.exception;
/**
 * Synchronous wrapper around {@code asyncClearBacklog} that waits up to
 * {@code ManagedLedgerImpl.AsyncOperationTimeoutSeconds} seconds for the operation to finish.
 *
 * @throws ManagedLedgerException "Timeout during clear backlog operation" when the wait times out, or the
 *         exception reported to {@code clearBacklogFailed}
 * @throws InterruptedException if interrupted while waiting on the latch
 */
public void clearBacklog() throws InterruptedException, ManagedLedgerException {
// Mutable holder so the anonymous callback can hand a failure back to this method.
class Result {
ManagedLedgerException exception = null;
final Result result = new Result();
final CountDownLatch counter = new CountDownLatch(1);
// NOTE(review): the countDown() calls inside the callbacks are not visible in this excerpt.
asyncClearBacklog(new ClearBacklogCallback() {
public void clearBacklogComplete(Object ctx) {
public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
result.exception = exception;
}, null);
// await(...) returns false when the timeout elapses before the latch reaches zero.
if (!counter.await(ManagedLedgerImpl.AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
throw new ManagedLedgerException("Timeout during clear backlog operation");
if (result.exception != null) {
throw result.exception;
/**
 * Clears the backlog by mark-deleting up to the ledger's last position.
 *
 * <p>A mark-delete failure whose cause is an {@code IllegalArgumentException} is handled separately from
 * other failures (see the inline comment); every other failure is forwarded to
 * {@code callback.clearBacklogFailed}.
 *
 * @param callback notified of the outcome
 * @param ctx opaque context passed back to the callback
 */
public void asyncClearBacklog(final ClearBacklogCallback callback, Object ctx) {
// NOTE(review): the bodies of markDeleteComplete and of the IllegalArgumentException branch are not
// visible in this excerpt; presumably both report completion to the callback -- confirm against the full
// file.
asyncMarkDelete(ledger.getLastPosition(), new MarkDeleteCallback() {
public void markDeleteComplete(Object ctx) {
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
if (exception.getCause() instanceof IllegalArgumentException) {
// There could be a race condition between calling clear backlog and other mark delete operations.
// If we get an exception it means the backlog was already cleared in the meantime.
} else {
callback.clearBacklogFailed(exception, ctx);
}, ctx);
/**
 * Synchronous wrapper around {@code asyncSkipEntries} that waits up to
 * {@code ManagedLedgerImpl.AsyncOperationTimeoutSeconds} seconds for the operation to finish.
 *
 * @param numEntriesToSkip number of entries to skip
 * @param deletedEntries how individually deleted entries are treated; forwarded unchanged
 * @throws ManagedLedgerException "Timeout during skip messages operation" when the wait times out, or the
 *         exception reported to {@code skipEntriesFailed}
 * @throws InterruptedException if interrupted while waiting on the latch
 */
public void skipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries)
throws InterruptedException, ManagedLedgerException {
// Mutable holder so the anonymous callback can hand a failure back to this method.
class Result {
ManagedLedgerException exception = null;
final Result result = new Result();
final CountDownLatch counter = new CountDownLatch(1);
// NOTE(review): the countDown() calls inside the callbacks are not visible in this excerpt.
asyncSkipEntries(numEntriesToSkip, deletedEntries, new SkipEntriesCallback() {
public void skipEntriesComplete(Object ctx) {
public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {
result.exception = exception;
}, null);
// await(...) returns false when the timeout elapses before the latch reaches zero.
if (!counter.await(ManagedLedgerImpl.AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
throw new ManagedLedgerException("Timeout during skip messages operation");
if (result.exception != null) {
throw result.exception;
/**
 * Skips entries by mark-deleting up to the position that lies {@code numEntriesToSkip} entries (plus, when
 * {@code deletedEntries} is {@code Exclude}, the individually deleted entries found on the way) after the
 * current mark-delete position.
 *
 * <p>A mark-delete failure whose cause is an {@code IllegalArgumentException} is handled separately from
 * other failures; every other failure is logged and forwarded to {@code callback.skipEntriesFailed}.
 *
 * @param numEntriesToSkip number of entries to skip
 * @param deletedEntries when {@code Exclude}, individually deleted entries are added on top of
 *        {@code numEntriesToSkip}
 * @param callback notified of the outcome
 * @param ctx opaque context passed back to the callback
 */
public void asyncSkipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries,
final SkipEntriesCallback callback, Object ctx) {"[{}] Skipping {} entries on cursor {}", ledger.getName(), numEntriesToSkip, name);
// NOTE(review): the line above looks truncated by extraction (a logging call appears to have lost its
// prefix) -- confirm against the full file.
long numDeletedMessages = 0;
if (deletedEntries == IndividualDeletedEntries.Exclude) {
numDeletedMessages = getNumIndividualDeletedEntriesToSkip(numEntriesToSkip);
asyncMarkDelete(ledger.getPositionAfterN(markDeletePosition, numEntriesToSkip + numDeletedMessages,
PositionBound.startExcluded), new MarkDeleteCallback() {
public void markDeleteComplete(Object ctx) {
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
if (exception.getCause() instanceof IllegalArgumentException) {
// There could be a race condition between calling clear backlog and other mark delete
// operations.
// If we get an exception it means the backlog was already cleared in the meantime.
} else {
log.error("[{}] Skip {} entries failed for cursor {}", ledger.getName(), numEntriesToSkip,
name, exception);
callback.skipEntriesFailed(exception, ctx);
}, ctx);
// Mutable state holder used by getNumIndividualDeletedEntriesToSkip. It is needed because
// individualDeletedMessages.forEach takes a lambda, and a lambda cannot reassign ordinary local variables
// declared outside of it; fields of a captured object can be updated instead.
// NOTE(review): the class name is misspelled ("Invidual" for "Individual"); it is left as is because
// renaming would also touch the method that uses it.
private static class InvidualDeletedMessagesHandlingState {
// Running count of non-deleted entries walked so far.
long totalEntriesToSkip = 0L;
// Running count of individually deleted entries walked so far.
long deletedMessages = 0L;
// Start of the next gap to measure; initially the position passed to the constructor.
PositionImpl startPosition;
// End of the gap currently being measured.
PositionImpl endPosition;
InvidualDeletedMessagesHandlingState(PositionImpl startPosition) {
this.startPosition = startPosition;
/**
 * Walks the individually deleted ranges starting from the mark-delete position and sums the entries of
 * those ranges, stopping once the non-deleted entries counted between ranges would reach
 * {@code numEntries}.
 *
 * @param numEntries number of non-deleted entries the caller wants to move past
 * @return the number of individually deleted entries accumulated before the walk stopped
 */
long getNumIndividualDeletedEntriesToSkip(long numEntries) {
try {
InvidualDeletedMessagesHandlingState state = new InvidualDeletedMessagesHandlingState(markDeletePosition);
// The lambda returns false to stop the iteration and true to continue with the next range.
individualDeletedMessages.forEach((r) -> {
try {
state.endPosition = r.lowerEndpoint();
if (state.startPosition.compareTo(state.endPosition) <= 0) {
// Entries between the previous deleted range (or the mark-delete position) and this one.
Range<PositionImpl> range = Range.openClosed(state.startPosition, state.endPosition);
long entries = ledger.getNumberOfEntries(range);
if (state.totalEntriesToSkip + entries >= numEntries) {
// do not process further
return false;
state.totalEntriesToSkip += entries;
state.deletedMessages += ledger.getNumberOfEntries(r);
state.startPosition = r.upperEndpoint();
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] deletePosition {} moved ahead without clearing deleteMsgs {} for cursor {}",
ledger.getName(), markDeletePosition, r.lowerEndpoint(), name);
return true;
} finally {
// Hand the recyclable lower endpoint back once this range has been processed.
if (r.lowerEndpoint() instanceof PositionImplRecyclable) {
((PositionImplRecyclable) r.lowerEndpoint()).recycle();
}, recyclePositionRangeConverter);
return state.deletedMessages;
// NOTE(review): the body of this finally block is not visible in this excerpt.
} finally {
/**
 * Tells whether there is at least one non-deleted entry between {@code position} and the ledger's last
 * position, both inclusive.
 *
 * @param position the position to start counting from
 * @return {@code false} when {@code position} is past the last position; otherwise whether
 *         {@code getNumberOfEntries} for the closed range is positive
 */
boolean hasMoreEntries(PositionImpl position) {
PositionImpl lastPositionInLedger = ledger.getLastPosition();
if (position.compareTo(lastPositionInLedger) <= 0) {
return getNumberOfEntries(Range.closed(position, lastPositionInLedger)) > 0;
return false;
/**
 * Initializes the cursor's positions and consumed counter from a (position, counter) pair.
 *
 * <p>The mark-delete position becomes the pair's left value, the read position the next valid position
 * after it, and the consumed counter the pair's right value. The persisted and in-progress mark-delete
 * positions are reset to {@code null}.
 *
 * @param lastPositionCounter left: the position to use as mark-delete position; right: the value for the
 *        consumed-messages counter
 */
void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
ledger.onCursorReadPositionUpdated(this, readPosition);
markDeletePosition = lastPositionCounter.getLeft();
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
persistentMarkDeletePosition = null;
inProgressMarkDeletePersistPosition = null;
// Initialize the counter such that the difference between the messages written on the ML and the
// messagesConsumed is 0, to ensure the initial backlog count is 0.
messagesConsumedCounter = lastPositionCounter.getRight();
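* @param newMarkDeletePosition
* the new acknowledged position
* @return the mark-delete position actually applied, which may be further ahead than the requested one when
* the individually deleted ranges that directly follow it are absorbed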
// Advances the cursor's mark-delete position to newMarkDeletePosition, then keeps extending it across the
// individually deleted ranges that directly follow it.
// Also: adds the number of newly skipped entries to the consumed counter, drops the individually deleted
// ranges up to the new mark-delete position, and moves the read position forward if it is not already past
// the new mark-delete position.
// Throws MarkDeletingMarkedPosition (an IllegalArgumentException) when the requested position is behind the
// current mark-delete position. Returns the (possibly extended) new mark-delete position.
PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) {
throw new MarkDeletingMarkedPosition(
"Mark deleting an already mark-deleted position. Current mark-delete: " + markDeletePosition
+ " -- attempted mark delete: " + newMarkDeletePosition);
PositionImpl oldMarkDeletePosition = markDeletePosition;
if (!newMarkDeletePosition.equals(oldMarkDeletePosition)) {
long skippedEntries = 0;
if (newMarkDeletePosition.getLedgerId() == oldMarkDeletePosition.getLedgerId()
&& newMarkDeletePosition.getEntryId() == oldMarkDeletePosition.getEntryId() + 1) {
// Mark-deleting the position next to current one
// It counts as skipped only if it was not already individually deleted.
skippedEntries = individualDeletedMessages.contains(newMarkDeletePosition.getLedgerId(),
newMarkDeletePosition.getEntryId()) ? 0 : 1;
} else {
skippedEntries = getNumberOfEntries(Range.openClosed(oldMarkDeletePosition, newMarkDeletePosition));
PositionImpl positionAfterNewMarkDelete = ledger.getNextValidPosition(newMarkDeletePosition);
// sometime ranges are connected but belongs to different ledgers so, they are placed sequentially
// eg: (2:10..3:15] can be returned as (2:10..2:15],[3:0..3:15]. So, try to iterate over connected range and
// found the last non-connected range which gives new markDeletePosition
// NOTE(review): the loop's exit for a position that is not individually deleted is not visible in this
// excerpt.
while (positionAfterNewMarkDelete.compareTo(ledger.lastConfirmedEntry) <= 0) {
if (individualDeletedMessages.contains(positionAfterNewMarkDelete.getLedgerId(),
positionAfterNewMarkDelete.getEntryId())) {
Range<PositionImpl> rangeToBeMarkDeleted = individualDeletedMessages.rangeContaining(
positionAfterNewMarkDelete.getLedgerId(), positionAfterNewMarkDelete.getEntryId());
newMarkDeletePosition = rangeToBeMarkDeleted.upperEndpoint();
positionAfterNewMarkDelete = ledger.getNextValidPosition(newMarkDeletePosition);
// check if next valid position is also deleted and part of the deleted-range
if (log.isDebugEnabled()) {
log.debug("[{}] Moved ack position from: {} to: {} -- skipped: {}", ledger.getName(),
oldMarkDeletePosition, newMarkDeletePosition, skippedEntries);
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(this, skippedEntries);
// markDelete-position and clear out deletedMsgSet
markDeletePosition = newMarkDeletePosition;
individualDeletedMessages.removeAtMost(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId());
// The read position is updated atomically through its field updater.
READ_POSITION_UPDATER.updateAndGet(this, currentReadPosition -> {
if (currentReadPosition.compareTo(markDeletePosition) <= 0) {
// If the position that is mark-deleted is past the read position, it
// means that the client has skipped some entries. We need to move
// read position forward
PositionImpl newReadPosition = ledger.getNextValidPosition(markDeletePosition);
if (log.isDebugEnabled()) {
log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}",
ledger.getName(), currentReadPosition, newReadPosition, markDeletePosition);
ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
return newReadPosition;
} else {
return currentReadPosition;
return newMarkDeletePosition;
/**
 * Convenience overload that delegates to the four-argument {@code asyncMarkDelete} with an empty properties
 * map.
 *
 * @param position the position to mark-delete up to
 * @param callback notified of the outcome
 * @param ctx opaque context passed back to the callback
 */
public void asyncMarkDelete(final Position position, final MarkDeleteCallback callback, final Object ctx) {
asyncMarkDelete(position, Collections.emptyMap(), callback, ctx);
/**
 * Thrown by {@code setAcknowledgedPosition} when the requested position is behind the current mark-delete
 * position. Being an {@code IllegalArgumentException}, it is caught by the
 * {@code catch (IllegalArgumentException e)} in {@code asyncMarkDelete}.
 */
private final class MarkDeletingMarkedPosition extends IllegalArgumentException {
// NOTE(review): the constructor body is not visible in this excerpt; presumably it forwards the message to
// the superclass.
public MarkDeletingMarkedPosition(String s) {
/**
 * Asynchronously moves the mark-delete position of this cursor to {@code position} and persists it.
 *
 * <p>Steps visible in this method:
 * <ol>
 * <li>Fail fast when the cursor is closed or when a cursor reset is in progress.</li>
 * <li>When the position carries a batch ack set, step back to the previous position so the partially
 * acknowledged batch entry itself is not mark-deleted; with batch-index-level deletion enabled the ack set
 * is also merged into {@code batchDeletedIndexes}.</li>
 * <li>Reject positions beyond the last confirmed entry, unless the cursor is allowed to move forward into
 * the next valid ledger.</li>
 * <li>Apply the position in memory through {@code setAcknowledgedPosition}.</li>
 * <li>Persist through {@code internalAsyncMarkDelete}, subject to the optional rate limiter.</li>
 * </ol>
 *
 * @param position the position to mark-delete up to; must be a {@code PositionImpl}
 * @param properties properties stored together with the mark-delete position
 * @param callback notified of the outcome
 * @param ctx opaque context passed back to the callback
 */
public void asyncMarkDelete(final Position position, Map<String, Long> properties,
final MarkDeleteCallback callback, final Object ctx) {
checkArgument(position instanceof PositionImpl);
// NOTE(review): the failure branches in this method show no `return` in this excerpt, and the condition
// of the "reset in progress" branch is missing as well -- confirm against the full file.
if (isClosed()) {
callback.markDeleteFailed(new ManagedLedgerException
.CursorAlreadyClosedException("Cursor was already closed"), ctx);
if (log.isDebugEnabled()) {
log.debug("[{}] cursor reset in progress - ignoring mark delete on position [{}] for cursor [{}]",
ledger.getName(), position, name);
new ManagedLedgerException("Reset cursor in progress - unable to mark delete position "
+ position.toString()),
if (log.isDebugEnabled()) {
log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position);
PositionImpl newPosition = (PositionImpl) position;
if (config.isDeletionAtBatchIndexLevelEnabled()) {
if (newPosition.ackSet != null) {
AtomicReference<BitSetRecyclable> bitSetRecyclable = new AtomicReference<>();
BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(newPosition.ackSet);
// In order to prevent the batch index recorded in batchDeletedIndexes from rolling back,
// only update batchDeletedIndexes when the submitted batch index is greater
// than the recorded index.
// NOTE(review): the map call that this remapping lambda belongs to is not visible in this excerpt.
(k, v) -> {
if (v == null) {
return givenBitSet;
// Keep whichever bit set has the higher first set bit.
if (givenBitSet.nextSetBit(0) > v.nextSetBit(0)) {
return givenBitSet;
} else {
return v;
if (bitSetRecyclable.get() != null) {
newPosition = ledger.getPreviousPosition(newPosition);
// NOTE(review): what is done with this sub-map is not visible in this excerpt.
Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.EARLIEST, newPosition);
} else if (newPosition.ackSet != null) {
newPosition = ledger.getPreviousPosition(newPosition);
newPosition.ackSet = null;
// The requested position is beyond the last confirmed entry.
if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) {
boolean shouldCursorMoveForward = false;
try {
// Moving forward is allowed only if a next valid ledger exists, the current mark-delete position is at
// the end of its ledger, and the requested position is in that next valid ledger.
long ledgerEntries = ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get().getEntries();
Long nextValidLedger = ledger.getNextValidLedger(ledger.getLastConfirmedEntry().getLedgerId());
shouldCursorMoveForward = nextValidLedger != null
&& (markDeletePosition.getEntryId() + 1 >= ledgerEntries)
&& (newPosition.getLedgerId() == nextValidLedger);
} catch (Exception e) {
log.warn("Failed to get ledger entries while setting mark-delete-position", e);
if (shouldCursorMoveForward) {"[{}] move mark-delete-position from {} to {} since all the entries have been consumed",
ledger.getName(), markDeletePosition, newPosition);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {}"
+ " for cursor [{}]", ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
try {
// May return a position further ahead than requested.
newPosition = setAcknowledgedPosition(newPosition);
} catch (IllegalArgumentException e) {
callback.markDeleteFailed(getManagedLedgerException(e), ctx);
} finally {
// Apply rate limiting to mark-delete operations
// When no permit is available, the cursor is only flagged dirty and the latest mark-delete entry is
// updated in memory.
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
isDirty = true;
updateLastMarkDeleteEntryToLatest(newPosition, properties);
internalAsyncMarkDelete(newPosition, properties, callback, ctx);
/**
 * Wraps the request in a {@link MarkDeleteEntry} and dispatches it according to the current cursor state,
 * while holding the {@code pendingMarkDeleteOps} monitor.
 *
 * @param newPosition position stored in the created {@link MarkDeleteEntry}
 * @param properties  properties stored in the created {@link MarkDeleteEntry}
 * @param callback    receives {@code markDeleteFailed} on the failure paths visible below
 * @param ctx         opaque context handed back to {@code callback}
 */
// NOTE(review): this copy of the method appears to be missing text (closing braces and the statements that
// end each switch arm, e.g. the actions for the "wait" and "execute immediately" branches) — confirm
// against the upstream file before relying on the control flow shown here.
protected void internalAsyncMarkDelete(final PositionImpl newPosition, Map<String, Long> properties,
final MarkDeleteCallback callback, final Object ctx) {
MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx);
// We cannot write to the ledger during the switch, need to wait until the new metadata ledger is available
synchronized (pendingMarkDeleteOps) {
// The state might have changed while we were waiting on the queue mutex
switch (STATE_UPDATER.get(this)) {
// A closed cursor rejects the request with CursorAlreadyClosedException.
case Closed:
callback.markDeleteFailed(new ManagedLedgerException
.CursorAlreadyClosedException("Cursor was already closed"), ctx);
case NoLedger:
// We need to create a new ledger to write into.
// fall through
case SwitchingLedger:
case Open:
// The branch taken depends on whether any read operation is still pending.
if (PENDING_READ_OPS_UPDATER.get(this) > 0) {
// Wait until no read operation are pending
} else {
// Execute the mark delete immediately
// Any other state is reported as an error to both the log and the callback.
log.error("[{}][{}] Invalid cursor state: {}", ledger.getName(), name, state);
callback.markDeleteFailed(new ManagedLedgerException("Cursor was in invalid state: " + state), ctx);
/**
 * Persists the mark-delete position carried by {@code mdEntry}, unless a later position is already
 * persisted or already in progress.
 *
 * <p>The visible steps are: compare against {@code persistentMarkDeletePosition}; record the greatest
 * in-progress position through {@code INPROGRESS_MARKDELETE_PERSIST_POSITION_UPDATER}; keep the later of the
 * current and new entries in {@code LAST_MARK_DELETE_ENTRY_UPDATER}; then persist either to the metadata
 * store or to {@code cursorLedger} depending on the cursor state.
 *
 * @param mdEntry the mark-delete request (position, callback and related data) to persist
 */
// NOTE(review): this copy of the method appears to be missing text (closing braces, early returns, the
// "log.info(" prefixes and several statements inside the callbacks) — confirm against the upstream file.
void internalMarkDelete(final MarkDeleteEntry mdEntry) {
// Skip when the already-persisted position is later than the requested one.
if (persistentMarkDeletePosition != null
&& mdEntry.newPosition.compareTo(persistentMarkDeletePosition) < 0) {
if (log.isInfoEnabled()) {"Skipping updating mark delete position to {}. The persisted mark delete position {} "
+ "is later.", mdEntry.newPosition, persistentMarkDeletePosition);
// run with executor to prevent deadlock
ledger.getExecutor().execute(() -> mdEntry.triggerComplete());
// Atomically keep the greater of the current in-progress position and the requested one.
PositionImpl inProgressLatest = INPROGRESS_MARKDELETE_PERSIST_POSITION_UPDATER.updateAndGet(this, current -> {
if (current != null && current.compareTo(mdEntry.newPosition) > 0) {
return current;
} else {
return mdEntry.newPosition;
// if there's a newer or equal mark delete update in progress, skip it.
// Reference comparison: the updater returns this entry's own position object only when it won.
if (inProgressLatest != mdEntry.newPosition) {
if (log.isInfoEnabled()) {"Skipping updating mark delete position to {}. The mark delete position update "
+ "in progress {} is later.", mdEntry.newPosition, inProgressLatest);
// run with executor to prevent deadlock
ledger.getExecutor().execute(() -> mdEntry.triggerComplete());
// The counter is used to mark all the pending mark-delete request that were submitted to BK and that are not
// yet finished. While we have outstanding requests we cannot close the current ledger, so the switch to new
// ledger is postponed to when the counter goes to 0.
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
if (last != null && last.newPosition.compareTo(mdEntry.newPosition) > 0) {
// keep the current value since it's later then the mdEntry.newPosition
return last;
} else {
return mdEntry;
// Completion handler shared by both persistence paths at the end of this method.
VoidCallback cb = new VoidCallback() {
public void operationComplete() {
if (log.isDebugEnabled()) {
log.debug("[{}] Mark delete cursor {} to position {} succeeded", ledger.getName(), name,
mdEntry.newPosition, null);
// Remove from the individual deleted messages all the entries before the new mark delete
// point.
try {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
// View of the batch-index entries in (EARLIEST, newPosition]; the upper bound is inclusive.
Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.EARLIEST,
false, PositionImpl.get(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId()), true);
persistentMarkDeletePosition = mdEntry.newPosition;
} finally {
// Notify the ledger of the new mark-delete position.
ledger.onCursorMarkDeletePositionUpdated(ManagedCursorImpl.this, mdEntry.newPosition);
public void operationFailed(ManagedLedgerException exception) {
mdEntry.newPosition, null);
// Mark the cursor dirty on failure.
isDirty = true;
log.warn("[{}] Failed to mark delete position for cursor={} position={}", ledger.getName(),
ManagedCursorImpl.this, mdEntry.newPosition);
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer {} cursor mark delete failed with counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
// Without a cursor ledger, the metadata store is used only when no messages follow the new position.
if (State.NoLedger.equals(STATE_UPDATER.get(this))) {
if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) {
persistPositionToMetaStore(mdEntry, cb);
} else {
mdEntry.callback.markDeleteFailed(new ManagedLedgerException("Create new cursor ledger failed"),
} else {
persistPositionToLedger(cursorLedger, mdEntry, cb);
/**
 * Single-position delete entry point.
 *
 * @param position the position to delete
 * @throws InterruptedException   declared by the signature
 * @throws ManagedLedgerException declared by the signature
 */
// NOTE(review): no body is visible for this method in this copy of the file — confirm against upstream.
public void delete(final Position position) throws InterruptedException, ManagedLedgerException {
public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callback, Object ctx) {
asyncDelete(Collections.singletonList(pos), callback, ctx);
/**
 * Blocking variant of {@code asyncDelete}: issues the asynchronous delete and waits on a latch for at most
 * {@code ManagedLedgerImpl.AsyncOperationTimeoutSeconds} seconds.
 *
 * @param positions the positions to delete
 * @throws InterruptedException   if interrupted while waiting on the latch
 * @throws ManagedLedgerException on timeout ("Timeout during delete operation") or when the asynchronous
 *                                operation reported a failure
 */
// NOTE(review): this copy appears to be missing text (closing braces and, presumably, the statements that
// release the latch and set the timeout flag) — confirm against the upstream file.
public void delete(Iterable<Position> positions) throws InterruptedException, ManagedLedgerException {
// Small holder so the anonymous callback can hand a failure back to this thread.
class Result {
ManagedLedgerException exception = null;
final Result result = new Result();
final CountDownLatch counter = new CountDownLatch(1);
final AtomicBoolean timeout = new AtomicBoolean(false);
asyncDelete(positions, new AsyncCallbacks.DeleteCallback() {
public void deleteComplete(Object ctx) {
// A callback arriving after the timeout flag is set is only logged.
if (timeout.get()) {
log.warn("[{}] [{}] Delete operation timeout. Callback deleteComplete at position {}",
ledger.getName(), name, positions);
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
result.exception = exception;
if (timeout.get()) {
log.warn("[{}] [{}] Delete operation timeout. Callback deleteFailed at position {}",
ledger.getName(), name, positions);
}, null);
// Wait for the callback; a false return means the timeout elapsed first.
if (!counter.await(ManagedLedgerImpl.AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
log.warn("[{}] [{}] Delete operation timeout. No callback was triggered at position {}", ledger.getName(),
name, positions);
throw new ManagedLedgerException("Timeout during delete operation");
// Re-throw a failure reported by the asynchronous operation.
if (result.exception != null) {
throw result.exception;
/**
 * Asynchronously deletes (acknowledges) the given positions individually and, when the first range of
 * {@code individualDeletedMessages} allows it, advances the mark-delete position.
 *
 * <p>Visible flow: reject when the cursor is closed; for each position validate it against the ledger's
 * last confirmed entry, skip positions at or before {@code markDeletePosition} or already contained in
 * {@code individualDeletedMessages}, and update {@code individualDeletedMessages} /
 * {@code batchDeletedIndexes}; then derive a new mark-delete position from the first deleted range and
 * hand it to {@code internalAsyncMarkDelete} (subject to {@code markDeleteLimiter}).
 *
 * @param positions positions to delete; each element must be non-null and a {@code PositionImpl}
 * @param callback  receives {@code deleteFailed} on the failure paths visible below
 * @param ctx       opaque context handed back to {@code callback}
 */
// NOTE(review): this copy of the method appears to be missing a lot of text (closing braces, early
// returns, lock handling, "log.debug(" prefixes and the calls that add ranges / recycle bit sets) —
// confirm against the upstream file before relying on the control flow shown here.
public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallback callback, Object ctx) {
if (isClosed()) {
callback.deleteFailed(new ManagedLedgerException
.CursorAlreadyClosedException("Cursor was already closed"), ctx);
PositionImpl newMarkDeletePosition = null;
try {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}",
ledger.getName(), name, positions, individualDeletedMessages, markDeletePosition);
for (Position pos : positions) {
PositionImpl position = (PositionImpl) requireNonNull(pos);
// A position beyond the last confirmed entry is invalid.
if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(position) < 0) {
if (log.isDebugEnabled()) {
"[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} "
+ "for cursor [{}]", ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
callback.deleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
// Already covered by the mark-delete position or by an individually deleted range.
if (position.compareTo(markDeletePosition) <= 0
|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())) {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
// No ack set: the whole entry is being deleted.
if (position.ackSet == null) {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
// Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will
// make the RangeSet recognize the "continuity" between adjacent Positions.
PositionImpl previousPosition = ledger.getPreviousPosition(position);
previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId());
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
// Ack set present and batch-index deletion enabled: track the per-batch bit set.
} else if (config.isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet);
BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> givenBitSet);
// A different instance means an entry for this position already existed.
if (givenBitSet != bitSet) {
if (bitSet.isEmpty()) {
PositionImpl previousPosition = ledger.getPreviousPosition(position);
position.getLedgerId(), position.getEntryId());
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
if (individualDeletedMessages.isEmpty()) {
// No changes to individually deleted messages, so nothing to do at this point
// If the lower bound of the range set is the current mark delete position, then we can trigger a new
// mark-delete to the upper bound of the first range segment
Range<PositionImpl> range = individualDeletedMessages.firstRange();
// If the upper bound is before the mark-delete position, we need to move ahead as these
// individualDeletedMessages are now irrelevant
if (range.upperEndpoint().compareTo(markDeletePosition) <= 0) {
range = individualDeletedMessages.firstRange();
if (range == null) {
// The set was completely cleaned up now
// If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
.getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] Found a position range to mark delete for cursor {}: {} ", ledger.getName(),
name, range);
newMarkDeletePosition = range.upperEndpoint();
// Either adopt the newly found position or fall back to the current mark-delete position.
if (newMarkDeletePosition != null) {
newMarkDeletePosition = setAcknowledgedPosition(newMarkDeletePosition);
} else {
newMarkDeletePosition = markDeletePosition;
} catch (Exception e) {
log.warn("[{}] [{}] Error while updating individualDeletedMessages [{}]", ledger.getName(), name,
e.getMessage(), e);
callback.deleteFailed(getManagedLedgerException(e), ctx);
} finally {
if (individualDeletedMessages.isEmpty()) {
// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
// Rate-limited: remember the position in memory only and flag the cursor as dirty.
isDirty = true;
updateLastMarkDeleteEntryToLatest(newMarkDeletePosition, null);
try {
Map<String, Long> properties = lastMarkDeleteEntry != null ?
: Collections.emptyMap();
// Bridge the mark-delete callback back onto the caller's delete callback.
internalAsyncMarkDelete(newMarkDeletePosition, properties, new MarkDeleteCallback() {
public void markDeleteComplete(Object ctx) {
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
callback.deleteFailed(exception, ctx);
}, ctx);
} catch (Exception e) {
log.warn("[{}] [{}] Error doing asyncDelete [{}]", ledger.getName(), name, e.getMessage(), e);
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer {} cursor asyncDelete error, counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
callback.deleteFailed(new ManagedLedgerException(e), ctx);
// update lastMarkDeleteEntry field if newPosition is later than the current lastMarkDeleteEntry.newPosition
// The replacement entry is created with null callback and null ctx.
// NOTE(review): this copy appears to be missing text (closing braces and the middle operand of the
// ternary that picks the fallback properties) — confirm against the upstream file.
private void updateLastMarkDeleteEntryToLatest(final PositionImpl newPosition,
final Map<String, Long> properties) {
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
// A strictly later existing entry wins; an equal or earlier one is replaced.
if (last != null && last.newPosition.compareTo(newPosition) > 0) {
// keep current value, don't update
return last;
} else {
// use given properties or when missing, use the properties from the previous field value
Map<String, Long> propertiesToUse =
properties != null ? properties : (last != null ? : Collections.emptyMap());
return new MarkDeleteEntry(newPosition, propertiesToUse, null, null);
/**
 * Given a list of entries, filter out the entries that have already been individually deleted.
 *
 * @param entries
 *            a list of entries
 * @return a list of entries not containing deleted messages
 */
// NOTE(review): this copy appears to be missing text (closing braces, trailing log arguments and the
// body of the finally block) — confirm against the upstream file.
List<Entry> filterReadEntries(List<Entry> entries) {
try {
// Closed range spanning the first and last entry of the list; requires a non-empty list.
Range<PositionImpl> entriesRange = Range.closed((PositionImpl) entries.get(0).getPosition(),
(PositionImpl) entries.get(entries.size() - 1).getPosition());
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Filtering entries {} - alreadyDeleted: {}", ledger.getName(), name, entriesRange,
// Fast path: when the deleted set is empty or its span is not connected to the entries' range,
// the input list is returned unchanged.
Range<PositionImpl> span = individualDeletedMessages.isEmpty() ? null : individualDeletedMessages.span();
if (span == null || !entriesRange.isConnected(span)) {
// There are no individually deleted messages in this entry list, no need to perform filtering
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] No filtering needed for entries {}", ledger.getName(), name, entriesRange);
return entries;
} else {
// Remove from the entry list all the entries that were already marked for deletion
return Lists.newArrayList(Collections2.filter(entries, entry -> {
boolean includeEntry = !individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId());
if (!includeEntry) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Filtering entry at {} - already deleted", ledger.getName(), name,
return includeEntry;
} finally {
/**
 * Describes the cursor using the ledger name, the cursor name, the mark-delete position (labelled
 * {@code ackPos}) and the read position (labelled {@code readPos}).
 */
// NOTE(review): the call that terminates the helper chain is not visible in this copy — confirm against
// the upstream file.
public synchronized String toString() {
return MoreObjects.toStringHelper(this)
.add("ledger", ledger.getName())
.add("name", name)
.add("ackPos", markDeletePosition)
.add("readPos", readPosition)
/** Returns the value of the {@code name} field. */
public String getName() {
return name;
/** Returns the value of {@code lastActive}, which {@code updateLastActive()} sets from the wall clock. */
public long getLastActive() {
return lastActive;
/** Sets {@code lastActive} to the current wall-clock time in milliseconds. */
public void updateLastActive() {
lastActive = System.currentTimeMillis();
/** Always returns {@code true} for this cursor implementation. */
public boolean isDurable() {
return true;
/** Returns the current value of {@code readPosition}. */
public Position getReadPosition() {
return readPosition;
/** Returns the current in-memory value of {@code markDeletePosition}. */
public Position getMarkDeletedPosition() {
return markDeletePosition;
/**
 * Returns {@code persistentMarkDeletePosition}, the position assigned after a mark-delete persistence
 * operation completes successfully.
 */
public Position getPersistentMarkDeletedPosition() {
return this.persistentMarkDeletePosition;
/**
 * Moves the read position back to the next valid position after {@code markDeletePosition} and notifies
 * the ledger of the new read position.
 */
// NOTE(review): this copy appears to be missing text (the logging call prefix, closing braces and the
// body of the finally block) — confirm against the upstream file.
public void rewind() {
try {
PositionImpl newReadPosition = ledger.getNextValidPosition(markDeletePosition);
PositionImpl oldReadPosition = readPosition;"[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition);
readPosition = newReadPosition;
ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
} finally {
/**
 * Moves the read position to the given position and notifies the ledger.
 *
 * @param newReadPositionInt the requested read position; must be a {@code PositionImpl}
 * @param force              when {@code false}, a position at or before {@code markDeletePosition} is
 *                           replaced by the next valid position after the mark-delete position; when
 *                           {@code true} the requested position is used as given
 */
// NOTE(review): this copy appears to be missing text (closing braces and the body of the finally block)
// — confirm against the upstream file.
public void seek(Position newReadPositionInt, boolean force) {
checkArgument(newReadPositionInt instanceof PositionImpl);
PositionImpl newReadPosition = (PositionImpl) newReadPositionInt;
try {
if (!force && newReadPosition.compareTo(markDeletePosition) <= 0) {
// Make sure the newReadPosition comes after the mark delete position
newReadPosition = ledger.getNextValidPosition(markDeletePosition);
readPosition = newReadPosition;
ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
} finally {
/**
 * Reports whether a cursor ledger was present.
 *
 * @return {@code true} when {@code cursorLedger} is non-null, {@code false} otherwise
 * @throws BKException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
// NOTE(review): given the method name and the declared exceptions, a close call on the ledger is
// presumably missing from this copy, along with the closing braces — confirm against the upstream file.
boolean closeCursorLedger() throws BKException, InterruptedException {
if (cursorLedger != null) {
return true;
return false;
/**
 * Blocking variant of {@code asyncClose}: starts the asynchronous close and waits on a latch for at most
 * {@code ManagedLedgerImpl.AsyncOperationTimeoutSeconds} seconds.
 *
 * @throws InterruptedException   if interrupted while waiting on the latch
 * @throws ManagedLedgerException on timeout ("Timeout during close operation") or when the asynchronous
 *                                close reported a failure
 */
// NOTE(review): this copy appears to be missing text (closing braces and, presumably, the statements
// that release the latch in both callbacks) — confirm against the upstream file.
public void close() throws InterruptedException, ManagedLedgerException {
// Small holder so the anonymous callback can hand a failure back to this thread.
class Result {
ManagedLedgerException exception = null;
final Result result = new Result();
final CountDownLatch latch = new CountDownLatch(1);
asyncClose(new AsyncCallbacks.CloseCallback() {
public void closeComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}] Successfully closed ledger for cursor {}", ledger.getName(), name);
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Closing ledger failed for cursor {}", ledger.getName(), name, exception);
result.exception = exception;
}, null);
// A false return from await means the timeout elapsed before any callback released the latch.
if (!latch.await(ManagedLedgerImpl.AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
throw new ManagedLedgerException("Timeout during close operation");
if (result.exception != null) {
throw result.exception;
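/**
 * Persist the given mark-delete position to the cursor ledger or to the zk-metastore, depending on the maximum
 * number of unacked ranges that may be persisted in the zk-metastore. If the current number of unacked ranges is
 * higher than the configured threshold, the broker persists the mark-delete position into the cursor ledger;
 * otherwise it persists it into the zk-metastore.
 *
 * @param position
 *            the mark-delete position to persist
 * @param properties
 *            the properties persisted together with the position
 * @param callback
 *            the close callback notified on failure
 * @param ctx
 *            opaque context handed back to the callback
 */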
// NOTE(review): this copy appears to be missing text (the "log.info(" prefixes, closing braces, a
// trailing log argument and the success action of the metadata-store branch) — confirm against the
// upstream file.
void persistPositionWhenClosing(PositionImpl position, Map<String, Long> properties,
final AsyncCallbacks.CloseCallback callback, final Object ctx) {
// Destination is chosen by shouldPersistUnackRangesToLedger().
if (shouldPersistUnackRangesToLedger()) {
// The entry is created without callback/ctx; completion is reported through the VoidCallback below.
persistPositionToLedger(cursorLedger, new MarkDeleteEntry(position, properties, null, null),
new VoidCallback() {
public void operationComplete() {"[{}][{}] Updated md-position={} into cursor-ledger {}", ledger.getName(), name,
markDeletePosition, cursorLedger.getId());
// After a successful ledger write, continue by closing the cursor ledger.
asyncCloseCursorLedger(callback, ctx);
public void operationFailed(ManagedLedgerException e) {
log.warn("[{}][{}] Failed to persist mark-delete position into cursor-ledger{}: {}",
ledger.getName(), name, cursorLedger.getId(), e.getMessage());
callback.closeFailed(e, ctx);
} else {
// Metadata-store path: -1 is passed as the cursor ledger id and individual ranges are persisted (true).
persistPositionMetaStore(-1, position, properties, new MetaStoreCallback<Void>() {
public void operationComplete(Void result, Stat stat) {"[{}][{}] Closed cursor at md-position={}", ledger.getName(), name, markDeletePosition);
// At this point the position had already been safely stored in the cursor z-node
public void operationFailed(MetaStoreException e) {
log.warn("[{}][{}] Failed to update cursor info when closing: {}", ledger.getName(), name,
callback.closeFailed(e, ctx);
}, true);
private boolean shouldPersistUnackRangesToLedger() {
return cursorLedger != null
&& !isCursorLedgerReadOnly
&& config.getMaxUnackedRangesToPersist() > 0
&& individualDeletedMessages.size() > config.getMaxUnackedRangesToPersistInMetadataStore();
/**
 * Writes the cursor info (cursor ledger id, mark-delete position, last-active timestamp) to the
 * metadata store through {@code ledger.getStore().asyncUpdateCursorInfo}.
 *
 * @param cursorsLedgerId                       ledger id recorded in the cursor info
 * @param position                              mark-delete position recorded in the cursor info
 * @param properties                            properties associated with the position
 * @param callback                              notified of the outcome of the metadata-store write
 * @param persistIndividualDeletedMessageRanges whether the individually deleted ranges are included
 */
// NOTE(review): this copy of the method appears to be missing a lot of text (early return, builder calls
// that add properties and ranges, the expression assigned to cursorInfo, log prefixes/arguments, the
// callback invocations on the failure paths and closing braces) — confirm against the upstream file.
private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl position, Map<String, Long> properties,
MetaStoreCallback<Void> callback, boolean persistIndividualDeletedMessageRanges) {
// A closed cursor fails the callback asynchronously on the ledger executor.
if (state == State.Closed) {
ledger.getExecutor().execute(() -> callback.operationFailed(new MetaStoreException(
new CursorAlreadyClosedException(name + " cursor already closed"))));
// Snapshot of the stat that is passed to the update call below.
final Stat lastCursorLedgerStat = cursorLedgerStat;
// When closing we store the last mark-delete position in the z-node itself, so we won't need the cursor ledger,
// hence we write it as -1. The cursor ledger is deleted once the z-node write is confirmed.
ManagedCursorInfo.Builder info = ManagedCursorInfo.newBuilder() //
.setCursorsLedgerId(cursorsLedgerId) //
.setMarkDeleteLedgerId(position.getLedgerId()) //
.setMarkDeleteEntryId(position.getEntryId()) //
.setLastActive(lastActive); //
if (persistIndividualDeletedMessageRanges) {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Closing cursor at md-position: {}", ledger.getName(), name, position);
ManagedCursorInfo cursorInfo =;
ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), name, cursorInfo, lastCursorLedgerStat,
new MetaStoreCallback<Void>() {
public void operationComplete(Void result, Stat stat) {
// Record the new stat before reporting success to the caller.
updateCursorLedgerStat(cursorInfo, stat);
callback.operationComplete(result, stat);
public void operationFailed(MetaStoreException topLevelException) {
if (topLevelException instanceof MetaStoreException.BadVersionException) {
log.warn("[{}] Failed to update cursor metadata for {} due to version conflict {}",, name, topLevelException.getMessage());
// it means previous owner of the ml might have updated the version incorrectly. So, check
// the ownership and refresh the version again.
if (ledger.mlOwnershipChecker != null) {
ledger.mlOwnershipChecker.get().whenComplete((hasOwnership, t) -> {
// Refresh the stat only when the ownership check succeeded and ownership is confirmed.
if (t == null && hasOwnership) {
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name,
new MetaStoreCallback<>() {
public void operationComplete(ManagedCursorInfo info, Stat stat) {
updateCursorLedgerStat(info, stat);
// fail the top level call so that the caller can retry
public void operationFailed(MetaStoreException e) {
if (log.isDebugEnabled()) {
"[{}] Failed to refresh cursor metadata-version "
+ "for {} due to {}",, name,
// fail the top level call so that the caller can retry
} else {
// fail the top level call so that the caller can retry
} else {
} else {
/**
 * Asynchronously closes the cursor: moves the state to closing and, once the final position has been
 * persisted, sets the state to {@code Closed}.
 *
 * @param callback notified via {@code closeFailed} when persisting the final position fails
 * @param ctx      opaque context handed back to {@code callback}
 */
// NOTE(review): this copy appears to be missing text (the "log.info(" prefix, the already-closing
// completion/return, the call that receives the anonymous callback below, the success notification and
// closing braces) — confirm against the upstream file.
public void asyncClose(final AsyncCallbacks.CloseCallback callback, final Object ctx) {
// trySetStateToClosing() returns false when the cursor is already closing or closed.
boolean alreadyClosing = !trySetStateToClosing();
if (alreadyClosing) {"[{}] [{}] State is already closed", ledger.getName(), name);
new AsyncCallbacks.CloseCallback(){
public void closeComplete(Object ctx) {
// The final state transition happens only after the position was persisted successfully.
STATE_UPDATER.set(ManagedCursorImpl.this, State.Closed);
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] [{}] persistent position failure when closing, the state will remain in"
+ " state-closing and will no longer work", ledger.getName(), name);
callback.closeFailed(exception, ctx);
}, ctx);
* Internal version of seek that doesn't do the validation check.
* @param newReadPositionInt
void setReadPosition(Position newReadPositionInt) {
checkArgument(newReadPositionInt instanceof PositionImpl);
if (this.markDeletePosition == null
|| ((PositionImpl) newReadPositionInt).compareTo(this.markDeletePosition) > 0) {
this.readPosition = (PositionImpl) newReadPositionInt;
ledger.onCursorReadPositionUpdated(this, newReadPositionInt);
/**
 * Manually acknowledge all entries in the lost ledger.
 * - Since this is an uncommon event, we focus on maintainability. So we do not modify
 *   {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, but call
 *   {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
 * - This method is valid regardless of the consumer ACK type.
 * - If there is a consumer ack request after this event, it will also work.
 */
// NOTE(review): this copy appears to be missing text (presumably an early return after the null check,
// closing braces and the body of the finally block) — as shown, a null ledgerInfo would reach
// ledgerInfo.getEntries(); confirm against the upstream file.
public void skipNonRecoverableLedger(final long ledgerId){
LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
if (ledgerInfo == null) {
log.warn("[{}] [{}] Since the ledger [{}] is lost and the autoSkipNonRecoverableData is true, this ledger will"
+ " be auto acknowledge in subscription", ledger.getName(), name, ledgerId);
try {
// Issue one asynchronous delete per entry id that is not already individually deleted.
for (int i = 0; i < ledgerInfo.getEntries(); i++) {
if (!individualDeletedMessages.contains(ledgerId, i)) {
asyncDelete(PositionImpl.get(ledgerId, i), new AsyncCallbacks.DeleteCallback() {
public void deleteComplete(Object ctx) {
// ignore.
public void deleteFailed(ManagedLedgerException ex, Object ctx) {
// The method internalMarkDelete already handled the failure operation. We only need to
// make sure the memory state is updated.
// If the broker crashed, the non-recoverable ledger will be detected again.
}, null);
} finally {
// //////////////////////////////////////////////////
/**
 * Starts the switch to a new metadata ledger by atomically moving the cursor state to
 * {@code SwitchingLedger}.
 */
// NOTE(review): this copy appears to be missing text (the early return for a repeated request, the
// check/creation that follows the last comment and closing braces) — confirm against the upstream file.
void startCreatingNewMetadataLedger() {
// Change the state so that new mark-delete ops will be queued and not immediately submitted
State oldState = STATE_UPDATER.getAndSet(this, State.SwitchingLedger);
if (oldState == State.SwitchingLedger) {
// Ignore double request
// Check if we can immediately switch to a new metadata ledger
/**
 * Creates a new metadata ledger and updates the cursor state from the outcome: {@code Open} on success,
 * {@code NoLedger} on failure. Both state changes happen while holding the {@code pendingMarkDeleteOps}
 * monitor.
 */
// NOTE(review): this copy appears to be missing text (the statements that follow each state change,
// e.g. flushing queued mark-deletes, and closing braces) — confirm against the upstream file.
void createNewMetadataLedger() {
createNewMetadataLedger(new VoidCallback() {
public void operationComplete() {
// We now have a new ledger where we can write
synchronized (pendingMarkDeleteOps) {
// Resume normal mark-delete operations
STATE_UPDATER.set(ManagedCursorImpl.this, State.Open);
public void operationFailed(ManagedLedgerException exception) {
log.error("[{}][{}] Metadata ledger creation failed {}, try to persist the position in the metadata"
+ " store.", ledger.getName(), name, exception);
synchronized (pendingMarkDeleteOps) {
// At this point we don't have a ledger ready
STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
// Note: if the stat is NoLedger, will persist the mark deleted position to metadata store.
// Before giving up, try to persist the position in the metadata store.
* Try set {@link #state} to {@link State#Closing}.
* @return false if the {@link #state} already is {@link State#Closing} or {@link State#Closed}.
private boolean trySetStateToClosing() {
final AtomicBoolean notClosing = new AtomicBoolean(false);
STATE_UPDATER.updateAndGet(this, state -> {
switch (state){
case Closing:
case Closed: {
return state;
default: {
return State.Closing;
return notClosing.get();
/** Acts only when {@code pendingMarkDeleteOps} is non-empty. */
// NOTE(review): the action taken for a non-empty queue (presumably the internal flush) and the closing
// braces are not visible in this copy — confirm against the upstream file.
private void flushPendingMarkDeletes() {
if (!pendingMarkDeleteOps.isEmpty()) {
/**
 * Collapses the queued mark-delete operations onto the most recent one: the last queued entry receives
 * a copy of the whole queue as its {@code callbackGroup}.
 */
// NOTE(review): {@code getLast()} requires a non-empty queue; the statements that clear the queue and
// submit the last entry are not visible in this copy — confirm against the upstream file.
void internalFlushPendingMarkDeletes() {
MarkDeleteEntry lastEntry = pendingMarkDeleteOps.getLast();
lastEntry.callbackGroup = Lists.newArrayList(pendingMarkDeleteOps);
/**
 * Creates a new metadata ledger, writes the last mark-delete entry into it and then switches the cursor
 * to the new ledger.
 *
 * @param callback passed on to {@code switchToNewLedger} once the position has been persisted
 */
// NOTE(review): this copy appears to be missing text (the handling of a null ledger handle, the failure
// actions of the inner callback and of whenComplete, and closing braces) — confirm against the upstream
// file.
void createNewMetadataLedger(final VoidCallback callback) {
doCreateNewMetadataLedger().thenAccept(newLedgerHandle -> {
if (newLedgerHandle == null) {
// Snapshot the entry so the same position is both written and logged.
MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
// Created the ledger, now write the last position content
persistPositionToLedger(newLedgerHandle, mdEntry, new VoidCallback() {
public void operationComplete() {
if (log.isDebugEnabled()) {
log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
mdEntry.newPosition, name);
switchToNewLedger(newLedgerHandle, callback);
public void operationFailed(ManagedLedgerException exception) {
log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
mdEntry.newPosition, name);
}).whenComplete((result, e) -> {
if (e != null) {
/**
 * Asks the managed ledger to create a new BookKeeper ledger for this cursor and exposes the outcome as a
 * future.
 *
 * @return a future that is completed exceptionally with a {@code ManagedLedgerException} when the
 *         creation result code is not {@code BKException.Code.OK}
 */
// NOTE(review): this copy appears to be missing text (the handling when checkAndCompleteLedgerOpTask
// returns true, a log argument, the successful completion of the future and closing braces) — confirm
// against the upstream file.
private CompletableFuture<LedgerHandle> doCreateNewMetadataLedger() {
CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {
if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
// The result is processed on the ledger executor rather than on the callback thread.
ledger.getExecutor().execute(() -> {
if (rc != BKException.Code.OK) {
log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name,
future.completeExceptionally(new ManagedLedgerException(BKException.getMessage(rc)));
if (log.isDebugEnabled()) {
log.debug("[{}] Created ledger {} for cursor {}", ledger.getName(), lh.getId(), name);
}, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));
return future;
/**
 * Asynchronously deletes the given ledger through the BookKeeper client.
 *
 * @param ledgerHandle handle of the ledger to delete; only its id is used
 * @return the future created for this operation
 */
// NOTE(review): the remaining log arguments and the statement(s) that complete the future are not
// visible in this copy — confirm against the upstream file.
private CompletableFuture<Void> deleteLedgerAsync(LedgerHandle ledgerHandle) {
CompletableFuture<Void> future = new CompletableFuture<>();
bookkeeper.asyncDeleteLedger(ledgerHandle.getId(), (int rc, Object ctx) -> {
if (rc != BKException.Code.OK) {
log.warn("[{}] Failed to delete orphan ledger {}", ledger.getName(),
}, null);
return future;
private static List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
if (properties.isEmpty()) {
return Collections.emptyList();
List<LongProperty> longProperties = new ArrayList<>();
properties.forEach((name, value) -> {
LongProperty lp = LongProperty.newBuilder().setName(name).setValue(value).build();
return longProperties;
private static List<StringProperty> buildStringPropertiesMap(Map<String, String> properties) {
if (properties == null || properties.isEmpty()) {
return Collections.emptyList();
List<StringProperty> stringProperties = new ArrayList<>();
properties.forEach((name, value) -> {
StringProperty sp = StringProperty.newBuilder().setName(name).setValue(value).build();
return stringProperties;
/**
 * Builds the list of {@code MessageRange} messages describing {@code individualDeletedMessages} and
 * updates {@code individualDeletedMessagesSerializedSize} as a side effect.
 *
 * @return an immutable empty list when there are no individually deleted ranges, otherwise the built
 *         range list
 */
// NOTE(review): this copy appears to be missing a lot of text (lock handling, the builder factory calls,
// the per-range builder chains, the size accounting, the list insertion and closing braces) — confirm
// against the upstream file.
private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
try {
if (individualDeletedMessages.isEmpty()) {
this.individualDeletedMessagesSerializedSize = 0;
return Collections.emptyList();
MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
MLDataFormats.MessageRange.Builder messageRangeBuilder = MLDataFormats.MessageRange
AtomicInteger acksSerializedSize = new AtomicInteger(0);
List<MessageRange> rangeList = new ArrayList<>();
individualDeletedMessages.forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
MLDataFormats.NestedPositionInfo lowerPosition = nestedPositionBuilder
MLDataFormats.NestedPositionInfo upperPosition = nestedPositionBuilder
MessageRange messageRange = messageRangeBuilder
// The value returned to forEachRawRange depends on the configured maximum number of ranges;
// presumably false stops the iteration — TODO confirm.
return rangeList.size() <= config.getMaxUnackedRangesToPersist();
this.individualDeletedMessagesSerializedSize = acksSerializedSize.get();
return rangeList;
} finally {
/**
 * Builds the list of {@code BatchedEntryDeletionIndexInfo} messages describing
 * {@code batchDeletedIndexes}, limited to {@code config.getMaxBatchDeletedIndexToPersist()} elements.
 *
 * @return an immutable empty list when batch-index deletion is disabled or nothing is tracked,
 *         otherwise the built list
 */
// NOTE(review): this copy appears to be missing a lot of text (lock handling, the builder factory calls,
// the expression that advances the iterator, the loop body that fills deleteSet, the list insertion and
// closing braces) — confirm against the upstream file.
private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletionIndexInfoList() {
try {
if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes.isEmpty()) {
return Collections.emptyList();
MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats
List<MLDataFormats.BatchedEntryDeletionIndexInfo> result = new ArrayList<>();
Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
// Stop at the end of the map or once the configured maximum number of elements is reached.
while (iterator.hasNext() && result.size() < config.getMaxBatchDeletedIndexToPersist()) {
Map.Entry<PositionImpl, BitSetRecyclable> entry =;
// The bit set is exported as its backing long words.
long[] array = entry.getValue().toLongArray();
List<Long> deleteSet = new ArrayList<>(array.length);
for (long l : array) {
return result;
} finally {
/**
 * Serializes the mark-delete position of {@code mdEntry} into a {@code PositionInfo} and appends it to
 * the given ledger. On a write error the cursor state is moved from {@code Open} to {@code NoLedger}
 * and the position is persisted to the metadata store instead.
 *
 * @param lh       ledger that receives the new entry
 * @param mdEntry  the mark-delete entry whose position is written
 * @param callback handed to the metadata-store fallback on a write error
 */
// NOTE(review): this copy appears to be missing text (the rest of the PositionInfo builder chain, log
// arguments, the success-path callback/rollover actions and closing braces) — confirm against the
// upstream file.
void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) {
PositionImpl position = mdEntry.newPosition;
PositionInfo pi = PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} Appending to ledger={} position={}", ledger.getName(), name, lh.getId(),
byte[] data = pi.toByteArray();
lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
if (rc == BKException.Code.OK) {
if (log.isDebugEnabled()) {
log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name, position,
// After a successful write, check whether the ledger should be rolled over.
if (shouldCloseLedger(lh1)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Need to create new metadata ledger for cursor {}", ledger.getName(), name);
} else {
log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name,
position, lh1.getId(), BKException.getMessage(rc));
// If we've had a write error, the ledger will be automatically closed, we need to create a new one,
// in the meantime the mark-delete will be queued.
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
// Before giving up, try to persist the position in the metadata store.
persistPositionToMetaStore(mdEntry, callback);
}, null);
/**
 * Persists the position of {@code mdEntry} to the metadata store, after moving the cursor state from
 * {@code Open} to {@code NoLedger} (the compare-and-set is a no-op in any other state).
 *
 * @param mdEntry  the mark-delete entry whose position is persisted
 * @param callback completion callback for the caller
 */
// NOTE(review): this copy appears to be missing text (the properties argument of the store call, the
// "log.debug(" prefix, the invocations of {@code callback} in both outcomes and closing braces) —
// confirm against the upstream file.
void persistPositionToMetaStore(MarkDeleteEntry mdEntry, final VoidCallback callback) {
final PositionImpl newPosition = mdEntry.newPosition;
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
// Before giving up, try to persist the position in the metadata store
// -1 is passed as the cursor ledger id; the trailing "true" asks to persist individual ranges as well.
persistPositionMetaStore(-1, newPosition,, new MetaStoreCallback<Void>() {
public void operationComplete(Void result, Stat stat) {
if (log.isDebugEnabled()) {
"[{}][{}] Updated cursor in meta store after previous failure in ledger at position"
+ " {}", ledger.getName(), name, newPosition);
public void operationFailed(MetaStoreException e) {
log.warn("[{}][{}] Failed to update cursor in meta store after previous failure in ledger: {}",
ledger.getName(), name, e.getMessage());
}, true);
boolean shouldCloseLedger(LedgerHandle lh) {
long now = clock.millis();
if (ledger.getFactory().isMetadataServiceAvailable()
&& (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger()
|| lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000))
&& (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) {
// It's safe to modify the timestamp since this method will be only called from a callback, implying that
// calls will be serialized on one single thread
lastLedgerSwitchTimestamp = now;
return true;
} else {
return false;
/**
 * Records the new cursor ledger id in the metadata store and, once that succeeds, makes {@code lh} the
 * current cursor ledger. If the metadata update fails, the newly created ledger is deleted before the
 * failure is reported.
 *
 * @param lh       the newly created ledger to switch to
 * @param callback receives {@code operationFailed} after the orphan ledger deletion on failure
 */
// NOTE(review): this copy appears to be missing text (the properties argument of the store call, the
// "log.info(" prefix, the handling of the old ledger, the success notification and closing braces) —
// confirm against the upstream file.
void switchToNewLedger(final LedgerHandle lh, final VoidCallback callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] Switching cursor {} to ledger {}", ledger.getName(), name, lh.getId());
// The trailing "false" means individual deleted ranges are not written to the metadata store here.
persistPositionMetaStore(lh.getId(), lastMarkDeleteEntry.newPosition,,
new MetaStoreCallback<Void>() {
public void operationComplete(Void result, Stat stat) {"[{}] Updated cursor {} with ledger id {} md-position={} rd-position={}", ledger.getName(),
name, lh.getId(), markDeletePosition, readPosition);
// Keep a reference to the previous ledger before replacing it.
final LedgerHandle oldLedger = cursorLedger;
cursorLedger = lh;
isCursorLedgerReadOnly = false;
// At this point the position had already been safely markdeleted
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Failed to update cursor metadata {}", ledger.getName(), name, e);
// it means it failed to switch the newly created ledger so, it should be
// deleted to prevent leak
deleteLedgerAsync(lh).thenRun(() -> callback.operationFailed(e));
}, false);
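/**
 * Handles the managed-ledger notification that new entries are available, resuming the read operation
 * that was waiting for them, if any. The method is declared {@code void}: it does not report whether
 * the cursor responded to the notification.
 */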
void notifyEntriesAvailable() {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received ml notification", ledger.getName(), name);
OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.getAndSet(this, null);
if (opReadEntry != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received notification of new messages persisted, reading at {} -- last: {}",
ledger.getName(), name, opReadEntry.readPosition, ledger.lastConfirmedEntry);
log.debug("[{}] Consumer {} cursor notification: other counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
opReadEntry.readPosition = (PositionImpl) getReadPosition();
} else {
// No one is waiting to be notified. Ignore
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received notification but had no pending read operation", ledger.getName(), name);
/**
 * Asynchronously closes the cursor's current metadata ledger.
 *
 * <p>The close operation is recorded on the managed ledger's mbean before the close is issued.
 * When the close fails, a warning is logged and {@code callback.closeFailed} is invoked with a
 * managed-ledger exception built from the BookKeeper return code.
 *
 * @param callback receives the outcome of the close
 * @param ctx opaque context handed back to the callback
 */
// NOTE(review): the logging call prefixes and the success-path statements are truncated in this
// view — confirm against the upstream file.
void asyncCloseCursorLedger(final AsyncCallbacks.CloseCallback callback, final Object ctx) {
// Snapshot the field so the same handle is used for logging and closing
LedgerHandle lh = cursorLedger;
ledger.mbean.startCursorLedgerCloseOp();"[{}] [{}] Closing metadata ledger {}", ledger.getName(), name, lh.getId());
lh.asyncClose(new CloseCallback() {
public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
if (rc == BKException.Code.OK) {"[{}][{}] Closed cursor-ledger {}", ledger.getName(), name,
} else {
log.warn("[{}][{}] Failed to close cursor-ledger {}: {}", ledger.getName(), name,
cursorLedger.getId(), BKException.getMessage(rc));
callback.closeFailed(createManagedLedgerException(rc), ctx);
}, ctx);
/**
 * Called when an outstanding mark-delete request finishes. Reads the current cursor state and, when
 * it is {@code SwitchingLedger}, proceeds with the pending metadata ledger switch.
 */
// NOTE(review): the counter decrement and the statements that perform the switch are not visible in
// this view — confirm against the upstream file.
void decrementPendingMarkDeleteCount() {
final State state = STATE_UPDATER.get(this);
if (state == State.SwitchingLedger) {
// A metadata ledger switch was pending and now we can do it since we don't have any more
// outstanding mark-delete requests
/**
 * Called when a read operation finishes. Decrements the pending-read counter; when it reaches zero,
 * the pending mark-delete queue is examined under its monitor and, if the cursor state is
 * {@code Open}, the pending writes are flushed.
 */
// NOTE(review): the flush call and the branch that emits the "cursor was closed" log message are
// truncated in this view — confirm against the upstream file.
void readOperationCompleted() {
if (PENDING_READ_OPS_UPDATER.decrementAndGet(this) == 0) {
synchronized (pendingMarkDeleteOps) {
if (STATE_UPDATER.get(this) == State.Open) {
// Flush the pending writes only if the state is open.
"[{}] read operation completed and cursor was closed. need to call any queued cursor close",
/**
 * Asynchronously deletes the given ledger.
 *
 * @param lh the ledger to delete
 */
// NOTE(review): the body of this overload is not visible in this view; presumably it delegates to
// the retrying overload below — confirm.
void asyncDeleteLedger(final LedgerHandle lh) {
/**
 * Asynchronously deletes the given ledger, rescheduling the attempt with {@code retry - 1} when the
 * deletion fails for a reason other than the ledger not existing.
 *
 * @param lh the ledger to delete; nothing is attempted when {@code null}
 * @param retry remaining attempts; nothing is attempted when {@code <= 0}
 */
private void asyncDeleteLedger(final LedgerHandle lh, int retry) {
if (lh == null || retry <= 0) {
// Only warn when a real ledger was left undeleted after the retries ran out
if (lh != null) {
log.warn("[{}-{}] Failed to delete ledger after retries {}", ledger.getName(), name, lh.getId());
bookkeeper.asyncDeleteLedger(lh.getId(), (rc, ctx) -> {
if (rc != BKException.Code.OK) {
log.warn("[{}] Failed to delete ledger {}: {}", ledger.getName(), lh.getId(),
// A missing ledger is not retried: there is nothing left to delete
if (!isNoSuchLedgerExistsException(rc)) {
ledger.getScheduledExecutor().schedule(() -> asyncDeleteLedger(lh, retry - 1),
} else {"[{}][{}] Successfully closed & deleted ledger {} in cursor", ledger.getName(), name,
}, null);
/**
 * Asynchronously deletes the cursor's own metadata ledger.
 */
// NOTE(review): the body of this overload is not visible in this view; presumably it delegates to
// the retrying overload below — confirm.
void asyncDeleteCursorLedger() {
/**
 * Marks the cursor as {@code Closed} and asynchronously deletes its metadata ledger, rescheduling
 * the attempt with {@code retry - 1} when the deletion fails for a reason other than the ledger not
 * existing.
 *
 * @param retry remaining attempts; nothing is attempted when {@code <= 0}
 */
private void asyncDeleteCursorLedger(int retry) {
// The state is set on every invocation, including retries
STATE_UPDATER.set(this, State.Closed);
if (cursorLedger == null || retry <= 0) {
// Only warn when a real ledger was left undeleted after the retries ran out
if (cursorLedger != null) {
log.warn("[{}-{}] Failed to delete ledger after retries {}", ledger.getName(), name,
bookkeeper.asyncDeleteLedger(cursorLedger.getId(), (rc, ctx) -> {
if (rc == BKException.Code.OK) {"[{}][{}] Deleted cursor ledger {}", ledger.getName(), name, cursorLedger.getId());
} else {
log.warn("[{}][{}] Failed to delete ledger {}: {}", ledger.getName(), name, cursorLedger.getId(),
// A missing ledger is not retried: there is nothing left to delete
if (!isNoSuchLedgerExistsException(rc)) {
ledger.getScheduledExecutor().schedule(() -> asyncDeleteCursorLedger(retry - 1),
}, null);
* return BK error codes that are considered not likely to be recoverable.
public static boolean isBkErrorNotRecoverable(int rc) {
switch (rc) {
case Code.NoSuchLedgerExistsException:
case Code.NoSuchLedgerExistsOnMetadataServerException:
case Code.ReadException:
case Code.LedgerRecoveryException:
case Code.NoSuchEntryException:
return true;
return false;
* If we fail to recover the cursor ledger, we want to still open the ML and rollback.
* @param info
private PositionImpl getRollbackPosition(ManagedCursorInfo info) {
PositionImpl firstPosition = ledger.getFirstPosition();
PositionImpl snapshottedPosition = new PositionImpl(info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());
if (firstPosition == null) {
// There are no ledgers in the ML, any position is good
return snapshottedPosition;
} else if (snapshottedPosition.compareTo(firstPosition) < 0) {
// The snapshotted position might be pointing to a ledger that was already deleted
return firstPosition;
} else {
return snapshottedPosition;
// Expose internal values for debugging purposes
public int getPendingReadOpsCount() {
return PENDING_READ_OPS_UPDATER.get(this);
public long getMessagesConsumedCounter() {
return messagesConsumedCounter;
public long getCursorLedger() {
LedgerHandle lh = cursorLedger;
return lh != null ? lh.getId() : -1;
public long getCursorLedgerLastEntry() {
LedgerHandle lh = cursorLedger;
return lh != null ? lh.getLastAddConfirmed() : -1;
/**
 * @return the string form of the individually-deleted-messages range set
 */
// NOTE(review): the statement before the try and the body of the finally block are not visible in
// this view (presumably a lock acquire/release pair) — confirm against the upstream file.
public String getIndividuallyDeletedMessages() {
try {
return individualDeletedMessages.toString();
} finally {
public LongPairRangeSet<PositionImpl> getIndividuallyDeletedMessagesSet() {
return individualDeletedMessages;
public boolean isMessageDeleted(Position position) {
checkArgument(position instanceof PositionImpl);
return ((PositionImpl) position).compareTo(markDeletePosition) <= 0
|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId());
//this method will return a copy of the position's ack set
public long[] getBatchPositionAckSet(Position position) {
if (!(position instanceof PositionImpl)) {
return null;
if (batchDeletedIndexes != null) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.get(position);
if (bitSetRecyclable == null) {
return null;
} else {
return bitSetRecyclable.toLongArray();
} else {
return null;
/**
 * Checks whether the given position is part of an individually deleted range and, if so, returns
 * the position following the upper end of that range, since all messages up to that point are
 * deleted.
 *
 * @param position the position to start from
 * @return the next available position: the successor of the containing range's upper endpoint when
 *         that successor is non-null and after {@code position}, otherwise {@code position.getNext()}
 */
// NOTE(review): the remaining arguments of the rangeContaining call are truncated in this view —
// confirm against the upstream file.
public PositionImpl getNextAvailablePosition(PositionImpl position) {
Range<PositionImpl> range = individualDeletedMessages.rangeContaining(position.getLedgerId(),
if (range != null) {
PositionImpl nextPosition = range.upperEndpoint().getNext();
// Fall back to the plain successor if the range end does not move past the given position
return (nextPosition != null && nextPosition.compareTo(position) > 0) ? nextPosition : position.getNext();
return position.getNext();
public Position getNextLedgerPosition(long currentLedgerId) {
Long nextExistingLedger = ledger.getNextValidLedger(currentLedgerId);
return nextExistingLedger != null ? PositionImpl.get(nextExistingLedger, 0) : null;
/**
 * @return {@code true} when the individually-deleted-messages range set is empty
 */
// NOTE(review): the statement before the try and the body of the finally block are not visible in
// this view (presumably a lock acquire/release pair) — confirm against the upstream file.
public boolean isIndividuallyDeletedEntriesEmpty() {
try {
return individualDeletedMessages.isEmpty();
} finally {
public long getLastLedgerSwitchTimestamp() {
return lastLedgerSwitchTimestamp;
public String getState() {
return STATE_UPDATER.get(this).toString();
public double getThrottleMarkDelete() {
return this.markDeleteLimiter.getRate();
/**
 * Configures the mark-delete rate limiter.
 *
 * @param throttleMarkDelete the new rate; a value that is not greater than zero disables the
 *                           limiter by setting it to {@code null}
 */
// NOTE(review): the body of the inner else branch (limiter already exists) is not visible in this
// view; presumably it updates the existing limiter's rate — confirm against the upstream file.
public void setThrottleMarkDelete(double throttleMarkDelete) {
if (throttleMarkDelete > 0.0) {
// Create the limiter lazily the first time a positive rate is requested
if (markDeleteLimiter == null) {
markDeleteLimiter = RateLimiter.create(throttleMarkDelete);
} else {
} else {
// Disable mark-delete rate limiter
markDeleteLimiter = null;
public ManagedLedger getManagedLedger() {
return this.ledger;
public Range<PositionImpl> getLastIndividualDeletedRange() {
return individualDeletedMessages.lastRange();
/**
 * Removes from the given list every entry that has already been deleted, i.e. whose position is at
 * or before the mark-delete position or is contained in the individually deleted ranges.
 *
 * @param entries the list to filter in place
 */
// NOTE(review): the statement guarded by "if (isDeleted)" is not visible in this view — confirm
// against the upstream file.
public void trimDeletedEntries(List<Entry> entries) {
entries.removeIf(entry -> {
boolean isDeleted = markDeletePosition.compareTo(entry.getLedgerId(), entry.getEntryId()) >= 0
|| individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId());
if (isDeleted) {
return isDeleted;
private ManagedCursorImpl cursorImpl() {
return this;
public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSet = batchDeletedIndexes.get(position);
return bitSet == null ? null : bitSet.toLongArray();
} else {
return null;
public ManagedCursorMXBean getStats() {
return this.mbean;
public void updateReadStats(int readEntriesCount, long readEntriesSize) {
this.entriesReadCount += readEntriesCount;
this.entriesReadSize += readEntriesSize;
/**
 * Flushes a dirty mark-delete position by issuing an asynchronous mark-delete for the last
 * mark-delete entry's position, after clearing the dirty flag.
 *
 * <p>A failure caused by {@code MarkDeletingMarkedPosition} is treated as benign and reported
 * without a stack trace; any other failure is logged as a warning with the exception.
 */
// NOTE(review): the body of the "!isDirty" guard (presumably an early return), an argument between
// the two commas of the asyncMarkDelete call, and a logging call prefix are not visible in this
// view — confirm against the upstream file.
void flush() {
if (!isDirty) {
isDirty = false;
asyncMarkDelete(lastMarkDeleteEntry.newPosition,, new MarkDeleteCallback() {
public void markDeleteComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Flushed dirty mark-delete position", ledger.getName(), name);
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
if (exception.getCause() instanceof MarkDeletingMarkedPosition) {
// this is not actually a problem, we should not log a stacktrace"[{}][{}] Cannot flush mark-delete position: {}", ledger.getName(),
name, exception.getCause().getMessage());
} else {
log.warn("[{}][{}] Failed to flush mark-delete position", ledger.getName(), name, exception);
}, null);
public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
return maxEntries;
double avgEntrySize = ledger.getStats().getEntrySizeAverage();
if (!Double.isFinite(avgEntrySize)) {
// We don't have yet any stats on the topic entries. Let's try to use the cursor avg size stats
avgEntrySize = (double) entriesReadSize / (double) entriesReadCount;
if (!Double.isFinite(avgEntrySize)) {
// If we still don't have any information, it means this is the first time we attempt reading
// and there are no writes. Let's start with 1 to avoid any overflow and start the avg stats
return 1;
int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize);
if (maxEntriesBasedOnSize < 1) {
// We need to read at least one entry
return 1;
return Math.min(maxEntriesBasedOnSize, maxEntries);
public boolean checkAndUpdateReadPositionChanged() {
PositionImpl lastEntry = ledger.lastConfirmedEntry;
boolean isReadPositionOnTail = lastEntry == null || readPosition == null
|| (lastEntry.compareTo(readPosition) <= 0);
boolean isReadPositionChanged = readPosition != null && !readPosition.equals(statsLastReadPosition);
statsLastReadPosition = readPosition;
return isReadPositionOnTail || isReadPositionChanged;
private boolean isCompactionCursor() {
return COMPACTION_CURSOR_NAME.equals(name);
public void setState(State state) {
this.state = state;
public void setCacheReadEntry(boolean cacheReadEntry) {
this.cacheReadEntry = cacheReadEntry;
public boolean isCacheReadEntry() {
return cacheReadEntry;
// Logger for ManagedCursorImpl, obtained from LoggerFactory and used by the methods of this class.
private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
public ManagedLedgerConfig getConfig() {
return config;