blob: c8338798f271bd33ddd48311dd9a07865eb00a8f [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.bookkeeper.mledger.impl;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.testng.Assert;
import org.testng.annotations.Test;
public class EntryCacheTest extends MockedBookKeeperTestCase {
private ManagedLedgerImpl ml;
protected void setUpTestCase() throws Exception {
ml = mock(ManagedLedgerImpl.class);
when(ml.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml));
when(ml.getConfig()).thenReturn(new ManagedLedgerConfig());
@Test(timeOut = 5000)
public void testRead() throws Exception {
ReadHandle lh = getLedgerHandle();
when(lh.getId()).thenReturn((long) 0);
EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);
byte[] data = new byte[10];
for (int i = 0; i < 10; i++) {
entryCache.insert(EntryImpl.create(0, i, data));
final CountDownLatch counter = new CountDownLatch(1);
entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
assertEquals(entries.size(), 10);
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {"should not have failed");
}, null);
// Verify no entries were read from bookkeeper
verify(lh, never()).readAsync(anyLong(), anyLong());
@Test(timeOut = 5000)
public void testReadMissingBefore() throws Exception {
ReadHandle lh = getLedgerHandle();
when(lh.getId()).thenReturn((long) 0);
EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);
byte[] data = new byte[10];
for (int i = 3; i < 10; i++) {
entryCache.insert(EntryImpl.create(0, i, data));
final CountDownLatch counter = new CountDownLatch(1);
entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
assertEquals(entries.size(), 10);
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {"should not have failed");
}, null);
@Test(timeOut = 5000)
public void testReadMissingAfter() throws Exception {
ReadHandle lh = getLedgerHandle();
when(lh.getId()).thenReturn((long) 0);
EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);
byte[] data = new byte[10];
for (int i = 0; i < 8; i++) {
entryCache.insert(EntryImpl.create(0, i, data));
final CountDownLatch counter = new CountDownLatch(1);
entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
assertEquals(entries.size(), 10);
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {"should not have failed");
}, null);
@Test(timeOut = 5000)
public void testReadMissingMiddle() throws Exception {
ReadHandle lh = getLedgerHandle();
when(lh.getId()).thenReturn((long) 0);
EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);
byte[] data = new byte[10];
entryCache.insert(EntryImpl.create(0, 0, data));
entryCache.insert(EntryImpl.create(0, 1, data));
entryCache.insert(EntryImpl.create(0, 8, data));
entryCache.insert(EntryImpl.create(0, 9, data));
final CountDownLatch counter = new CountDownLatch(1);
entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
assertEquals(entries.size(), 10);
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {"should not have failed");
}, null);
@Test(timeOut = 5000)
public void testReadMissingMultiple() throws Exception {
ReadHandle lh = getLedgerHandle();
when(lh.getId()).thenReturn((long) 0);
EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);
byte[] data = new byte[10];
entryCache.insert(EntryImpl.create(0, 0, data));
entryCache.insert(EntryImpl.create(0, 2, data));
entryCache.insert(EntryImpl.create(0, 5, data));
entryCache.insert(EntryImpl.create(0, 8, data));
final CountDownLatch counter = new CountDownLatch(1);
entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
assertEquals(entries.size(), 10);
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {"should not have failed");
}, null);
public void testCachedReadReturnsDifferentByteBuffer() throws Exception {
ReadHandle lh = getLedgerHandle();
when(lh.getId()).thenReturn((long) 0);
EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);
CompletableFuture<List<Entry>> cacheMissFutureEntries = new CompletableFuture<>();
entryCache.asyncReadEntry(lh, 0, 1, true, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}, null);
List<Entry> cacheMissEntries = cacheMissFutureEntries.get();
// Ensure first entry is 0 and
assertEquals(cacheMissEntries.size(), 2);
assertEquals(cacheMissEntries.get(0).getEntryId(), 0);
assertEquals(cacheMissEntries.get(0).getDataBuffer().readerIndex(), 0);
// Move the reader index to simulate consumption
CompletableFuture<List<Entry>> cacheHitFutureEntries = new CompletableFuture<>();
entryCache.asyncReadEntry(lh, 0, 1, true, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}, null);
List<Entry> cacheHitEntries = cacheHitFutureEntries.get();
assertEquals(cacheHitEntries.get(0).getEntryId(), 0);
assertEquals(cacheHitEntries.get(0).getDataBuffer().readerIndex(), 0);
@Test(timeOut = 5000)
public void testReadWithError() throws Exception {
final ReadHandle lh = getLedgerHandle();
when(lh.getId()).thenReturn((long) 0);
doAnswer((invocation) -> {
CompletableFuture<LedgerEntries> future = new CompletableFuture<>();
future.completeExceptionally(new BKNoSuchLedgerExistsException());
return future;
}).when(lh).readAsync(anyLong(), anyLong());
EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);
byte[] data = new byte[10];
entryCache.insert(EntryImpl.create(0, 2, data));
final CountDownLatch counter = new CountDownLatch(1);
entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {"should not complete");
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}, null);
static ReadHandle getLedgerHandle() {
final ReadHandle lh = mock(ReadHandle.class);
doAnswer((invocation) -> {
Object[] args = invocation.getArguments();
long firstEntry = (Long) args[0];
long lastEntry = (Long) args[1];
List<LedgerEntry> entries = new ArrayList<>();
for (int i = 0; i <= (lastEntry - firstEntry); i++) {
entries.add(LedgerEntryImpl.create(0, i, 10, Unpooled.wrappedBuffer(new byte[10])));
LedgerEntries ledgerEntries = mock(LedgerEntries.class);
doAnswer((invocation2) -> entries.iterator()).when(ledgerEntries).iterator();
return CompletableFuture.completedFuture(ledgerEntries);
}).when(lh).readAsync(anyLong(), anyLong());
return lh;