In case of KEY_SHARED subscription and out-of-order acknowledgments, the PositionInfo state can be persisted to preserve the state, with configurable maximum number of ranges to persist:
# Max number of "acknowledgment holes" that are going to be persistently stored. # When acknowledging out of order, a consumer will leave holes that are supposed # to be quickly filled by acking all the messages. The information of which # messages are acknowledged is persisted by compressing in "ranges" of messages # that were acknowledged. After the max number of ranges is reached, the information # will only be tracked in memory and messages will be redelivered in case of # crashes. managedLedgerMaxUnackedRangesToPersist=10000
The PositionInfo state is stored to the BookKeeper as a single entry, and it can grow large if the number of ranges is large. Currently, this means that BookKeeper can fail persisting too large PositionInfo state, e.g. over 1MB by default and the ManagedCursor recovery on topic reload might not succeed.
There is an abandoned PIP-81 for similar problem, this PIP takes over.
While keeping the number of ranges low to prevent such problems is a common sense solution, there are cases where the higher number of ranges is required. For example, in case of the JMS protocol handler, JMS consumers with filters may end up processing data out of order and/or at different speed, and the number of ranges can grow large.
Store the PositionInfo state in a BookKeeper ledger as multiple entries if the state grows too large to be stored as a single entry.
Transparent backwards compatibility if the PositionInfo state is small enough.
Backwards compatibility in case of the PositionInfo state is too large to be stored as a single entry.
Cursor state writes and reads are happening at the same cases as currently, without changes.
Write path:
See persistPositionToLedger()
in ManagedCursorImpl
for the implementation.
The footer is a JSON representation of
public static final class ChunkSequenceFooter { private int numParts; private int length; }
Read path:
startPos = footerPosition - chunkSequenceFooter.numParts
to footerPosition - 1
) and merge them.See recoverFromLedgerByEntryId()
in ManagedCursorImpl
for the implementation.
Proposed implementation: https://github.com/apache/pulsar/pull/22799
Nothing
None
No public-facing changes
None
Existing monitoring should be sufficient.
N/A
Not affected, just upgrade.
Not affected, just downgrade as long as the managedLedgerMaxUnackedRangesToPersist was in the range to fit it into a single entry in BK.
Not affected AFAIK.
ML discussion and voting threads: