blob: 128bef42f26159fc4fede4971f2d162afb21bd8e [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import static org.apache.bookkeeper.client.BookKeeperClientStats.ADD_OP;
import static org.apache.bookkeeper.client.BookKeeperClientStats.ADD_OP_UR;
import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
import static org.apache.bookkeeper.client.BookKeeperClientStats.READ_OP_DM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.collect.Lists;
import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException.BKLedgerClosedException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.api.WriteAdvHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.KeeperException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Testing ledger write entry cases.
*/
public class BookieWriteLedgerTest extends
BookKeeperClusterTestCase implements AddCallback {
private static final Logger LOG = LoggerFactory
.getLogger(BookieWriteLedgerTest.class);
// Password used for every ledger created by these tests.
byte[] ledgerPassword = "aaa".getBytes();
// Primary and secondary ledger handles shared by the test methods.
LedgerHandle lh, lh2;
// Holder for entries returned by synchronous reads (used by helper methods).
Enumeration<LedgerEntry> ls;
// test related variables
int numEntriesToWrite = 100; // baseline batch size; several tests grow this as they go
int maxInt = Integer.MAX_VALUE; // exclusive upper bound for generated entry payloads
Random rng; // Random Number Generator (seeded deterministically in setUp)
ArrayList<byte[]> entries1; // generated entries
ArrayList<byte[]> entries2; // generated entries
// Digest algorithm used by most tests; fixed to CRC32 in the constructor.
private final DigestType digestType;
/**
 * Mutable sync holder shared between the test thread and add callbacks:
 * {@code counter} counts completed adds, {@code rc} records the last
 * return code. Both fields are volatile because they are written from
 * callback threads and polled from the test thread.
 */
private static class SyncObj {
    volatile int counter = 0;
    volatile int rc;

    public SyncObj() {
    }
}
@Override
@Before
public void setUp() throws Exception {
    super.setUp();
    // Fixed seed so every run produces the same sequence of payloads.
    rng = new Random(0);
    // Fresh result buffers for each test method.
    entries1 = new ArrayList<>();
    entries2 = new ArrayList<>();
}
// Configures the cluster test base: 5 bookies; second super arg is
// presumably the per-test timeout in seconds — TODO confirm against
// BookKeeperClusterTestCase.
public BookieWriteLedgerTest() {
super(5, 180);
this.digestType = DigestType.CRC32;
String ledgerManagerFactory = "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory";
// set ledger manager (same factory on both server and client side)
baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
/*
* 'testLedgerCreateAdvWithLedgerIdInLoop2' testcase relies on skipListSizeLimit,
* so setting it to some small value for making that testcase lite.
*/
baseConf.setSkipListSizeLimit(4 * 1024 * 1024);
baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
}
/**
 * Verify writes succeed when several bookies in the last ensemble fail,
 * forcing an ensemble reformation.
 */
@Test
public void testWithMultipleBookieFailuresInLastEnsemble() throws Exception {
    // Ledger with ensemble 5 and write quorum 4.
    lh = bkc.createLedger(5, 4, digestType, ledgerPassword);
    LOG.info("Ledger ID: " + lh.getId());
    int written = 0;
    while (written < numEntriesToWrite) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt(rng.nextInt(maxInt));
        buf.rewind();
        entries1.add(buf.array());
        lh.addEntry(buf.array());
        written++;
    }
    // Bring up three replacement bookies before taking three away.
    for (int n = 0; n < 3; n++) {
        startNewBookie();
    }
    // Kill three members of the first ensemble and keep writing; the
    // client must reform the ensemble onto the new bookies.
    List<BookieId> firstEnsemble = lh.getLedgerMetadata()
            .getAllEnsembles().entrySet().iterator().next().getValue();
    for (int n = 0; n < 3; n++) {
        killBookie(firstEnsemble.get(n));
    }
    numEntriesToWrite += 50;
    while (written < numEntriesToWrite) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt(rng.nextInt(maxInt));
        buf.rewind();
        entries1.add(buf.array());
        lh.addEntry(buf.array());
        written++;
    }
    // Read everything back and verify payloads.
    readEntries(lh, entries1);
    lh.close();
}
/**
* Verify write and Read durability stats.
*
* <p>Exercises four write batches: a healthy one, two with bookies put to
* sleep (to trigger delayed-write under-replication stats), and one against
* a bookie replaced by {@link CorruptReadBookie} (to trigger a digest
* mismatch stat on read).
*/
@Test
public void testWriteAndReadStats() throws Exception {
// Create a ledger
lh = bkc.createLedger(3, 3, 2, digestType, ledgerPassword);
// write-batch-1
for (int i = 0; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries1.add(entry.array());
lh.addEntry(entry.array());
}
// Healthy writes must show up as successful ADD_OP samples.
assertTrue(
"Stats should have captured a new writes",
bkc.getTestStatsProvider().getOpStatsLogger(
CLIENT_SCOPE + "." + ADD_OP)
.getSuccessCount() > 0);
CountDownLatch sleepLatch1 = new CountDownLatch(1);
CountDownLatch sleepLatch2 = new CountDownLatch(1);
List<BookieId> ensemble = lh.getLedgerMetadata()
.getAllEnsembles().entrySet().iterator().next().getValue();
// Put the first ensemble bookie to sleep; writes still reach ack quorum (2 of 3).
sleepBookie(ensemble.get(0), sleepLatch1);
int i = numEntriesToWrite;
numEntriesToWrite = numEntriesToWrite + 50;
// write-batch-2
for (; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries1.add(entry.array());
lh.addEntry(entry.array());
}
// Let the second bookie go to sleep. This forces write timeout and ensemble change
// Which will be enough time to receive delayed write failures on the write-batch-2
sleepBookie(ensemble.get(1), sleepLatch2);
i = numEntriesToWrite;
numEntriesToWrite = numEntriesToWrite + 50;
// write-batch-3
for (; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries1.add(entry.array());
lh.addEntry(entry.array());
}
// Delayed failures from the sleeping bookies must surface as ADD_OP_UR.
assertTrue(
"Stats should have captured a new UnderReplication during write",
bkc.getTestStatsProvider().getCounter(
CLIENT_SCOPE + "." + ADD_OP_UR)
.get() > 0);
// Wake both sleeping bookies before swapping in the corrupt one.
sleepLatch1.countDown();
sleepLatch2.countDown();
// Replace the bookie with a fake bookie that corrupts entries it serves.
ServerConfiguration conf = killBookie(ensemble.get(0));
BookieWriteLedgerTest.CorruptReadBookie corruptBookie = new BookieWriteLedgerTest.CorruptReadBookie(conf);
bs.add(startBookie(conf, corruptBookie));
bsConfs.add(conf);
i = numEntriesToWrite;
numEntriesToWrite = numEntriesToWrite + 50;
// write-batch-4
for (; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries1.add(entry.array());
lh.addEntry(entry.array());
}
// Reading through the corrupt bookie must register a digest mismatch stat.
readEntries(lh, entries1);
assertTrue(
"Stats should have captured DigestMismatch on Read",
bkc.getTestStatsProvider().getCounter(
CLIENT_SCOPE + "." + READ_OP_DM)
.get() > 0);
lh.close();
}
/**
* Verify delayedWriteError causes ensemble changes.
*/
@Test
public void testDelayedWriteEnsembleChange() throws Exception {
// Create a ledger
lh = bkc.createLedger(3, 3, 2, digestType, ledgerPassword);
// Short add timeout so the sleeping bookie produces a delayed write error quickly.
baseClientConf.setAddEntryTimeout(1);
// NOTE: this local deliberately shadows the class field of the same name.
int numEntriesToWrite = 10;
// write-batch-1
for (int i = 0; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries1.add(entry.array());
lh.addEntry(entry.array());
}
CountDownLatch sleepLatch1 = new CountDownLatch(1);
// get bookie at index-0
BookieId bookie1 = lh.getCurrentEnsemble().get(0);
sleepBookie(bookie1, sleepLatch1);
int i = numEntriesToWrite;
numEntriesToWrite = numEntriesToWrite + 10;
// write-batch-2: acks still reach quorum (2 of 3) while bookie1 sleeps
for (; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries1.add(entry.array());
lh.addEntry(entry.array());
}
// Sleep to receive delayed error on the write directed to the sleeping bookie
Thread.sleep(baseClientConf.getAddEntryTimeout() * 1000 * 2);
assertTrue(
"Stats should have captured a new UnderReplication during write",
bkc.getTestStatsProvider().getCounter(
CLIENT_SCOPE + "." + ADD_OP_UR)
.get() > 0);
i = numEntriesToWrite;
numEntriesToWrite = numEntriesToWrite + 10;
// write-batch-3
for (; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries1.add(entry.array());
lh.addEntry(entry.array());
}
sleepLatch1.countDown();
// get the bookie at index-0 again, this must be different.
BookieId bookie2 = lh.getCurrentEnsemble().get(0);
assertFalse(
"Delayed write error must have forced ensemble change",
bookie1.equals(bookie2));
lh.close();
}
/**
 * Verify write/read round trips for ledgers created with every supported
 * digest type.
 *
 * @throws Exception
 */
@Test
public void testLedgerDigestTest() throws Exception {
    for (DigestType type : DigestType.values()) {
        lh = bkc.createLedger(5, 3, 2, type, ledgerPassword);
        int remaining = numEntriesToWrite;
        while (remaining-- > 0) {
            ByteBuffer buf = ByteBuffer.allocate(4);
            buf.putInt(rng.nextInt(maxInt));
            buf.rewind();
            entries1.add(buf.array());
            lh.addEntry(buf.array());
        }
        readEntries(lh, entries1);
        // Clean up this iteration's ledger before trying the next digest.
        long ledgerId = lh.getId();
        lh.close();
        bkc.deleteLedger(ledgerId);
        entries1.clear();
    }
}
/**
 * Verify the functionality of the advanced ledger API, which returns a
 * LedgerHandleAdv: the caller supplies the entry id on every addEntry and
 * manages entry-id allocation itself.
 *
 * @throws Exception
 */
@Test
public void testLedgerCreateAdv() throws Exception {
    lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
    int entryId = 0;
    while (entryId < numEntriesToWrite) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt(rng.nextInt(maxInt));
        buf.rewind();
        entries1.add(buf.array());
        lh.addEntry(entryId, buf.array());
        entryId++;
    }
    // Add one replacement bookie, then fail a member of the first
    // ensemble so further writes force an ensemble change.
    startNewBookie();
    List<BookieId> firstEnsemble = lh.getLedgerMetadata().getAllEnsembles()
            .entrySet().iterator().next().getValue();
    killBookie(firstEnsemble.get(0));
    numEntriesToWrite += 50;
    while (entryId < numEntriesToWrite) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt(rng.nextInt(maxInt));
        buf.rewind();
        entries1.add(buf.array());
        lh.addEntry(entryId, buf.array());
        entryId++;
    }
    readEntries(lh, entries1);
    lh.close();
}
/**
 * Verify that calling the addEntry() variant that does not take an entry id
 * on a LedgerHandleAdv fails with an illegal-operation error.
 *
 * @throws Exception
 */
@Test
public void testLedgerCreateAdvAndWriteNonAdv() throws Exception {
    final long ledgerId = 0xABCDEF;
    lh = bkc.createLedgerAdv(ledgerId, 3, 3, 2, digestType, ledgerPassword, null);
    ByteBuffer payload = ByteBuffer.allocate(4);
    payload.putInt(rng.nextInt(maxInt));
    payload.rewind();
    try {
        lh.addEntry(payload.array());
        fail("expected IllegalOpException");
    } catch (BKException.BKIllegalOpException expected) {
        // adv handles require an explicit entry id; this is the expected outcome
    } finally {
        lh.close();
        bkc.deleteLedger(ledgerId);
    }
}
/**
* Verify that LedgerHandleAdv cannot handle addEntry without the entryId.
*
* <p>Covers all four entry-id-less variants: synchronous byte[], synchronous
* byte[] with offset/length, async ByteBuf, async byte[], and async byte[]
* with offset/length — each must fail with IllegalOpException.
*
* @throws Exception
*/
@Test
public void testNoAddEntryLedgerCreateAdv() throws Exception {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
assertTrue(lh instanceof LedgerHandleAdv);
// sync addEntry(byte[]) must be rejected
try {
lh.addEntry(entry.array());
fail("using LedgerHandleAdv addEntry without entryId is forbidden");
} catch (BKException e) {
assertEquals(e.getCode(), BKException.Code.IllegalOpException);
}
// sync addEntry(byte[], offset, length) must be rejected
try {
lh.addEntry(entry.array(), 0, 4);
fail("using LedgerHandleAdv addEntry without entryId is forbidden");
} catch (BKException e) {
assertEquals(e.getCode(), BKException.Code.IllegalOpException);
}
// async ByteBuf variant: failure is delivered through the callback/future
try {
CompletableFuture<Object> done = new CompletableFuture<>();
lh.asyncAddEntry(Unpooled.wrappedBuffer(entry.array()),
(int rc, LedgerHandle lh1, long entryId, Object ctx) -> {
SyncCallbackUtils.finish(rc, null, done);
}, null);
done.get();
} catch (ExecutionException ee) {
assertTrue(ee.getCause() instanceof BKException);
BKException e = (BKException) ee.getCause();
assertEquals(e.getCode(), BKException.Code.IllegalOpException);
}
// async byte[] variant
try {
CompletableFuture<Object> done = new CompletableFuture<>();
lh.asyncAddEntry(entry.array(),
(int rc, LedgerHandle lh1, long entryId, Object ctx) -> {
SyncCallbackUtils.finish(rc, null, done);
}, null);
done.get();
} catch (ExecutionException ee) {
assertTrue(ee.getCause() instanceof BKException);
BKException e = (BKException) ee.getCause();
assertEquals(e.getCode(), BKException.Code.IllegalOpException);
}
// async byte[] with offset/length variant
try {
CompletableFuture<Object> done = new CompletableFuture<>();
lh.asyncAddEntry(entry.array(), 0, 4,
(int rc, LedgerHandle lh1, long entryId, Object ctx) -> {
SyncCallbackUtils.finish(rc, null, done);
}, null);
done.get();
} catch (ExecutionException ee) {
assertTrue(ee.getCause() instanceof BKException);
BKException e = (BKException) ee.getCause();
assertEquals(e.getCode(), BKException.Code.IllegalOpException);
}
lh.close();
}
/**
 * Verify the advanced ledger API variant that accepts an explicit ledgerId
 * as input and returns a LedgerHandleAdv: the caller supplies the entry id
 * on every addEntry and manages entry-id allocation itself.
 *
 * @throws Exception
 */
@Test
public void testLedgerCreateAdvWithLedgerId() throws Exception {
    final long ledgerId = 0xABCDEF;
    lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null);
    int entryId = 0;
    while (entryId < numEntriesToWrite) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt(rng.nextInt(maxInt));
        buf.rewind();
        entries1.add(buf.array());
        lh.addEntry(entryId, buf.array());
        entryId++;
    }
    // Add one replacement bookie, then fail a member of the first
    // ensemble so further writes force an ensemble change.
    startNewBookie();
    List<BookieId> firstEnsemble = lh.getLedgerMetadata().getAllEnsembles()
            .entrySet().iterator().next().getValue();
    killBookie(firstEnsemble.get(0));
    numEntriesToWrite += 50;
    while (entryId < numEntriesToWrite) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt(rng.nextInt(maxInt));
        buf.rewind();
        entries1.add(buf.array());
        lh.addEntry(entryId, buf.array());
        entryId++;
    }
    readEntries(lh, entries1);
    lh.close();
    bkc.deleteLedger(ledgerId);
}
/**
* Verify the functionality of Ledger create which accepts customMetadata as input.
* Also verifies that the data written is read back properly.
*
* <p>Half of the ledgers are created via createLedger, the other half via
* createLedgerAdv; each iteration uses a different number of metadata pairs.
*
* @throws Exception
*/
@Test
public void testLedgerCreateWithCustomMetadata() throws Exception {
// Create a ledger
long ledgerId;
int maxLedgers = 10;
for (int i = 0; i < maxLedgers; i++) {
Map<String, byte[]> inputCustomMetadataMap = new HashMap<String, byte[]>();
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
// each ledger has different number of key, value pairs.
for (int j = 0; j < i; j++) {
inputCustomMetadataMap.put("key" + j, UUID.randomUUID().toString().getBytes());
}
if (i < maxLedgers / 2) {
// 0 to 4 test with createLedger interface
lh = bkc.createLedger(5, 3, 2, digestType, ledgerPassword, inputCustomMetadataMap);
ledgerId = lh.getId();
lh.addEntry(entry.array());
} else {
// 5 to 9 test with createLedgerAdv interface
lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword, inputCustomMetadataMap);
ledgerId = lh.getId();
lh.addEntry(0, entry.array());
}
lh.close();
// now reopen the ledger; this should fetch all the metadata stored on zk
// and the customMetadata written and read should match
lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
Map<String, byte[]> outputCustomMetadataMap = lh.getCustomMetadata();
assertTrue("Can't retrieve proper Custom Data",
areByteArrayValMapsEqual(inputCustomMetadataMap, outputCustomMetadataMap));
lh.close();
bkc.deleteLedger(ledgerId);
}
}
/**
 * Routine to compare two {@code Map<String, byte[]>}; Since the values in the map are {@code byte[]}, we can't use
 * {@code Map.equals}.
 * @param first
 *          The first map
 * @param second
 *          The second map to compare with
 * @return true if the 2 maps contain the exact set of {@code <K,V>} pairs.
 */
public static boolean areByteArrayValMapsEqual(Map<String, byte[]> first, Map<String, byte[]> second) {
    if (first == null && second == null) {
        return true;
    }
    // above check confirms that both are not null;
    // if one is null the other isn't; so they must
    // be different
    if (first == null || second == null) {
        return false;
    }
    if (first.size() != second.size()) {
        return false;
    }
    for (Map.Entry<String, byte[]> entry : first.entrySet()) {
        // containsKey() distinguishes "key absent" from "key mapped to null":
        // without it, a null value in 'first' compared equal to a missing key
        // in 'second' (Arrays.equals(null, null) is true), so two same-size
        // maps with different key sets could wrongly compare equal.
        if (!second.containsKey(entry.getKey())
                || !Arrays.equals(entry.getValue(), second.get(entry.getKey()))) {
            return false;
        }
    }
    return true;
}
/**
* Verify the functionality of Advanced Ledger which accepts ledgerId as
* input and returns LedgerHandleAdv. LedgerHandleAdv takes entryId for
* addEntry, and let user manage entryId allocation.
* This testcase is mainly added for covering missing code coverage branches
* in LedgerHandleAdv
*
* @throws Exception
*/
@Test
public void testLedgerHandleAdvFunctionality() throws Exception {
// Create a ledger
long ledgerId = 0xABCDEF;
lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null);
numEntriesToWrite = 3;
// entry 0: plain synchronous addEntry(entryId, data)
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries1.add(entry.array());
lh.addEntry(0, entry.array());
// here asyncAddEntry(final long entryId, final byte[] data, final
// AddCallback cb, final Object ctx) method is
// called which is not covered in any other testcase
entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries1.add(entry.array());
CountDownLatch latch = new CountDownLatch(1);
final int[] returnedRC = new int[1];
lh.asyncAddEntry(1, entry.array(), new AddCallback() {
@Override
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
CountDownLatch latch = (CountDownLatch) ctx;
returnedRC[0] = rc;
latch.countDown();
}
}, latch);
latch.await();
assertTrue("Returned code is expected to be OK", returnedRC[0] == BKException.Code.OK);
// here addEntry is called with incorrect offset and length
entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
try {
lh.addEntry(2, entry.array(), -3, 9);
fail("AddEntry is called with negative offset and incorrect length,"
+ "so it is expected to throw RuntimeException/IndexOutOfBoundsException");
} catch (RuntimeException exception) {
// expected RuntimeException/IndexOutOfBoundsException
}
// here addEntry is called with corrected offset and length and it is
// supposed to succeed
entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries1.add(entry.array());
lh.addEntry(2, entry.array());
// LedgerHandle is closed for write
lh.close();
// here addEntry is called even after the close of the LedgerHandle, so
// it is expected to throw exception
entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries1.add(entry.array());
try {
lh.addEntry(3, entry.array());
fail("AddEntry is called after the close of LedgerHandle,"
+ "so it is expected to throw BKLedgerClosedException");
} catch (BKLedgerClosedException exception) {
// expected: handle already closed for writes
}
readEntries(lh, entries1);
bkc.deleteLedger(ledgerId);
}
/**
* In a loop create/write/delete the ledger with same ledgerId through
* the functionality of Advanced Ledger which accepts ledgerId as input.
*
* <p>Builds a stream of random ledger ids, creates an adv ledger for each,
* writes entries asynchronously, then joins all futures in parallel and
* verifies each ledger's contents before deleting it.
*
* @throws Exception
*/
@Test
public void testLedgerCreateAdvWithLedgerIdInLoop() throws Exception {
int ledgerCount = 40;
long maxId = 9999999999L;
if (baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
// since LongHierarchicalLedgerManager supports ledgerIds of decimal length up to 19 digits but other
// LedgerManagers only up to 10 decimals
maxId = Long.MAX_VALUE;
}
rng.longs(ledgerCount, 0, maxId) // generate a stream of ledger ids
.mapToObj(ledgerId -> { // create a ledger for each ledger id
LOG.info("Creating adv ledger with id {}", ledgerId);
return bkc.newCreateLedgerOp()
.withEnsembleSize(1).withWriteQuorumSize(1).withAckQuorumSize(1)
.withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32)
.withPassword(ledgerPassword).makeAdv().withLedgerId(ledgerId)
.execute()
.thenApply(writer -> { // Add entries to ledger when created
LOG.info("Writing stream of {} entries to {}",
numEntriesToWrite, ledgerId);
// extra retain() keeps buffers alive for the readback
// comparison below; released in the forEach
List<ByteBuf> entries = rng.ints(numEntriesToWrite, 0, maxInt)
.mapToObj(i -> {
ByteBuf entry = Unpooled.buffer(4);
entry.retain();
entry.writeInt(i);
return entry;
})
.collect(Collectors.toList());
CompletableFuture<?> lastRequest = null;
int i = 0;
for (ByteBuf entry : entries) {
long entryId = i++;
LOG.info("Writing {}:{} as {}",
ledgerId, entryId, entry.slice().readInt());
lastRequest = writer.writeAsync(entryId, entry);
}
// waiting only on the last write is sufficient: adds
// complete in entry-id order
lastRequest.join();
return Pair.of(writer, entries);
});
})
.parallel().map(CompletableFuture::join) // wait for all creations and adds in parallel
.forEach(e -> { // check that each set of adds succeeded
try {
WriteAdvHandle handle = e.getLeft();
List<ByteBuf> entries = e.getRight();
// Read and verify
LOG.info("Read entries for ledger: {}", handle.getId());
readEntries(handle, entries);
entries.forEach(ByteBuf::release);
handle.close();
bkc.deleteLedger(handle.getId());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
Assert.fail("Test interrupted");
} catch (Exception ex) {
LOG.info("Readback failed with exception", ex);
Assert.fail("Readback failed " + ex.getMessage());
}
});
}
/**
* In a loop create/write/read/delete the ledger with ledgerId through the
* functionality of Advanced Ledger which accepts ledgerId as input.
* In this testcase (other testcases don't cover these conditions, hence new
* testcase is added), we create entries which are greater than
* SKIP_LIST_MAX_ALLOC_ENTRY size and tried to addEntries so that the total
* length of data written in this testcase is much greater than
* SKIP_LIST_SIZE_LIMIT, so that entries will be flushed from EntryMemTable
* to persistent storage
*
* @throws Exception
*/
@Test
public void testLedgerCreateAdvWithLedgerIdInLoop2() throws Exception {
assertTrue("Here we are expecting Bookies are configured to use SortedLedgerStorage",
baseConf.getSortedLedgerStorageEnabled());
long ledgerId;
int ledgerCount = 10;
List<List<byte[]>> entryList = new ArrayList<List<byte[]>>();
LedgerHandle[] lhArray = new LedgerHandle[ledgerCount];
long skipListSizeLimit = baseConf.getSkipListSizeLimit();
int skipListArenaMaxAllocSize = baseConf.getSkipListArenaMaxAllocSize();
List<byte[]> tmpEntry;
for (int lc = 0; lc < ledgerCount; lc++) {
tmpEntry = new ArrayList<byte[]>();
// mask the sign bit to get a non-negative ledger id
ledgerId = rng.nextLong();
ledgerId &= Long.MAX_VALUE;
if (!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
// since LongHierarchicalLedgerManager supports ledgerIds of
// decimal length upto 19 digits but other
// LedgerManagers only upto 10 decimals
ledgerId %= 9999999999L;
}
LOG.debug("Iteration: {} LedgerId: {}", lc, ledgerId);
lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null);
lhArray[lc] = lh;
long ledgerLength = 0;
int i = 0;
// keep writing until the per-ledger share of 4x the skip list size
// limit is reached, to force EntryMemTable flushes
while (ledgerLength < ((4 * skipListSizeLimit) / ledgerCount)) {
int length;
// NOTE(review): Math.abs(Integer.MIN_VALUE) is still negative; with
// the fixed seed used here this case does not occur in practice.
if (rng.nextBoolean()) {
length = Math.abs(rng.nextInt()) % (skipListArenaMaxAllocSize);
} else {
// here we want length to be random no. in the range of skipListArenaMaxAllocSize and
// 4*skipListArenaMaxAllocSize
length = (Math.abs(rng.nextInt()) % (skipListArenaMaxAllocSize * 3)) + skipListArenaMaxAllocSize;
}
byte[] data = new byte[length];
rng.nextBytes(data);
tmpEntry.add(data);
lh.addEntry(i, data);
ledgerLength += length;
i++;
}
entryList.add(tmpEntry);
}
for (int lc = 0; lc < ledgerCount; lc++) {
// Read and verify
long lid = lhArray[lc].getId();
LOG.debug("readEntries for lc: {} ledgerId: {} ", lc, lhArray[lc].getId());
readEntriesAndValidateDataArray(lhArray[lc], entryList.get(lc));
lhArray[lc].close();
bkc.deleteLedger(lid);
}
}
/**
* Verify asynchronous writing when few bookie failures in last ensemble.
*
* <p>Two ledgers are written concurrently so that the asynchronous adds after
* the bookie failures modify both ledgers' metadata simultaneously.
*/
@Test
public void testAsyncWritesWithMultipleFailuresInLastEnsemble()
throws Exception {
// Create ledgers
lh = bkc.createLedger(5, 4, digestType, ledgerPassword);
lh2 = bkc.createLedger(5, 4, digestType, ledgerPassword);
LOG.info("Ledger ID-1: " + lh.getId());
LOG.info("Ledger ID-2: " + lh2.getId());
for (int i = 0; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries1.add(entry.array());
entries2.add(entry.array());
lh.addEntry(entry.array());
lh2.addEntry(entry.array());
}
// Start three more bookies
startNewBookie();
startNewBookie();
startNewBookie();
// Shutdown three bookies in the last ensemble and continue writing
List<BookieId> ensemble = lh.getLedgerMetadata()
.getAllEnsembles().entrySet().iterator().next().getValue();
killBookie(ensemble.get(0));
killBookie(ensemble.get(1));
killBookie(ensemble.get(2));
// adding one more entry to both the ledgers async after multiple bookie
// failures. This will do asynchronously modifying the ledger metadata
// simultaneously.
numEntriesToWrite++;
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries1.add(entry.array());
entries2.add(entry.array());
SyncObj syncObj1 = new SyncObj();
SyncObj syncObj2 = new SyncObj();
// addComplete() (implemented on this class) notifies these SyncObjs
lh.asyncAddEntry(entry.array(), this, syncObj1);
lh2.asyncAddEntry(entry.array(), this, syncObj2);
// wait for all entries to be acknowledged for the first ledger
synchronized (syncObj1) {
while (syncObj1.counter < 1) {
LOG.debug("Entries counter = " + syncObj1.counter);
syncObj1.wait();
}
assertEquals(BKException.Code.OK, syncObj1.rc);
}
// wait for all entries to be acknowledged for the second ledger
synchronized (syncObj2) {
while (syncObj2.counter < 1) {
LOG.debug("Entries counter = " + syncObj2.counter);
syncObj2.wait();
}
assertEquals(BKException.Code.OK, syncObj2.rc);
}
// reading ledger till the last entry
readEntries(lh, entries1);
readEntries(lh2, entries2);
lh.close();
lh2.close();
}
/**
* Verify Advanced asynchronous writing with entryIds in reverse order.
*
* <p>Entries are submitted from the highest entry id down to 0; inserting at
* index 0 of the expected lists keeps them in entry-id order for readback.
*/
@Test
public void testLedgerCreateAdvWithAsyncWritesWithBookieFailures() throws Exception {
// Create ledgers
lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
lh2 = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
LOG.info("Ledger ID-1: " + lh.getId());
LOG.info("Ledger ID-2: " + lh2.getId());
SyncObj syncObj1 = new SyncObj();
SyncObj syncObj2 = new SyncObj();
for (int i = numEntriesToWrite - 1; i >= 0; i--) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
try {
entries1.add(0, entry.array());
entries2.add(0, entry.array());
} catch (Exception e) {
e.printStackTrace();
}
lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj1);
lh2.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj2);
}
// Start One more bookie and shutdown one from last ensemble before reading
startNewBookie();
List<BookieId> ensemble = lh.getLedgerMetadata().getAllEnsembles().entrySet().iterator().next()
.getValue();
killBookie(ensemble.get(0));
// Wait for all entries to be acknowledged for the first ledger
synchronized (syncObj1) {
while (syncObj1.counter < numEntriesToWrite) {
syncObj1.wait();
}
assertEquals(BKException.Code.OK, syncObj1.rc);
}
// Wait for all entries to be acknowledged for the second ledger
synchronized (syncObj2) {
while (syncObj2.counter < numEntriesToWrite) {
syncObj2.wait();
}
assertEquals(BKException.Code.OK, syncObj2.rc);
}
// Reading ledger till the last entry
readEntries(lh, entries1);
readEntries(lh2, entries2);
lh.close();
lh2.close();
}
/**
* LedgerHandleAdv out of order writers with ensemble changes.
* Verify that entry that was written to old ensemble will be
* written to new ensemble too after ensemble change.
*
* @throws Exception
*/
@Test
public void testLedgerHandleAdvOutOfOrderWriteAndFrocedEnsembleChange() throws Exception {
// Create a ledger
long ledgerId = 0xABCDEF;
SyncObj syncObj1 = new SyncObj();
ByteBuffer entry;
lh = bkc.createLedgerAdv(ledgerId, 3, 3, 3, digestType, ledgerPassword, null);
entry = ByteBuffer.allocate(4);
// Add entries 0-4
for (int i = 0; i < 5; i++) {
entry.rewind();
entry.putInt(rng.nextInt(maxInt));
lh.addEntry(i, entry.array());
}
// Add 10 as Async Entry, which goes to first ensemble
ByteBuffer entry1 = ByteBuffer.allocate(4);
entry1.putInt(rng.nextInt(maxInt));
lh.asyncAddEntry(10, entry1.array(), 0, entry1.capacity(), this, syncObj1);
// Make sure entry-10 goes to the bookies and gets response.
// (peeks at LedgerHandle internals via Whitebox reflection)
java.util.Queue<PendingAddOp> myPendingAddOps = Whitebox.getInternalState(lh, "pendingAddOps");
PendingAddOp addOp = null;
boolean pendingAddOpReceived = false;
while (!pendingAddOpReceived) {
addOp = myPendingAddOps.peek();
if (addOp.entryId == 10 && addOp.completed) {
pendingAddOpReceived = true;
} else {
Thread.sleep(1000);
}
}
CountDownLatch sleepLatch1 = new CountDownLatch(1);
List<BookieId> ensemble;
ensemble = lh.getLedgerMetadata().getAllEnsembles().entrySet().iterator().next().getValue();
// Put all 3 bookies to sleep and start 3 new ones
sleepBookie(ensemble.get(0), sleepLatch1);
sleepBookie(ensemble.get(1), sleepLatch1);
sleepBookie(ensemble.get(2), sleepLatch1);
startNewBookie();
startNewBookie();
startNewBookie();
// Original bookies are in sleep, new bookies added.
// Now add entries 5-9 which forces ensemble changes
// So at this point entries 0-4, 10 went to first
// ensemble, 5-9 will go to new ensemble.
for (int i = 5; i < 10; i++) {
entry.rewind();
entry.putInt(rng.nextInt(maxInt));
lh.addEntry(i, entry.array());
}
// Wakeup all 3 bookies that went to sleep
sleepLatch1.countDown();
// Wait for all entries to be acknowledged for the first ledger
synchronized (syncObj1) {
while (syncObj1.counter < 1) {
syncObj1.wait();
}
assertEquals(BKException.Code.OK, syncObj1.rc);
}
// Close write handle
lh.close();
// Open read handle
lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
// Make sure to read all 11 entries (ids 0-9 plus 10).
for (int i = 0; i < 11; i++) {
lh.readEntries(i, i);
}
lh.close();
bkc.deleteLedger(ledgerId);
}
/**
* Verify Advanced asynchronous writing with entryIds in pseudo random order with bookie failures between writes.
*
* <p>Entry ids are submitted in a strided pattern (0, 5, 10, ..., then
* 1, 6, 11, ...), and a bookie is replaced mid-stream.
*/
@Test
public void testLedgerCreateAdvWithRandomAsyncWritesWithBookieFailuresBetweenWrites() throws Exception {
// Create ledgers
lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
lh2 = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
LOG.info("Ledger ID-1: " + lh.getId());
LOG.info("Ledger ID-2: " + lh2.getId());
SyncObj syncObj1 = new SyncObj();
SyncObj syncObj2 = new SyncObj();
int batchSize = 5;
int i, j;
// Fill the result buffers first
for (i = 0; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
try {
entries1.add(0, entry.array());
entries2.add(0, entry.array());
} catch (Exception e) {
e.printStackTrace();
}
}
// Submit entry ids in strides of batchSize rather than sequentially.
for (i = 0; i < batchSize; i++) {
for (j = i; j < numEntriesToWrite; j = j + batchSize) {
byte[] entry1 = entries1.get(j);
byte[] entry2 = entries2.get(j);
lh.asyncAddEntry(j, entry1, 0, entry1.length, this, syncObj1);
lh2.asyncAddEntry(j, entry2, 0, entry2.length, this, syncObj2);
if (j == numEntriesToWrite / 2) {
// Start One more bookie and shutdown one from last ensemble at half-way
startNewBookie();
List<BookieId> ensemble = lh.getLedgerMetadata().getAllEnsembles().entrySet()
.iterator().next().getValue();
killBookie(ensemble.get(0));
}
}
}
// Wait for all entries to be acknowledged for the first ledger
synchronized (syncObj1) {
while (syncObj1.counter < numEntriesToWrite) {
syncObj1.wait();
}
assertEquals(BKException.Code.OK, syncObj1.rc);
}
// Wait for all entries to be acknowledged for the second ledger
synchronized (syncObj2) {
while (syncObj2.counter < numEntriesToWrite) {
syncObj2.wait();
}
assertEquals(BKException.Code.OK, syncObj2.rc);
}
// Reading ledger till the last entry
readEntries(lh, entries1);
readEntries(lh2, entries2);
lh.close();
lh2.close();
}
/**
 * Verify advanced asynchronous writes with entryIds issued in pseudo random
 * order, replacing one bookie after all writes have been submitted.
 */
@Test
public void testLedgerCreateAdvWithRandomAsyncWritesWithBookieFailures() throws Exception {
    // Create two advanced ledgers (ensemble 5, write quorum 3, ack quorum 2).
    lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
    lh2 = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
    LOG.info("Ledger ID-1: " + lh.getId());
    LOG.info("Ledger ID-2: " + lh2.getId());
    SyncObj syncObj1 = new SyncObj();
    SyncObj syncObj2 = new SyncObj();
    final int batchSize = 5;
    // Pre-generate the payloads; both ledgers receive the same data.
    for (int idx = 0; idx < numEntriesToWrite; idx++) {
        ByteBuffer payload = ByteBuffer.allocate(4);
        payload.putInt(rng.nextInt(maxInt));
        payload.position(0);
        try {
            entries1.add(0, payload.array());
            entries2.add(0, payload.array());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    // Submit adds with entryIds interleaved across batches: 0, 5, 10, ...
    // then 1, 6, 11, ... so they arrive out of natural order.
    for (int offset = 0; offset < batchSize; offset++) {
        for (int entryId = offset; entryId < numEntriesToWrite; entryId += batchSize) {
            byte[] payload1 = entries1.get(entryId);
            byte[] payload2 = entries2.get(entryId);
            lh.asyncAddEntry(entryId, payload1, 0, payload1.length, this, syncObj1);
            lh2.asyncAddEntry(entryId, payload2, 0, payload2.length, this, syncObj2);
        }
    }
    // Bring up a fresh bookie and kill one from the first ensemble before
    // the adds are confirmed, forcing recovery/ensemble change.
    startNewBookie();
    List<BookieId> ensemble = lh.getLedgerMetadata().getAllEnsembles()
            .entrySet().iterator().next().getValue();
    killBookie(ensemble.get(0));
    // Block until every add on the first ledger has been acknowledged.
    synchronized (syncObj1) {
        while (syncObj1.counter < numEntriesToWrite) {
            syncObj1.wait();
        }
        assertEquals(BKException.Code.OK, syncObj1.rc);
    }
    // Block until every add on the second ledger has been acknowledged.
    synchronized (syncObj2) {
        while (syncObj2.counter < numEntriesToWrite) {
            syncObj2.wait();
        }
        assertEquals(BKException.Code.OK, syncObj2.rc);
    }
    // Read everything back and verify the contents of both ledgers.
    readEntries(lh, entries1);
    readEntries(lh2, entries2);
    lh.close();
    lh2.close();
}
/**
 * Skips one entry while writing and asserts that, after reopening, the
 * lastAddConfirmed stops right before the skipped entryId (entries beyond
 * a gap can never be confirmed).
 *
 * @throws Exception
 */
@Test
public void testLedgerCreateAdvWithSkipEntries() throws Exception {
    long ledgerId;
    SyncObj syncObj1 = new SyncObj();
    // Create an advanced ledger (ensemble 5, write quorum 3, ack quorum 2).
    lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
    // Save ledgerId to reopen the ledger after closing.
    ledgerId = lh.getId();
    LOG.info("Ledger ID: " + ledgerId);
    // Pick the entry to skip; range [0, numEntriesToWrite - 2].
    int skipEntryId = rng.nextInt(numEntriesToWrite - 1);
    // Write entries in reverse order, omitting skipEntryId.
    for (int i = numEntriesToWrite - 1; i >= 0; i--) {
        ByteBuffer entry = ByteBuffer.allocate(4);
        entry.putInt(rng.nextInt(maxInt));
        entry.position(0);
        try {
            entries1.add(0, entry.array());
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (i == skipEntryId) {
            LOG.info("Skipping entry:{}", skipEntryId);
            continue;
        }
        lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj1);
    }
    // Wait until at least the entries below the gap are acknowledged;
    // entries above skipEntryId cannot complete because of the gap.
    synchronized (syncObj1) {
        while (syncObj1.counter < skipEntryId) {
            syncObj1.wait();
        }
        assertEquals(BKException.Code.OK, syncObj1.rc);
    }
    // Close the write handle.
    lh.close();
    // Reopen and verify LAC sits immediately before the skipped entry.
    lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
    // JUnit convention: expected value first, actual second
    // (original had the arguments swapped).
    assertEquals(skipEntryId - 1, lh.lastAddConfirmed);
    lh.close();
}
/**
 * Verify that LedgerHandleAdv#addEntry rejects a duplicate entryId with
 * {@link BKException.Code#DuplicateEntryIdException}.
 *
 * @throws Exception
 */
@Test
public void testLedgerCreateAdvSyncAddDuplicateEntryIds() throws Exception {
    // Create an advanced ledger (ensemble 5, write quorum 3, ack quorum 2).
    lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
    LOG.info("Ledger ID: " + lh.getId());
    // Write all entries synchronously with explicit entryIds.
    for (int i = 0; i < numEntriesToWrite; i++) {
        ByteBuffer entry = ByteBuffer.allocate(4);
        entry.putInt(rng.nextInt(maxInt));
        entry.position(0);
        entries1.add(entry.array());
        lh.addEntry(i, entry.array());
        entry.position(0);
    }
    readEntries(lh, entries1);
    // Re-add an already-used entryId; range [0, numEntriesToWrite - 2].
    int dupEntryId = rng.nextInt(numEntriesToWrite - 1);
    try {
        ByteBuffer entry = ByteBuffer.allocate(4);
        entry.putInt(rng.nextInt(maxInt));
        entry.position(0);
        lh.addEntry(dupEntryId, entry.array());
        fail("Expected exception not thrown");
    } catch (BKException e) {
        // JUnit convention: expected value first, actual second
        // (original had the arguments swapped).
        assertEquals(BKException.Code.DuplicateEntryIdException, e.getCode());
    }
    lh.close();
}
/**
 * Verify that LedgerHandleAdv#asyncAddEntry rejects duplicate entryIds
 * with {@link BKException.Code#DuplicateEntryIdException}.
 *
 * @throws Exception
 */
@Test
public void testLedgerCreateAdvSyncAsyncAddDuplicateEntryIds() throws Exception {
    long ledgerId;
    SyncObj syncObj1 = new SyncObj();
    SyncObj syncObj2 = new SyncObj();
    // Create an advanced ledger (ensemble 5, write quorum 3, ack quorum 2).
    lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
    // Save ledgerId to reopen the ledger.
    ledgerId = lh.getId();
    LOG.info("Ledger ID: " + ledgerId);
    // Tracks how many duplicate submissions have been made so far, so we
    // can wait for each duplicate's callback individually. The original
    // test waited on "counter < 1", which only ever blocked for the FIRST
    // duplicate; later duplicates were never actually awaited or checked.
    int expectedDuplicates = 0;
    for (int i = numEntriesToWrite - 1; i >= 0; i--) {
        ByteBuffer entry = ByteBuffer.allocate(4);
        entry.putInt(rng.nextInt(maxInt));
        entry.position(0);
        try {
            entries1.add(0, entry.array());
        } catch (Exception e) {
            e.printStackTrace();
        }
        lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj1);
        if (rng.nextBoolean()) {
            // Attempt to write the same entryId again; this must fail.
            lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj2);
            expectedDuplicates++;
            synchronized (syncObj2) {
                while (syncObj2.counter < expectedDuplicates) {
                    syncObj2.wait();
                }
                assertEquals(BKException.Code.DuplicateEntryIdException, syncObj2.rc);
            }
        }
    }
    // Wait for all legitimate adds to be acknowledged.
    synchronized (syncObj1) {
        while (syncObj1.counter < numEntriesToWrite) {
            syncObj1.wait();
        }
        assertEquals(BKException.Code.OK, syncObj1.rc);
    }
    // Close the ledger
    lh.close();
}
/**
 * Verify that a ByteBuf handed to LedgerHandleAdv#asyncAddEntry is released
 * (refCnt drops to 0) after the add completes, for every allocator flavor.
 */
@Test
@SuppressWarnings("unchecked")
public void testLedgerCreateAdvByteBufRefCnt() throws Exception {
    long ledgerId = rng.nextLong();
    ledgerId &= Long.MAX_VALUE;
    if (!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
        // since LongHierarchicalLedgerManager supports ledgerIds of
        // decimal length upto 19 digits but other
        // LedgerManagers only upto 10 decimals
        ledgerId %= 9999999999L;
    }
    final LedgerHandle handle = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null);
    // Cover pooled/unpooled x direct/heap allocators.
    final List<AbstractByteBufAllocator> allocators = Lists.newArrayList(
            new PooledByteBufAllocator(true),
            new PooledByteBufAllocator(false),
            new UnpooledByteBufAllocator(true),
            new UnpooledByteBufAllocator(false));
    long entryId = 0;
    for (AbstractByteBufAllocator allocator : allocators) {
        final ByteBuf buf = allocator.buffer(10);
        buf.writeBytes(("fragment0" + entryId).getBytes());
        assertEquals("ref count on ByteBuf should be 1", 1, buf.refCnt());
        CompletableFuture<Integer> result = new CompletableFuture<>();
        handle.asyncAddEntry(entryId, buf, (rc, lh0, eId, qwcLatency, ctx) -> {
            ((CompletableFuture<Integer>) ctx).complete(rc);
        }, result);
        assertEquals("rc code is OK", BKException.Code.OK, (int) result.get());
        // The buffer is released asynchronously; poll for up to ~2.5s.
        for (int attempt = 0; attempt < 10 && buf.refCnt() != 0; attempt++) {
            TimeUnit.MILLISECONDS.sleep(250); // recycler runs asynchronously
        }
        assertEquals("writing entry with id " + entryId + ", ref count on ByteBuf should be 0 ",
                0, buf.refCnt());
        // Read the entry back and confirm the payload survived intact.
        org.apache.bookkeeper.client.api.LedgerEntry e = handle.read(entryId, entryId).getEntry(entryId);
        assertEquals("entry data is correct", "fragment0" + entryId, new String(e.getEntryBytes()));
        entryId++;
    }
    bkc.deleteLedger(handle.ledgerId);
}
/**
 * Verify that a ByteBuf handed to LedgerHandle#asyncAddEntry is released
 * (refCnt drops to 0) after the add completes, for every allocator flavor.
 */
@Test
@SuppressWarnings("unchecked")
public void testLedgerCreateByteBufRefCnt() throws Exception {
    final LedgerHandle handle = bkc.createLedger(5, 3, 2, digestType, ledgerPassword, null);
    // Cover pooled/unpooled x direct/heap allocators.
    final List<AbstractByteBufAllocator> allocators = Lists.newArrayList(
            new PooledByteBufAllocator(true),
            new PooledByteBufAllocator(false),
            new UnpooledByteBufAllocator(true),
            new UnpooledByteBufAllocator(false));
    int entryId = 0;
    for (AbstractByteBufAllocator allocator : allocators) {
        final ByteBuf buf = allocator.buffer(10);
        buf.writeBytes(("fragment0" + entryId).getBytes());
        assertEquals("ref count on ByteBuf should be 1", 1, buf.refCnt());
        CompletableFuture<Integer> result = new CompletableFuture<>();
        handle.asyncAddEntry(buf, (rc, lh0, eId, ctx) -> {
            ((CompletableFuture<Integer>) ctx).complete(rc);
        }, result);
        assertEquals("rc code is OK", BKException.Code.OK, (int) result.get());
        // The buffer is released asynchronously; poll for up to ~2.5s.
        for (int attempt = 0; attempt < 10 && buf.refCnt() != 0; attempt++) {
            TimeUnit.MILLISECONDS.sleep(250); // recycler runs asynchronously
        }
        assertEquals("writing entry with id " + entryId + ", ref count on ByteBuf should be 0 ",
                0, buf.refCnt());
        // Read the entry back and confirm the payload survived intact.
        org.apache.bookkeeper.client.api.LedgerEntry e = handle.read(entryId, entryId).getEntry(entryId);
        assertEquals("entry data is correct", "fragment0" + entryId, new String(e.getEntryBytes()));
        entryId++;
    }
    bkc.deleteLedger(handle.ledgerId);
}
/**
 * Reads entries [0, numEntriesToWrite - 1] from the given ledger and asserts
 * that each 4-byte int payload matches the corresponding expected entry.
 *
 * @param lh      handle to read from
 * @param entries expected payloads, one 4-byte array per entry, in entry order
 * @throws InterruptedException if the read is interrupted
 * @throws BKException          on any BookKeeper read failure
 */
private void readEntries(LedgerHandle lh, List<byte[]> entries) throws InterruptedException, BKException {
    ls = lh.readEntries(0, numEntriesToWrite - 1);
    int index = 0;
    while (ls.hasMoreElements()) {
        ByteBuffer origbb = ByteBuffer.wrap(entries.get(index++));
        Integer origEntry = origbb.getInt();
        ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
        LOG.debug("Length of result: " + result.capacity());
        LOG.debug("Original entry: " + origEntry);
        Integer retrEntry = result.getInt();
        LOG.debug("Retrieved entry: " + retrEntry);
        // Use assertEquals instead of assertTrue(equals): on failure it
        // reports the expected and actual values, not just "false".
        assertEquals("Checking entry " + index + " for equality", origEntry, retrEntry);
    }
}
/**
 * Reads all confirmed entries from the given handle and asserts that each
 * buffer matches the corresponding expected ByteBuf.
 *
 * @param reader  read handle to verify
 * @param entries expected entry buffers, in entry order
 * @throws Exception on read failure or assertion error
 */
private void readEntries(ReadHandle reader, List<ByteBuf> entries) throws Exception {
    // JUnit convention: expected value first, actual second
    // (original had the arguments swapped).
    assertEquals("Not enough entries in ledger " + reader.getId(),
            entries.size() - 1, reader.getLastAddConfirmed());
    try (LedgerEntries readEntries = reader.read(0, reader.getLastAddConfirmed())) {
        int i = 0;
        for (org.apache.bookkeeper.client.api.LedgerEntry e : readEntries) {
            int entryId = i++;
            ByteBuf origEntry = entries.get(entryId);
            ByteBuf readEntry = e.getEntryBuffer();
            assertEquals("Unexpected contents in " + reader.getId() + ":" + entryId, origEntry, readEntry);
        }
    }
}
/**
 * Reads entries [0, entries.size() - 1] from the given ledger and asserts
 * that every returned byte array matches the expected payload, both in
 * length and in content.
 *
 * @param lh      handle to read from
 * @param entries expected payloads, one byte array per entry, in entry order
 * @throws InterruptedException if the read is interrupted
 * @throws BKException          on any BookKeeper read failure
 */
private void readEntriesAndValidateDataArray(LedgerHandle lh, List<byte[]> entries)
        throws InterruptedException, BKException {
    ls = lh.readEntries(0, entries.size() - 1);
    int pos = 0;
    while (ls.hasMoreElements()) {
        byte[] originalData = entries.get(pos);
        byte[] receivedData = ls.nextElement().getEntry();
        pos++;
        LOG.debug("Length of originalData: {}", originalData.length);
        LOG.debug("Length of receivedData: {}", receivedData.length);
        // Check lengths first so a size mismatch is reported explicitly.
        assertEquals(
                String.format("LedgerID: %d EntryID: %d OriginalDataLength: %d ReceivedDataLength: %d", lh.getId(),
                        (pos - 1), originalData.length, receivedData.length),
                originalData.length, receivedData.length);
        // Then compare the full contents byte-for-byte.
        Assert.assertArrayEquals(
                String.format("Checking LedgerID: %d EntryID: %d for equality", lh.getId(), (pos - 1)),
                originalData, receivedData);
    }
}
/**
 * Add callback: records the latest result code on the SyncObj passed as
 * context, bumps its completion counter, and wakes the waiting test thread.
 */
@Override
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
    final SyncObj sync = (SyncObj) ctx;
    synchronized (sync) {
        sync.rc = rc;
        sync.counter++;
        sync.notify();
    }
}
/**
 * A bookie that zeroes out every byte of an entry on read, used to
 * simulate corrupted entry data coming back from a bookie.
 */
static class CorruptReadBookie extends BookieImpl {
    static final Logger LOG = LoggerFactory.getLogger(CorruptReadBookie.class);
    ByteBuf localBuf;

    public CorruptReadBookie(ServerConfiguration conf)
            throws IOException, KeeperException, InterruptedException, BookieException {
        super(conf);
    }

    @Override
    public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedgerException {
        localBuf = super.readEntry(ledgerId, entryId);
        // Overwrite the entire buffer with zeros so the client-side
        // digest verification fails on this entry.
        for (int idx = 0; idx < localBuf.capacity(); idx++) {
            localBuf.setByte(idx, 0);
        }
        return localBuf;
    }
}
}