| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.bookkeeper.mledger.impl; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertFalse; |
| import static org.testng.Assert.assertNotEquals; |
| import static org.testng.Assert.assertNotNull; |
| import static org.testng.Assert.assertTrue; |
| import static org.testng.Assert.fail; |
| |
| import com.google.common.base.Charsets; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| |
| import java.nio.charset.Charset; |
| import java.util.List; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import io.netty.buffer.ByteBuf; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; |
| import org.apache.bookkeeper.mledger.Entry; |
| import org.apache.bookkeeper.mledger.ManagedCursor; |
| import org.apache.bookkeeper.mledger.ManagedLedger; |
| import org.apache.bookkeeper.mledger.ManagedLedgerConfig; |
| import org.apache.bookkeeper.mledger.ManagedLedgerException; |
| import org.apache.bookkeeper.mledger.ManagedLedgerFactory; |
| import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; |
| import org.apache.bookkeeper.mledger.Position; |
| import org.apache.bookkeeper.test.MockedBookKeeperTestCase; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testng.annotations.Test; |
| |
| public class NonDurableCursorTest extends MockedBookKeeperTestCase { |
| |
| private static final Charset Encoding = Charsets.UTF_8; |
| |
| @Test(timeOut = 20000) |
| void readFromEmptyLedger() throws Exception { |
| ManagedLedger ledger = factory.open("my_test_ledger"); |
| |
| ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest); |
| List<Entry> entries = c1.readEntries(10); |
| assertEquals(entries.size(), 0); |
| entries.forEach(e -> e.release()); |
| |
| ledger.addEntry("test".getBytes(Encoding)); |
| entries = c1.readEntries(10); |
| assertEquals(entries.size(), 1); |
| entries.forEach(e -> e.release()); |
| |
| entries = c1.readEntries(10); |
| assertEquals(entries.size(), 0); |
| entries.forEach(e -> e.release()); |
| |
| // Test string representation |
| assertEquals(c1.toString(), "NonDurableCursorImpl{ledger=my_test_ledger, ackPos=3:-1, readPos=3:1}"); |
| } |
| |
| @Test(timeOut = 20000) |
| void testOpenNonDurableCursorAtNonExistentMessageId() throws Exception { |
| ManagedLedger ledger = factory.open("non_durable_cursor_at_non_existent_msgid"); |
| ManagedLedgerImpl mlImpl = (ManagedLedgerImpl) ledger; |
| |
| PositionImpl position = mlImpl.getLastPosition(); |
| |
| ManagedCursor c1 = ledger.newNonDurableCursor(new PositionImpl( |
| position.getLedgerId(), |
| position.getEntryId() - 1 |
| )); |
| |
| assertEquals(c1.getReadPosition(), new PositionImpl( |
| position.getLedgerId(), |
| 0 |
| )); |
| |
| c1.close(); |
| ledger.close(); |
| } |
| |
| @Test(timeOut = 20000) |
| void testZNodeBypassed() throws Exception { |
| ManagedLedger ledger = factory.open("my_test_ledger"); |
| |
| ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest); |
| assertFalse(Iterables.isEmpty(ledger.getCursors())); |
| |
| c1.close(); |
| ledger.close(); |
| |
| // Re-open |
| ManagedLedger ledger2 = factory.open("my_test_ledger"); |
| assertTrue(Iterables.isEmpty(ledger2.getCursors())); |
| } |
| |
| @Test(timeOut = 20000) |
| void readTwice() throws Exception { |
| ManagedLedger ledger = factory.open("my_test_ledger", |
| new ManagedLedgerConfig().setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1)); |
| |
| ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.latest); |
| ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.latest); |
| |
| ledger.addEntry("entry-1".getBytes(Encoding)); |
| ledger.addEntry("entry-2".getBytes(Encoding)); |
| |
| List<Entry> entries = c1.readEntries(2); |
| assertEquals(entries.size(), 2); |
| entries.forEach(e -> e.release()); |
| |
| entries = c1.readEntries(2); |
| assertEquals(entries.size(), 0); |
| entries.forEach(e -> e.release()); |
| |
| entries = c2.readEntries(2); |
| assertEquals(entries.size(), 2); |
| entries.forEach(e -> e.release()); |
| |
| entries = c2.readEntries(2); |
| assertEquals(entries.size(), 0); |
| entries.forEach(e -> e.release()); |
| } |
| |
| @Test(timeOut = 20000) |
| void readWithCacheDisabled() throws Exception { |
| ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig(); |
| config.setMaxCacheSize(0); |
| factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle(), config); |
| ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1) |
| .setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1)); |
| |
| ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.latest); |
| ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.latest); |
| |
| ledger.addEntry("entry-1".getBytes(Encoding)); |
| ledger.addEntry("entry-2".getBytes(Encoding)); |
| |
| List<Entry> entries = c1.readEntries(2); |
| assertEquals(entries.size(), 2); |
| assertEquals(new String(entries.get(0).getData(), Encoding), "entry-1"); |
| assertEquals(new String(entries.get(1).getData(), Encoding), "entry-2"); |
| entries.forEach(e -> e.release()); |
| |
| entries = c1.readEntries(2); |
| assertEquals(entries.size(), 0); |
| entries.forEach(e -> e.release()); |
| |
| entries = c2.readEntries(2); |
| assertEquals(entries.size(), 2); |
| entries.forEach(e -> e.release()); |
| |
| entries = c2.readEntries(2); |
| assertEquals(entries.size(), 0); |
| entries.forEach(e -> e.release()); |
| } |
| |
| @Test(timeOut = 20000) |
| void readFromClosedLedger() throws Exception { |
| ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1) |
| .setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1)); |
| |
| ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.latest); |
| |
| ledger.close(); |
| |
| try { |
| c1.readEntries(2); |
| fail("ledger is closed, should fail"); |
| } catch (ManagedLedgerException e) { |
| // ok |
| } |
| } |
| |
| @Test(timeOut = 20000) |
| void testNumberOfEntries() throws Exception { |
| ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2) |
| .setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1)); |
| |
| ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.latest); |
| ledger.addEntry("dummy-entry-1".getBytes(Encoding)); |
| ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.latest); |
| ledger.addEntry("dummy-entry-2".getBytes(Encoding)); |
| ManagedCursor c3 = ledger.newNonDurableCursor(PositionImpl.latest); |
| ledger.addEntry("dummy-entry-3".getBytes(Encoding)); |
| ManagedCursor c4 = ledger.newNonDurableCursor(PositionImpl.latest); |
| ledger.addEntry("dummy-entry-4".getBytes(Encoding)); |
| ManagedCursor c5 = ledger.newNonDurableCursor(PositionImpl.latest); |
| |
| assertEquals(c1.getNumberOfEntries(), 4); |
| assertTrue(c1.hasMoreEntries()); |
| |
| assertEquals(c2.getNumberOfEntries(), 3); |
| assertTrue(c2.hasMoreEntries()); |
| |
| assertEquals(c3.getNumberOfEntries(), 2); |
| assertTrue(c3.hasMoreEntries()); |
| |
| assertEquals(c4.getNumberOfEntries(), 1); |
| assertTrue(c4.hasMoreEntries()); |
| |
| assertEquals(c5.getNumberOfEntries(), 0); |
| assertFalse(c5.hasMoreEntries()); |
| |
| List<Entry> entries = c1.readEntries(2); |
| assertEquals(entries.size(), 2); |
| c1.markDelete(entries.get(1).getPosition()); |
| assertEquals(c1.getNumberOfEntries(), 2); |
| entries.forEach(e -> e.release()); |
| } |
| |
| @Test(timeOut = 20000) |
| void testNumberOfEntriesInBacklog() throws Exception { |
| ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2) |
| .setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1)); |
| |
| ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.latest); |
| Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding)); |
| ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.latest); |
| ledger.addEntry("dummy-entry-2".getBytes(Encoding)); |
| ManagedCursor c3 = ledger.newNonDurableCursor(PositionImpl.latest); |
| Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding)); |
| ManagedCursor c4 = ledger.newNonDurableCursor(PositionImpl.latest); |
| Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding)); |
| ManagedCursor c5 = ledger.newNonDurableCursor(PositionImpl.latest); |
| |
| assertEquals(c1.getNumberOfEntriesInBacklog(false), 4); |
| assertEquals(c2.getNumberOfEntriesInBacklog(false), 3); |
| assertEquals(c3.getNumberOfEntriesInBacklog(false), 2); |
| assertEquals(c4.getNumberOfEntriesInBacklog(false), 1); |
| assertEquals(c5.getNumberOfEntriesInBacklog(false), 0); |
| |
| List<Entry> entries = c1.readEntries(2); |
| assertEquals(entries.size(), 2); |
| entries.forEach(e -> e.release()); |
| |
| assertEquals(c1.getNumberOfEntries(), 2); |
| assertEquals(c1.getNumberOfEntriesInBacklog(false), 4); |
| |
| c1.markDelete(p1); |
| assertEquals(c1.getNumberOfEntries(), 2); |
| assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); |
| |
| c1.delete(p3); |
| |
| assertEquals(c1.getNumberOfEntries(), 1); |
| assertEquals(c1.getNumberOfEntriesInBacklog(false), 2); |
| |
| c1.markDelete(p4); |
| assertEquals(c1.getNumberOfEntries(), 0); |
| assertEquals(c1.getNumberOfEntriesInBacklog(false), 0); |
| } |
| |
| @Test(timeOut = 20000) |
| void markDeleteWithErrors() throws Exception { |
| ManagedLedger ledger = factory.open("my_test_ledger"); |
| ManagedCursor cursor = ledger.openCursor("c1"); |
| ledger.addEntry("dummy-entry-1".getBytes(Encoding)); |
| List<Entry> entries = cursor.readEntries(100); |
| |
| stopBookKeeper(); |
| assertEquals(entries.size(), 1); |
| |
| // Mark-delete should succeed if BK is down |
| cursor.markDelete(entries.get(0).getPosition()); |
| |
| entries.forEach(e -> e.release()); |
| } |
| |
| @Test(timeOut = 20000) |
| void markDeleteAcrossLedgers() throws Exception { |
| ManagedLedger ml1 = factory.open("my_test_ledger"); |
| ManagedCursor mc1 = ml1.openCursor("c1"); |
| |
| // open ledger id 3 for ml1 |
| // markDeletePosition for mc1 is 3:-1 |
| // readPosition is 3:0 |
| |
| ml1.close(); |
| mc1.close(); |
| |
| // force removal of this ledger from the cache |
| factory.close(ml1); |
| |
| ManagedLedger ml2 = factory.open("my_test_ledger"); |
| ManagedCursor mc2 = ml2.openCursor("c1"); |
| |
| // open ledger id 5 for ml2 |
| // this entry is written at 5:0 |
| Position pos = ml2.addEntry("dummy-entry-1".getBytes(Encoding)); |
| |
| List<Entry> entries = mc2.readEntries(1); |
| assertEquals(entries.size(), 1); |
| assertEquals(new String(entries.get(0).getData(), Encoding), "dummy-entry-1"); |
| entries.forEach(e -> e.release()); |
| |
| mc2.delete(pos); |
| |
| // verify if the markDeletePosition moves from 3:-1 to 5:0 |
| assertEquals(mc2.getMarkDeletedPosition(), pos); |
| assertEquals(mc2.getMarkDeletedPosition().getNext(), mc2.getReadPosition()); |
| } |
| |
| @Test(timeOut = 20000) |
| void markDeleteGreaterThanLastConfirmedEntry() throws Exception { |
| ManagedLedger ml1 = factory.open("my_test_ledger"); |
| ManagedCursor mc1 = ml1.newNonDurableCursor(PositionImpl.get(Long.MAX_VALUE - 1, Long.MAX_VALUE - 1)); |
| assertEquals(mc1.getMarkDeletedPosition(), ml1.getLastConfirmedEntry()); |
| } |
| |
| @Test(timeOut = 20000) |
| void testResetCursor() throws Exception { |
| ManagedLedger ledger = factory.open("my_test_move_cursor_ledger", |
| new ManagedLedgerConfig().setMaxEntriesPerLedger(10)); |
| ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.latest); |
| ledger.addEntry("dummy-entry-1".getBytes(Encoding)); |
| ledger.addEntry("dummy-entry-2".getBytes(Encoding)); |
| ledger.addEntry("dummy-entry-3".getBytes(Encoding)); |
| PositionImpl lastPosition = (PositionImpl) ledger.addEntry("dummy-entry-4".getBytes(Encoding)); |
| final AtomicBoolean moveStatus = new AtomicBoolean(false); |
| PositionImpl resetPosition = new PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() - 2); |
| try { |
| cursor.resetCursor(resetPosition); |
| moveStatus.set(true); |
| } catch (Exception e) { |
| log.warn("error in reset cursor", e.getCause()); |
| } |
| |
| assertTrue(moveStatus.get()); |
| assertEquals(resetPosition, cursor.getReadPosition()); |
| cursor.close(); |
| ledger.close(); |
| } |
| |
| @Test(timeOut = 20000) |
| void testasyncResetCursor() throws Exception { |
| ManagedLedger ledger = factory.open("my_test_move_cursor_ledger", |
| new ManagedLedgerConfig().setMaxEntriesPerLedger(10)); |
| ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.latest); |
| ledger.addEntry("dummy-entry-1".getBytes(Encoding)); |
| ledger.addEntry("dummy-entry-2".getBytes(Encoding)); |
| ledger.addEntry("dummy-entry-3".getBytes(Encoding)); |
| PositionImpl lastPosition = (PositionImpl) ledger.addEntry("dummy-entry-4".getBytes(Encoding)); |
| final AtomicBoolean moveStatus = new AtomicBoolean(false); |
| CountDownLatch countDownLatch = new CountDownLatch(1); |
| PositionImpl resetPosition = new PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() - 2); |
| |
| cursor.asyncResetCursor(resetPosition, new AsyncCallbacks.ResetCursorCallback() { |
| @Override |
| public void resetComplete(Object ctx) { |
| moveStatus.set(true); |
| countDownLatch.countDown(); |
| } |
| |
| @Override |
| public void resetFailed(ManagedLedgerException exception, Object ctx) { |
| moveStatus.set(false); |
| countDownLatch.countDown(); |
| } |
| }); |
| countDownLatch.await(); |
| assertTrue(moveStatus.get()); |
| assertEquals(resetPosition, cursor.getReadPosition()); |
| cursor.close(); |
| ledger.close(); |
| } |
| |
| @Test(timeOut = 20000) |
| void rewind() throws Exception { |
| ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2) |
| .setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1)); |
| ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest); |
| Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding)); |
| Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding)); |
| Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding)); |
| Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding)); |
| |
| log.debug("p1: {}", p1); |
| log.debug("p2: {}", p2); |
| log.debug("p3: {}", p3); |
| log.debug("p4: {}", p4); |
| |
| assertEquals(c1.getNumberOfEntries(), 4); |
| assertEquals(c1.getNumberOfEntriesInBacklog(false), 4); |
| c1.markDelete(p1); |
| assertEquals(c1.getNumberOfEntries(), 3); |
| assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); |
| List<Entry> entries = c1.readEntries(10); |
| assertEquals(entries.size(), 3); |
| entries.forEach(e -> e.release()); |
| |
| assertEquals(c1.getNumberOfEntries(), 0); |
| assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); |
| c1.rewind(); |
| assertEquals(c1.getNumberOfEntries(), 3); |
| assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); |
| c1.markDelete(p2); |
| assertEquals(c1.getNumberOfEntries(), 2); |
| assertEquals(c1.getNumberOfEntriesInBacklog(false), 2); |
| |
| entries = c1.readEntries(10); |
| assertEquals(entries.size(), 2); |
| entries.forEach(e -> e.release()); |
| |
| assertEquals(c1.getNumberOfEntries(), 0); |
| assertEquals(c1.getNumberOfEntriesInBacklog(false), 2); |
| c1.rewind(); |
| assertEquals(c1.getNumberOfEntries(), 2); |
| c1.markDelete(p4); |
| assertEquals(c1.getNumberOfEntries(), 0); |
| assertEquals(c1.getNumberOfEntriesInBacklog(false), 0); |
| c1.rewind(); |
| assertEquals(c1.getNumberOfEntries(), 0); |
| ledger.addEntry("dummy-entry-5".getBytes(Encoding)); |
| assertEquals(c1.getNumberOfEntries(), 1); |
| ledger.addEntry("dummy-entry-6".getBytes(Encoding)); |
| assertEquals(c1.getNumberOfEntries(), 2); |
| } |
| |
| @Test(timeOut = 20000) |
| void markDeleteSkippingMessage() throws Exception { |
| ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(10)); |
| ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest); |
| Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding)); |
| Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding)); |
| ledger.addEntry("dummy-entry-3".getBytes(Encoding)); |
| PositionImpl p4 = (PositionImpl) ledger.addEntry("dummy-entry-4".getBytes(Encoding)); |
| |
| assertEquals(cursor.getNumberOfEntries(), 4); |
| |
| cursor.markDelete(p1); |
| assertTrue(cursor.hasMoreEntries()); |
| assertEquals(cursor.getNumberOfEntries(), 3); |
| |
| assertEquals(cursor.getReadPosition(), p2); |
| |
| List<Entry> entries = cursor.readEntries(1); |
| assertEquals(entries.size(), 1); |
| assertEquals(new String(entries.get(0).getData(), Encoding), "dummy-entry-2"); |
| entries.forEach(e -> e.release()); |
| |
| cursor.markDelete(p4); |
| assertFalse(cursor.hasMoreEntries()); |
| assertEquals(cursor.getNumberOfEntries(), 0); |
| |
| assertEquals(cursor.getReadPosition(), new PositionImpl(p4.getLedgerId(), p4.getEntryId() + 1)); |
| } |
| |
| @Test(timeOut = 20000) |
| public void asyncMarkDeleteBlocking() throws Exception { |
| ManagedLedgerConfig config = new ManagedLedgerConfig(); |
| config.setMaxEntriesPerLedger(10); |
| config.setMetadataMaxEntriesPerLedger(5); |
| ManagedLedger ledger = factory.open("my_test_ledger", config); |
| final ManagedCursor c1 = ledger.openCursor("c1"); |
| final AtomicReference<Position> lastPosition = new AtomicReference<Position>(); |
| |
| final int N = 100; |
| final CountDownLatch latch = new CountDownLatch(N); |
| for (int i = 0; i < N; i++) { |
| ledger.asyncAddEntry("entry".getBytes(Encoding), new AddEntryCallback() { |
| @Override |
| public void addFailed(ManagedLedgerException exception, Object ctx) { |
| } |
| |
| @Override |
| public void addComplete(Position position, ByteBuf entryData, Object ctx) { |
| lastPosition.set(position); |
| c1.asyncMarkDelete(position, new MarkDeleteCallback() { |
| @Override |
| public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { |
| } |
| |
| @Override |
| public void markDeleteComplete(Object ctx) { |
| latch.countDown(); |
| } |
| }, null); |
| } |
| }, null); |
| } |
| |
| latch.await(); |
| |
| assertEquals(c1.getNumberOfEntries(), 0); |
| |
| // Reopen |
| ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); |
| ledger = factory2.open("my_test_ledger"); |
| ManagedCursor c2 = ledger.openCursor("c1"); |
| |
| assertEquals(c2.getMarkDeletedPosition(), lastPosition.get()); |
| factory2.shutdown(); |
| } |
| |
| @Test(timeOut = 20000) |
| void unorderedMarkDelete() throws Exception { |
| ManagedLedger ledger = factory.open("my_test_ledger"); |
| final ManagedCursor c1 = ledger.openCursor("c1"); |
| |
| Position p1 = ledger.addEntry("entry-1".getBytes(Encoding)); |
| Position p2 = ledger.addEntry("entry-2".getBytes(Encoding)); |
| |
| c1.markDelete(p2); |
| try { |
| c1.markDelete(p1); |
| fail("Should have thrown exception"); |
| } catch (ManagedLedgerException e) { |
| // ok |
| } |
| |
| assertEquals(c1.getMarkDeletedPosition(), p2); |
| } |
| |
| @Test(timeOut = 20000) |
| void testSingleDelete() throws Exception { |
| ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(3) |
| .setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1)); |
| ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.latest); |
| |
| Position p1 = ledger.addEntry("entry1".getBytes()); |
| Position p2 = ledger.addEntry("entry2".getBytes()); |
| Position p3 = ledger.addEntry("entry3".getBytes()); |
| Position p4 = ledger.addEntry("entry4".getBytes()); |
| Position p5 = ledger.addEntry("entry5".getBytes()); |
| Position p6 = ledger.addEntry("entry6".getBytes()); |
| |
| Position p0 = cursor.getMarkDeletedPosition(); |
| |
| cursor.delete(p4); |
| assertEquals(cursor.getMarkDeletedPosition(), p0); |
| |
| cursor.delete(p1); |
| assertEquals(cursor.getMarkDeletedPosition(), p1); |
| |
| cursor.delete(p3); |
| |
| // Delete will silently succeed |
| cursor.delete(p3); |
| assertEquals(cursor.getMarkDeletedPosition(), p1); |
| |
| cursor.delete(p2); |
| assertEquals(cursor.getMarkDeletedPosition(), p4); |
| |
| cursor.delete(p5); |
| assertEquals(cursor.getMarkDeletedPosition(), p5); |
| |
| cursor.close(); |
| try { |
| cursor.delete(p6); |
| } catch (ManagedLedgerException e) { |
| // Ok |
| } |
| } |
| |
| @Test(timeOut = 20000) |
| void subscribeToEarliestPositionWithImmediateDeletion() throws Exception { |
| ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1)); |
| |
| /* Position p1 = */ ledger.addEntry("entry-1".getBytes()); |
| /* Position p2 = */ ledger.addEntry("entry-2".getBytes()); |
| Position p3 = ledger.addEntry("entry-3".getBytes()); |
| |
| Thread.sleep(300); |
| ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest); |
| assertEquals(c1.getReadPosition(), p3); |
| assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(5, -1)); |
| } |
| |
| @Test // (timeOut = 20000) |
| void subscribeToEarliestPositionWithDeferredDeletion() throws Exception { |
| ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1) |
| .setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1)); |
| |
| Position p1 = ledger.addEntry("entry-1".getBytes()); |
| Position p2 = ledger.addEntry("entry-2".getBytes()); |
| /* Position p3 = */ ledger.addEntry("entry-3".getBytes()); |
| /* Position p4 = */ ledger.addEntry("entry-4".getBytes()); |
| /* Position p5 = */ ledger.addEntry("entry-5".getBytes()); |
| /* Position p6 = */ ledger.addEntry("entry-6".getBytes()); |
| |
| ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest); |
| assertEquals(c1.getReadPosition(), p1); |
| assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(3, -1)); |
| assertEquals(c1.getNumberOfEntries(), 6); |
| assertEquals(c1.getNumberOfEntriesInBacklog(false), 6); |
| |
| ManagedCursor c2 = ledger.newNonDurableCursor(p1); |
| assertEquals(c2.getReadPosition(), p2); |
| assertEquals(c2.getMarkDeletedPosition(), p1); |
| assertEquals(c2.getNumberOfEntries(), 5); |
| assertEquals(c2.getNumberOfEntriesInBacklog(false), 5); |
| } |
| |
| @Test |
| void testCursorWithNameIsCachable() throws Exception { |
| final String p1CursorName = "entry-1"; |
| final String p2CursorName = "entry-2"; |
| ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1)); |
| |
| Position p1 = ledger.addEntry(p1CursorName.getBytes()); |
| Position p2 = ledger.addEntry(p2CursorName.getBytes()); |
| |
| ManagedCursor c1 = ledger.newNonDurableCursor(p1, p1CursorName); |
| ManagedCursor c2 = ledger.newNonDurableCursor(p1, p1CursorName); |
| ManagedCursor c3 = ledger.newNonDurableCursor(p2, p2CursorName); |
| ManagedCursor c4 = ledger.newNonDurableCursor(p2, p2CursorName); |
| |
| assertEquals(c1, c2); |
| assertEquals(c3, c4); |
| |
| assertNotEquals(c1, c3); |
| assertNotEquals(c2, c3); |
| assertNotEquals(c1, c4); |
| assertNotEquals(c2, c4); |
| |
| assertNotNull(c1.getName()); |
| assertNotNull(c2.getName()); |
| assertNotNull(c3.getName()); |
| assertNotNull(c4.getName()); |
| ledger.close(); |
| } |
| |
| @Test |
| public void testGetSlowestConsumer() throws Exception { |
| final String mlName = "test-get-slowest-consumer-ml"; |
| final String c1 = "cursor1"; |
| final String nc1 = "non-durable-cursor1"; |
| final String ncEarliest = "non-durable-cursor-earliest"; |
| |
| ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, new ManagedLedgerConfig()); |
| Position p1 = ledger.addEntry(c1.getBytes(UTF_8)); |
| log.info("write entry 1 : pos = {}", p1); |
| Position p2 = ledger.addEntry(nc1.getBytes(UTF_8)); |
| log.info("write entry 2 : pos = {}", p2); |
| Position p3 = ledger.addEntry(nc1.getBytes(UTF_8)); |
| log.info("write entry 3 : pos = {}", p3); |
| |
| ManagedCursor cursor1 = ledger.openCursor(c1); |
| cursor1.seek(p3); |
| assertEquals(p3, ledger.getCursors().getSlowestReaderPosition()); |
| |
| ManagedCursor nonCursor1 = ledger.newNonDurableCursor(p2, nc1); |
| // The slowest reader should still be the durable cursor since non-durable readers are not taken into account |
| assertEquals(p3, ledger.getCursors().getSlowestReaderPosition()); |
| |
| PositionImpl earliestPos = new PositionImpl(-1, -2); |
| |
| ManagedCursor nonCursorEarliest = ledger.newNonDurableCursor(earliestPos, ncEarliest); |
| |
| // The slowest reader should still be the durable cursor since non-durable readers are not taken into account |
| assertEquals(p3, ledger.getCursors().getSlowestReaderPosition()); |
| |
| // move non-durable cursor should NOT update the slowest reader position |
| nonCursorEarliest.markDelete(p1); |
| assertEquals(p3, ledger.getCursors().getSlowestReaderPosition()); |
| |
| nonCursorEarliest.markDelete(p2); |
| assertEquals(p3, ledger.getCursors().getSlowestReaderPosition()); |
| |
| nonCursorEarliest.markDelete(p3); |
| assertEquals(p3, ledger.getCursors().getSlowestReaderPosition()); |
| |
| nonCursor1.markDelete(p3); |
| assertEquals(p3, ledger.getCursors().getSlowestReaderPosition()); |
| |
| ledger.close(); |
| } |
| |
| @Test |
| public void testBacklogStatsWhenDroppingData() throws Exception { |
| ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testBacklogStatsWhenDroppingData", |
| new ManagedLedgerConfig().setMaxEntriesPerLedger(1)); |
| ManagedCursor c1 = ledger.openCursor("c1"); |
| ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.earliest); |
| |
| assertEquals(nonDurableCursor.getNumberOfEntries(), 0); |
| assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0); |
| |
| List<Position> positions = Lists.newArrayList(); |
| for (int i = 0; i < 10; i++) { |
| positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8))); |
| } |
| |
| assertEquals(nonDurableCursor.getNumberOfEntries(), 10); |
| assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 10); |
| |
| c1.markDelete(positions.get(4)); |
| assertEquals(c1.getNumberOfEntries(), 5); |
| assertEquals(c1.getNumberOfEntriesInBacklog(true), 5); |
| |
| // Since the durable cursor has moved, the data will be trimmed |
| CompletableFuture<Void> promise = new CompletableFuture<>(); |
| ledger.internalTrimConsumedLedgers(promise); |
| promise.join(); |
| |
| assertEquals(nonDurableCursor.getNumberOfEntries(), 6); |
| assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 6); |
| |
| c1.close(); |
| ledger.deleteCursor(c1.getName()); |
| promise = new CompletableFuture<>(); |
| ledger.internalTrimConsumedLedgers(promise); |
| promise.join(); |
| |
| assertEquals(nonDurableCursor.getNumberOfEntries(), 1); |
| assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 1); |
| |
| ledger.close(); |
| } |
| |
| @Test(expectedExceptions = NullPointerException.class) |
| void testCursorWithNameIsNotNull() throws Exception { |
| final String p1CursorName = "entry-1"; |
| ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1)); |
| |
| Position p1 = ledger.addEntry(p1CursorName.getBytes()); |
| |
| try { |
| ledger.newNonDurableCursor(p1, null); |
| } catch (NullPointerException npe) { |
| assertEquals(npe.getMessage(), "cursor name can't be null"); |
| throw npe; |
| } finally { |
| ledger.close(); |
| } |
| } |
| |
| @Test |
| void deleteNonDurableCursorWithName() throws Exception { |
| ManagedLedger ledger = factory.open("deleteManagedLedgerWithNonDurableCursor"); |
| |
| ManagedCursor c = ledger.newNonDurableCursor(PositionImpl.earliest, "custom-name"); |
| assertEquals(Iterables.size(ledger.getCursors()), 1); |
| |
| ledger.deleteCursor(c.getName()); |
| assertEquals(Iterables.size(ledger.getCursors()), 0); |
| } |
| |
| private static final Logger log = LoggerFactory.getLogger(NonDurableCursorTest.class); |
| } |