[Broker] Fix NonDurableCursorImpl initialPosition by startCursorPosition greater than lastConfirmedEntry problem. (#10095)
## Motivation
In order to fix reader set ```startMessageId``` greater than ```lastConfirmedEntry``` probelm.
When the reader set ```startMessageId``` greater than ```lastConfirmedEntry```, generate ```NonDurableCursorImpl``` will carry a ```MarkdeletePosition``` greater than lastConfirmedEntry.
When use ```getNumberOfEntriesInBacklog``` will throw
![image](https://user-images.githubusercontent.com/39078850/113100735-5f4d8980-922e-11eb-8758-7d3cfb1435f3.png)
## implement
When ```startMessageId``` greater than ```lastConfirmedEntry``` use ```lastConfirmedEntry``` as ```markdeletePosition```
### Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 0386588..15b1f04 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -40,7 +40,7 @@
// Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to
// store the entryId, it's not able to pass a Long.max() as entryId. In this case there's no point to require
// both ledgerId and entryId to be Long.max()
- if (startCursorPosition == null || startCursorPosition.getLedgerId() == PositionImpl.latest.getLedgerId()) {
+ if (startCursorPosition == null || startCursorPosition.compareTo(ledger.lastConfirmedEntry) > 0) {
// Start from last entry
switch (initialPosition) {
case Latest:
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 8ce209b..0446d53 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -325,6 +325,13 @@
}
@Test(timeOut = 20000)
+ void markDeleteGreaterThanLastConfirmedEntry() throws Exception {
+ ManagedLedger ml1 = factory.open("my_test_ledger");
+ ManagedCursor mc1 = ml1.newNonDurableCursor(PositionImpl.get(Long.MAX_VALUE - 1, Long.MAX_VALUE - 1));
+ assertEquals(mc1.getMarkDeletedPosition(), ml1.getLastConfirmedEntry());
+ }
+
+ @Test(timeOut = 20000)
void testResetCursor() throws Exception {
ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
new ManagedLedgerConfig().setMaxEntriesPerLedger(10));