blob: aa0d04783d9917b4505569b078b4b4c3f4c23211 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import com.google.common.collect.Range;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.testng.annotations.Test;
/**
 * Verifies how {@link ManagedCursorImpl} rebuilds its set of individually deleted
 * messages from a list of persisted {@link MessageRange} entries.
 *
 * <p>The private {@code recoverIndividualDeletedMessages(List)} method is invoked through
 * reflection, and the resulting {@link LongPairRangeSet} is compared against the ranges
 * the test expects for the mocked ledger layout.
 */
public class ManagedCursorIndividualDeletedMessagesTest {

    /**
     * Feeds three sets of message ranges to the cursor and checks the recovered ranges.
     *
     * <p>Mocked ledger layout (ledgerId -> number of entries): 1 -> 100, 3 -> 50, 5 -> 200,
     * 10 -> 2, 20 -> 10. The expectations below show that a range crossing ledger boundaries
     * is expected to be split into one range per ledger, ending at the last entry of each
     * intermediate ledger and starting at entry {@code -1} of each following ledger.
     *
     * @throws Exception if the reflective lookup or invocation of the recovery method fails
     */
    @Test(timeOut = 10000)
    void testRecoverIndividualDeletedMessages() throws Exception {
        BookKeeper bookkeeper = mock(BookKeeper.class);

        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setUnackedRangesOpenCacheSetEnabled(true);

        NavigableMap<Long, LedgerInfo> ledgersInfo = new ConcurrentSkipListMap<>();
        ledgersInfo.put(1L, createLedgerInfo(1, 100, 1024));
        ledgersInfo.put(3L, createLedgerInfo(3, 50, 512));
        ledgersInfo.put(5L, createLedgerInfo(5, 200, 2048));
        ledgersInfo.put(10L, createLedgerInfo(10, 2, 32));
        ledgersInfo.put(20L, createLedgerInfo(20, 10, 256));

        // The cursor only sees the ledger through getLedgersInfo(), which is stubbed here.
        ManagedLedgerImpl ledger = mock(ManagedLedgerImpl.class);
        doReturn(ledgersInfo).when(ledger).getLedgersInfo();

        ManagedCursorImpl cursor = spy(new ManagedCursorImpl(bookkeeper, config, ledger, "test-cursor"));
        LongPairRangeSet<PositionImpl> deletedMessages = cursor.getIndividuallyDeletedMessagesSet();

        // The method under test is private, so it has to be reached via reflection.
        Method recoverMethod = ManagedCursorImpl.class.getDeclaredMethod("recoverIndividualDeletedMessages",
                List.class);
        recoverMethod.setAccessible(true);

        // (1) [(1:5..1:10]] - a range inside a single ledger is expected back unchanged.
        List<MessageRange> messageRangeList = new ArrayList<>();
        messageRangeList.add(createMessageRange(1, 5, 1, 10));
        List<Range<PositionImpl>> expectedRangeList = new ArrayList<>();
        expectedRangeList.add(createPositionRange(1, 5, 1, 10));
        recoverMethod.invoke(cursor, messageRangeList);
        assertEquals(deletedMessages.size(), 1);
        assertEquals(deletedMessages.asRanges(), expectedRangeList);

        // (2) [(1:10..3:0]] - spans ledgers 1 and 3, expected as one range per ledger.
        // The range from step (1) is not expected to survive this second recovery.
        messageRangeList.clear();
        messageRangeList.add(createMessageRange(1, 10, 3, 0));
        expectedRangeList.clear();
        expectedRangeList.add(createPositionRange(1, 10, 1, 99));
        expectedRangeList.add(createPositionRange(3, -1, 3, 0));
        recoverMethod.invoke(cursor, messageRangeList);
        assertEquals(deletedMessages.size(), 2);
        assertEquals(deletedMessages.asRanges(), expectedRangeList);

        // (3) [(1:20..10:1],(20:2..20:9]] - the first range spans ledgers 1, 3, 5 and 10;
        // the second stays inside ledger 20.
        messageRangeList.clear();
        messageRangeList.add(createMessageRange(1, 20, 10, 1));
        messageRangeList.add(createMessageRange(20, 2, 20, 9));
        expectedRangeList.clear();
        expectedRangeList.add(createPositionRange(1, 20, 1, 99));
        expectedRangeList.add(createPositionRange(3, -1, 3, 49));
        expectedRangeList.add(createPositionRange(5, -1, 5, 199));
        expectedRangeList.add(createPositionRange(10, -1, 10, 1));
        expectedRangeList.add(createPositionRange(20, 2, 20, 9));
        recoverMethod.invoke(cursor, messageRangeList);
        assertEquals(deletedMessages.size(), 5);
        assertEquals(deletedMessages.asRanges(), expectedRangeList);
    }

    /**
     * Builds a {@link LedgerInfo} with the given id, entry count and size, stamped with the
     * current wall-clock time.
     *
     * @param ledgerId the ledger id to record
     * @param entries  the number of entries to record for the ledger
     * @param size     the size value to record for the ledger
     * @return the built ledger info message
     */
    private static LedgerInfo createLedgerInfo(long ledgerId, long entries, long size) {
        return LedgerInfo.newBuilder().setLedgerId(ledgerId).setEntries(entries).setSize(size)
                .setTimestamp(System.currentTimeMillis()).build();
    }

    /**
     * Builds a {@link MessageRange} whose lower and upper endpoints are the given positions.
     *
     * @param lowerLedgerId ledger id of the lower endpoint
     * @param lowerEntryId  entry id of the lower endpoint
     * @param upperLedgerId ledger id of the upper endpoint
     * @param upperEntryId  entry id of the upper endpoint
     * @return the built message range
     */
    private static MessageRange createMessageRange(long lowerLedgerId, long lowerEntryId, long upperLedgerId,
            long upperEntryId) {
        NestedPositionInfo.Builder nestedPositionBuilder = NestedPositionInfo.newBuilder();
        MessageRange.Builder messageRangeBuilder = MessageRange.newBuilder();

        nestedPositionBuilder.setLedgerId(lowerLedgerId);
        nestedPositionBuilder.setEntryId(lowerEntryId);
        messageRangeBuilder.setLowerEndpoint(nestedPositionBuilder.build());

        // The same position builder is reused: the lower endpoint has already been built
        // above, so its fields can now be overwritten with the upper endpoint's values.
        nestedPositionBuilder.setLedgerId(upperLedgerId);
        nestedPositionBuilder.setEntryId(upperEntryId);
        messageRangeBuilder.setUpperEndpoint(nestedPositionBuilder.build());

        return messageRangeBuilder.build();
    }

    /**
     * Builds an open-closed range {@code (lower..upper]} of {@link PositionImpl} values.
     *
     * @param lowerLedgerId ledger id of the exclusive lower bound
     * @param lowerEntryId  entry id of the exclusive lower bound
     * @param upperLedgerId ledger id of the inclusive upper bound
     * @param upperEntryId  entry id of the inclusive upper bound
     * @return the range between the two positions
     */
    private static Range<PositionImpl> createPositionRange(long lowerLedgerId, long lowerEntryId, long upperLedgerId,
            long upperEntryId) {
        return Range.openClosed(new PositionImpl(lowerLedgerId, lowerEntryId),
                new PositionImpl(upperLedgerId, upperEntryId));
    }
}