blob: 04d99d3bdf480cff54f59dce648b8dd645022f91 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.testng.annotations.Test;
public class ManagedCursorContainerTest {
private static class MockManagedCursor implements ManagedCursor {
ManagedCursorContainer container;
Position position;
String name;
public MockManagedCursor(ManagedCursorContainer container, String name, Position position) {
this.container = container;
this.name = name;
this.position = position;
}
@Override
public Map<String, Long> getProperties() {
return Collections.emptyMap();
}
@Override
public Map<String, String> getCursorProperties() {
return Collections.emptyMap();
}
@Override
public CompletableFuture<Void> putCursorProperty(String key, String value) {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> removeCursorProperty(String key) {
return CompletableFuture.completedFuture(null);
}
@Override
public boolean putProperty(String key, Long value) {
return false;
}
@Override
public boolean removeProperty(String key) {
return false;
}
@Override
public boolean isDurable() {
return true;
}
@Override
public List<Entry> readEntries(int numberOfEntriesToRead) throws ManagedLedgerException {
return new ArrayList();
}
@Override
public void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition) {
callback.readEntriesComplete(null, ctx);
}
@Override
public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition) {
callback.readEntriesComplete(null, ctx);
}
@Override
public boolean hasMoreEntries() {
return true;
}
@Override
public long getNumberOfEntries() {
return 0;
}
@Override
public long getNumberOfEntriesInBacklog(boolean isPrecise) {
return 0;
}
@Override
public void markDelete(Position position) throws ManagedLedgerException {
markDelete(position, Collections.emptyMap());
}
@Override
public void markDelete(Position position, Map<String, Long> properties) throws ManagedLedgerException {
this.position = position;
container.cursorUpdated(this, (PositionImpl) position);
}
@Override
public void asyncMarkDelete(Position position, MarkDeleteCallback callback, Object ctx) {
fail();
}
@Override
public void asyncMarkDelete(Position position, Map<String, Long> properties, MarkDeleteCallback callback,
Object ctx) {
fail();
}
@Override
public Position getMarkDeletedPosition() {
return position;
}
@Override
public Position getPersistentMarkDeletedPosition() {
return position;
}
@Override
public String getName() {
return name;
}
@Override
public long getLastActive() {
return System.currentTimeMillis();
}
@Override
public void updateLastActive() {
// no-op
}
public String toString() {
return String.format("%s=%s", name, position);
}
@Override
public Position getReadPosition() {
return null;
}
@Override
public void rewind() {
}
@Override
public void seek(Position newReadPosition, boolean force) {
}
@Override
public void close() {
}
@Override
public void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx) {
}
@Override
public void delete(Position position) throws InterruptedException, ManagedLedgerException {
}
@Override
public void asyncDelete(Position position, DeleteCallback callback, Object ctx) {
}
@Override
public void delete(Iterable<Position> positions) throws InterruptedException, ManagedLedgerException {
}
@Override
public void asyncDelete(Iterable<Position> position, DeleteCallback callback, Object ctx) {
}
@Override
public void clearBacklog() throws InterruptedException, ManagedLedgerException {
}
@Override
public void asyncClearBacklog(ClearBacklogCallback callback, Object ctx) {
}
@Override
public void skipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries)
throws InterruptedException, ManagedLedgerException {
}
@Override
public void asyncSkipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries,
final SkipEntriesCallback callback, Object ctx) {
}
@Override
public Position findNewestMatching(Predicate<Entry> condition)
throws InterruptedException, ManagedLedgerException {
return null;
}
@Override
public Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException {
return null;
}
@Override
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
AsyncCallbacks.FindEntryCallback callback, Object ctx) {
}
@Override
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
}
@Override
public void asyncResetCursor(final Position position, boolean forceReset,
AsyncCallbacks.ResetCursorCallback callback) {
}
@Override
public void resetCursor(final Position position) throws ManagedLedgerException, InterruptedException {
}
@Override
public Position getFirstPosition() {
return null;
}
@Override
public void setAlwaysInactive() {
}
@Override
public List<Entry> replayEntries(Set<? extends Position> positions)
throws InterruptedException, ManagedLedgerException {
return null;
}
@Override
public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx) {
return Sets.newConcurrentHashSet();
}
@Override
public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx, boolean sortEntries) {
return Sets.newConcurrentHashSet();
}
@Override
public List<Entry> readEntriesOrWait(int numberOfEntriesToRead)
throws InterruptedException, ManagedLedgerException {
return null;
}
@Override
public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition) {
}
@Override
public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition) {
}
@Override
public boolean cancelPendingReadRequest() {
return true;
}
@Override
public Entry getNthEntry(int N, IndividualDeletedEntries deletedEntries)
throws InterruptedException, ManagedLedgerException {
return null;
}
@Override
public void asyncGetNthEntry(int N, IndividualDeletedEntries deletedEntries, ReadEntryCallback callback,
Object ctx) {
}
@Override
public void setActive() {
}
@Override
public void setInactive() {
}
@Override
public boolean isActive() {
return true;
}
@Override
public long getNumberOfEntriesSinceFirstNotAckedMessage() {
return 0;
}
@Override
public int getTotalNonContiguousDeletedMessagesRange() {
return 0;
}
@Override
public int getNonContiguousDeletedMessagesRangeSerializedSize() {
return 0;
}
@Override
public long getEstimatedSizeSinceMarkDeletePosition() {
return 0L;
}
@Override
public void setThrottleMarkDelete(double throttleMarkDelete) {
}
@Override
public double getThrottleMarkDelete() {
return -1;
}
@Override
public ManagedLedger getManagedLedger() {
return null;
}
@Override
public Range<PositionImpl> getLastIndividualDeletedRange() {
return null;
}
@Override
public void trimDeletedEntries(List<Entry> entries) {
}
@Override
public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
return new long[0];
}
@Override
public ManagedCursorMXBean getStats() {
return null;
}
public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx) {
}
@Override
public List<Entry> readEntriesOrWait(int maxEntries, long maxSizeBytes)
throws InterruptedException, ManagedLedgerException {
return null;
}
@Override
public boolean checkAndUpdateReadPositionChanged() {
return false;
}
@Override
public boolean isClosed() {
return false;
}
}
@Test
public void testSlowestReadPositionForActiveCursors() throws Exception {
ManagedCursorContainer container = new ManagedCursorContainer();
assertNull(container.getSlowestReaderPosition());
// Add no durable cursor
PositionImpl position = PositionImpl.get(5,5);
ManagedCursor cursor1 = spy(new MockManagedCursor(container, "test1", position));
doReturn(false).when(cursor1).isDurable();
doReturn(position).when(cursor1).getReadPosition();
container.add(cursor1, position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
// Add no durable cursor
position = PositionImpl.get(1,1);
ManagedCursor cursor2 = spy(new MockManagedCursor(container, "test2", position));
doReturn(false).when(cursor2).isDurable();
doReturn(position).when(cursor2).getReadPosition();
container.add(cursor2, position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(1, 1));
// Move forward cursor, cursor1 = 5:5, cursor2 = 5:6, slowest is 5:5
position = PositionImpl.get(5,6);
container.cursorUpdated(cursor2, position);
doReturn(position).when(cursor2).getReadPosition();
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
// Move forward cursor, cursor1 = 5:8, cursor2 = 5:6, slowest is 5:6
position = PositionImpl.get(5,8);
doReturn(position).when(cursor1).getReadPosition();
container.cursorUpdated(cursor1, position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 6));
// Remove cursor, only cursor1 left, cursor1 = 5:8
container.removeCursor(cursor2.getName());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 8));
}
@Test
public void simple() throws Exception {
ManagedCursorContainer container = new ManagedCursorContainer();
assertNull(container.getSlowestReaderPosition());
ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
container.add(cursor1, cursor1.getMarkDeletedPosition());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
ManagedCursor cursor2 = new MockManagedCursor(container, "test2", new PositionImpl(2, 2));
container.add(cursor2, cursor2.getMarkDeletedPosition());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2));
ManagedCursor cursor3 = new MockManagedCursor(container, "test3", new PositionImpl(2, 0));
container.add(cursor3, cursor3.getMarkDeletedPosition());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 0));
assertEquals(container.toString(), "[test1=5:5, test2=2:2, test3=2:0]");
ManagedCursor cursor4 = new MockManagedCursor(container, "test4", new PositionImpl(4, 0));
container.add(cursor4, cursor4.getMarkDeletedPosition());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 0));
ManagedCursor cursor5 = new MockManagedCursor(container, "test5", new PositionImpl(3, 5));
container.add(cursor5, cursor5.getMarkDeletedPosition());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 0));
cursor3.markDelete(new PositionImpl(3, 0));
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2));
cursor2.markDelete(new PositionImpl(10, 5));
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(3, 0));
container.removeCursor(cursor3.getName());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(3, 5));
container.removeCursor(cursor2.getName());
container.removeCursor(cursor5.getName());
container.removeCursor(cursor1.getName());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(4, 0));
assertTrue(container.hasDurableCursors());
container.removeCursor(cursor4.getName());
assertNull(container.getSlowestReaderPosition());
assertFalse(container.hasDurableCursors());
ManagedCursor cursor6 = new MockManagedCursor(container, "test6", new PositionImpl(6, 5));
container.add(cursor6, cursor6.getMarkDeletedPosition());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(6, 5));
assertEquals(container.toString(), "[test6=6:5]");
}
@Test
public void updatingCursorOutsideContainer() throws Exception {
ManagedCursorContainer container = new ManagedCursorContainer();
ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
container.add(cursor1, cursor1.getMarkDeletedPosition());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
MockManagedCursor cursor2 = new MockManagedCursor(container, "test2", new PositionImpl(2, 2));
container.add(cursor2, cursor2.getMarkDeletedPosition());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2));
cursor2.position = new PositionImpl(8, 8);
// Until we don't update the container, the ordering will not change
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2));
container.cursorUpdated(cursor2, cursor2.position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
}
@Test
public void removingCursor() throws Exception {
ManagedCursorContainer container = new ManagedCursorContainer();
ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
container.add(cursor1, cursor1.getMarkDeletedPosition());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
assertEquals(container.get("test1"), cursor1);
MockManagedCursor cursor2 = new MockManagedCursor(container, "test2", new PositionImpl(2, 2));
container.add(cursor2, cursor2.getMarkDeletedPosition());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2));
assertEquals(container.get("test2"), cursor2);
MockManagedCursor cursor3 = new MockManagedCursor(container, "test3", new PositionImpl(1, 1));
container.add(cursor3, cursor3.getMarkDeletedPosition());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(1, 1));
assertEquals(container.get("test3"), cursor3);
assertEquals(container, Lists.newArrayList(cursor1, cursor2, cursor3));
// Remove the cursor in the middle
container.removeCursor("test2");
assertEquals(container, Lists.newArrayList(cursor1, cursor3));
assertNull(container.get("test2"));
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(1, 1));
container.removeCursor("test3");
assertEquals(container, Lists.newArrayList(cursor1));
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
}
@Test
public void ordering() throws Exception {
ManagedCursorContainer container = new ManagedCursorContainer();
ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
ManagedCursor cursor2 = new MockManagedCursor(container, "test2", new PositionImpl(5, 1));
ManagedCursor cursor3 = new MockManagedCursor(container, "test3", new PositionImpl(7, 1));
ManagedCursor cursor4 = new MockManagedCursor(container, "test4", new PositionImpl(6, 4));
ManagedCursor cursor5 = new MockManagedCursor(container, "test5", new PositionImpl(7, 0));
container.add(cursor1, cursor1.getMarkDeletedPosition());
container.add(cursor2, cursor2.getMarkDeletedPosition());
container.add(cursor3, cursor3.getMarkDeletedPosition());
container.add(cursor4, cursor4.getMarkDeletedPosition());
container.add(cursor5, cursor5.getMarkDeletedPosition());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 1));
container.removeCursor("test2");
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
container.removeCursor("test1");
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(6, 4));
container.removeCursor("test4");
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(7, 0));
container.removeCursor("test5");
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(7, 1));
container.removeCursor("test3");
assertFalse(container.hasDurableCursors());
}
@Test
public void orderingWithUpdates() throws Exception {
ManagedCursorContainer container = new ManagedCursorContainer();
MockManagedCursor c1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
MockManagedCursor c2 = new MockManagedCursor(container, "test2", new PositionImpl(5, 1));
MockManagedCursor c3 = new MockManagedCursor(container, "test3", new PositionImpl(7, 1));
MockManagedCursor c4 = new MockManagedCursor(container, "test4", new PositionImpl(6, 4));
MockManagedCursor c5 = new MockManagedCursor(container, "test5", new PositionImpl(7, 0));
container.add(c1, c1.getMarkDeletedPosition());
container.add(c2, c2.getMarkDeletedPosition());
container.add(c3, c3.getMarkDeletedPosition());
container.add(c4, c4.getMarkDeletedPosition());
container.add(c5, c5.getMarkDeletedPosition());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 1));
c1.position = new PositionImpl(5, 8);
container.cursorUpdated(c1, c1.position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 1));
c2.position = new PositionImpl(5, 6);
container.cursorUpdated(c2, c2.position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 6));
c1.position = new PositionImpl(6, 8);
container.cursorUpdated(c1, c1.position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 6));
c3.position = new PositionImpl(8, 5);
container.cursorUpdated(c3, c3.position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 6));
c1.position = new PositionImpl(8, 4);
container.cursorUpdated(c1, c1.position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 6));
c2.position = new PositionImpl(8, 4);
container.cursorUpdated(c2, c2.position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(6, 4));
c4.position = new PositionImpl(7, 1);
container.cursorUpdated(c4, c4.position);
// ////
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(7, 0));
container.removeCursor("test5");
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(7, 1));
container.removeCursor("test4");
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(8, 4));
container.removeCursor("test1");
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(8, 4));
container.removeCursor("test2");
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(8, 5));
container.removeCursor("test3");
assertFalse(container.hasDurableCursors());
}
@Test
public void orderingWithUpdatesAndReset() throws Exception {
ManagedCursorContainer container = new ManagedCursorContainer();
MockManagedCursor c1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
MockManagedCursor c2 = new MockManagedCursor(container, "test2", new PositionImpl(5, 1));
MockManagedCursor c3 = new MockManagedCursor(container, "test3", new PositionImpl(7, 1));
MockManagedCursor c4 = new MockManagedCursor(container, "test4", new PositionImpl(6, 4));
MockManagedCursor c5 = new MockManagedCursor(container, "test5", new PositionImpl(7, 0));
container.add(c1, c1.getMarkDeletedPosition());
container.add(c2, c2.getMarkDeletedPosition());
container.add(c3, c3.getMarkDeletedPosition());
container.add(c4, c4.getMarkDeletedPosition());
container.add(c5, c5.getMarkDeletedPosition());
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 1));
c1.position = new PositionImpl(5, 8);
container.cursorUpdated(c1, c1.position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 1));
c1.position = new PositionImpl(5, 6);
container.cursorUpdated(c1, c1.position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 1));
c2.position = new PositionImpl(6, 8);
container.cursorUpdated(c2, c2.position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 6));
c3.position = new PositionImpl(8, 5);
container.cursorUpdated(c3, c3.position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 6));
c1.position = new PositionImpl(8, 4);
container.cursorUpdated(c1, c1.position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(6, 4));
c2.position = new PositionImpl(4, 4);
container.cursorUpdated(c2, c2.position);
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(4, 4));
c4.position = new PositionImpl(7, 1);
container.cursorUpdated(c4, c4.position);
// ////
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(4, 4));
container.removeCursor("test2");
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(7, 0));
container.removeCursor("test5");
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(7, 1));
container.removeCursor("test1");
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(7, 1));
container.removeCursor("test4");
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(8, 5));
container.removeCursor("test3");
assertFalse(container.hasDurableCursors());
}
}