// blob: ef4d26b32ce582274f5d0e6248cb07c0b9eb3a7c [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.util.Enumeration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.exceptions.Code;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.meta.zk.ZKMetadataBookieDriver;
import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test parallel ledger recovery.
*/
public class ParallelLedgerRecoveryTest extends BookKeeperClusterTestCase {
static final Logger LOG = LoggerFactory.getLogger(ParallelLedgerRecoveryTest.class);
/**
 * A {@link LedgerManager} decorator that can stall {@code writeLedgerMetadata()} on a
 * latch, letting tests control exactly when a recovery's metadata update is applied.
 * Every other operation delegates straight to the wrapped manager.
 */
static class TestLedgerManager implements LedgerManager {

    final LedgerManager lm;
    // When non-null, the next writeLedgerMetadata() call blocks until this latch opens.
    volatile CountDownLatch waitLatch = null;
    // Single worker used to park the stalled metadata write off the caller's thread.
    final ExecutorService executorService;

    TestLedgerManager(LedgerManager lm) {
        this.lm = lm;
        this.executorService = Executors.newSingleThreadExecutor();
    }

    /** Installs (or clears, with {@code null}) the latch gating metadata writes. */
    void setLatch(CountDownLatch waitLatch) {
        this.waitLatch = waitLatch;
    }

    @Override
    public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(
        long ledgerId, LedgerMetadata metadata) {
        return lm.createLedgerMetadata(ledgerId, metadata);
    }

    @Override
    public CompletableFuture<Void> removeLedgerMetadata(long ledgerId, Version version) {
        return lm.removeLedgerMetadata(ledgerId, version);
    }

    @Override
    public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId) {
        return lm.readLedgerMetadata(ledgerId);
    }

    @Override
    public LedgerRangeIterator getLedgerRanges(long zkOpTimeoutMs) {
        return lm.getLedgerRanges(zkOpTimeoutMs);
    }

    /**
     * Writes ledger metadata; if a latch is installed, the write is deferred to the
     * worker thread and only forwarded to the delegate once the latch counts down.
     */
    @Override
    public CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(long ledgerId, LedgerMetadata metadata,
        Version currentVersion) {
        // Snapshot the volatile once so a concurrent setLatch(null) cannot race the null check.
        final CountDownLatch cdl = waitLatch;
        if (null != cdl) {
            CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        cdl.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        LOG.error("Interrupted on waiting latch : ", e);
                    }
                    // Lambda parameter renamed from 'metadata' to 'written' so it no
                    // longer shadows the enclosing method's 'metadata' argument.
                    lm.writeLedgerMetadata(ledgerId, metadata, currentVersion)
                        .whenComplete((written, exception) -> {
                            if (exception != null) {
                                promise.completeExceptionally(exception);
                            } else {
                                promise.complete(written);
                            }
                        });
                }
            });
            return promise;
        } else {
            return lm.writeLedgerMetadata(ledgerId, metadata, currentVersion);
        }
    }

    @Override
    public void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {
        lm.registerLedgerMetadataListener(ledgerId, listener);
    }

    @Override
    public void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {
        lm.unregisterLedgerMetadataListener(ledgerId, listener);
    }

    @Override
    public void asyncProcessLedgers(Processor<Long> processor, VoidCallback finalCb, Object context,
        int successRc, int failureRc) {
        lm.asyncProcessLedgers(processor, finalCb, context, successRc, failureRc);
    }

    @Override
    public void close() throws IOException {
        // Always release the worker thread, even when the delegate fails to close;
        // previously a throwing lm.close() leaked the single-thread executor.
        try {
            lm.close();
        } finally {
            executorService.shutdown();
        }
    }
}
/**
 * Hierarchical ledger-manager factory that hands out latch-aware
 * {@link TestLedgerManager} wrappers instead of plain ledger managers.
 */
static class TestLedgerManagerFactory extends HierarchicalLedgerManagerFactory {
    @Override
    public LedgerManager newLedgerManager() {
        LedgerManager delegate = super.newLedgerManager();
        return new TestLedgerManager(delegate);
    }
}
/**
 * ZK client-side metadata driver that lazily builds a {@link TestLedgerManagerFactory},
 * so client ledger managers are the latch-aware test variant.
 */
static class TestMetadataClientDriver extends ZKMetadataClientDriver {
    @Override
    public synchronized LedgerManagerFactory getLedgerManagerFactory() throws MetadataException {
        // Reuse the factory once it has been initialized.
        if (lmFactory != null) {
            return lmFactory;
        }
        try {
            lmFactory = new TestLedgerManagerFactory()
                .initialize(conf, layoutManager, TestLedgerManagerFactory.CUR_VERSION);
        } catch (IOException ioe) {
            throw new MetadataException(Code.METADATA_SERVICE_ERROR, ioe);
        }
        return lmFactory;
    }
}
/**
 * ZK bookie-side metadata driver that lazily builds a {@link TestLedgerManagerFactory},
 * so bookie ledger managers are the latch-aware test variant.
 */
static class TestMetadataBookieDriver extends ZKMetadataBookieDriver {
    @Override
    public synchronized LedgerManagerFactory getLedgerManagerFactory() throws MetadataException {
        // Reuse the factory once it has been initialized.
        if (lmFactory != null) {
            return lmFactory;
        }
        try {
            lmFactory = new TestLedgerManagerFactory()
                .initialize(conf, layoutManager, TestLedgerManagerFactory.CUR_VERSION);
        } catch (IOException ioe) {
            throw new MetadataException(Code.METADATA_SERVICE_ERROR, ioe);
        }
        return lmFactory;
    }
}
// Digest type used by ledgers created in these tests.
final DigestType digestType;

/** Sets up a 3-bookie cluster test case with CRC32 entry digests. */
public ParallelLedgerRecoveryTest() throws Exception {
    super(3); // three bookies in the test cluster
    digestType = DigestType.CRC32;
}
/**
 * Starts the cluster with the latch-aware metadata drivers and ledger-manager
 * factory installed, so every client and bookie sees a {@link TestLedgerManager}.
 */
@Override
protected void startBKCluster(String metadataServiceUri) throws Exception {
    // Register the test drivers BEFORE the cluster starts so they take effect.
    MetadataDrivers.registerClientDriver("zk", TestMetadataClientDriver.class, true);
    MetadataDrivers.registerBookieDriver("zk", TestMetadataBookieDriver.class, true);
    baseConf.setLedgerManagerFactoryClass(TestLedgerManagerFactory.class);
    baseClientConf.setLedgerManagerFactoryClass(TestLedgerManagerFactory.class);
    // Very long timeouts: tests deliberately stall reads/writes on latches and
    // must not trip client-side timeouts while doing so.
    baseClientConf.setReadEntryTimeout(60000);
    baseClientConf.setAddEntryTimeout(60000);
    super.startBKCluster(metadataServiceUri);
}
/**
 * Tears the cluster down and, in all cases, restores the stock ZK metadata drivers
 * so later test classes are not affected by the test-driver registration.
 */
@After
@Override
public void tearDown() throws Exception {
    try {
        super.tearDown();
    } finally {
        // Re-register the default drivers even if cluster teardown failed.
        MetadataDrivers.registerClientDriver("zk", ZKMetadataClientDriver.class, true);
        MetadataDrivers.registerBookieDriver("zk", ZKMetadataBookieDriver.class, true);
    }
}
// Recovery with no concurrent metadata mutation (updateMetadata=false), varied over
// parallel/sequential recovery reads and read batch sizes 1 and 3.
@Test
public void testRecoverBeforeWriteMetadata1() throws Exception {
    rereadDuringRecovery(true, 1, false, false);
}

@Test
public void testRecoverBeforeWriteMetadata2() throws Exception {
    rereadDuringRecovery(true, 3, false, false);
}

@Test
public void testRecoverBeforeWriteMetadata3() throws Exception {
    rereadDuringRecovery(false, 1, false, false);
}

@Test
public void testRecoverBeforeWriteMetadata4() throws Exception {
    rereadDuringRecovery(false, 3, false, false);
}
// Recovery while a second client re-marks the ledger "in recovery" (bumping the
// metadata version) mid-recovery (updateMetadata=true, close=false), varied over
// parallel/sequential reads and batch sizes 1 and 3.
@Test
public void testRereadDuringRecovery1() throws Exception {
    rereadDuringRecovery(true, 1, true, false);
}

@Test
public void testRereadDuringRecovery2() throws Exception {
    rereadDuringRecovery(true, 3, true, false);
}

@Test
public void testRereadDuringRecovery3() throws Exception {
    rereadDuringRecovery(false, 1, true, false);
}

@Test
public void testRereadDuringRecovery4() throws Exception {
    rereadDuringRecovery(false, 3, true, false);
}
// NOTE(review): these four tests pass arguments identical to testRereadDuringRecovery1-4
// (updateMetadata=true, close=false). If "concurrent recovery" was meant to exercise a
// concurrent close (close=true), that path is currently untested — confirm the intent.
@Test
public void testConcurrentRecovery1() throws Exception {
    rereadDuringRecovery(true, 1, true, false);
}

@Test
public void testConcurrentRecovery2() throws Exception {
    rereadDuringRecovery(true, 3, true, false);
}

@Test
public void testConcurrentRecovery3() throws Exception {
    rereadDuringRecovery(false, 1, true, false);
}

@Test
public void testConcurrentRecovery4() throws Exception {
    rereadDuringRecovery(false, 3, true, false);
}
/**
 * End-to-end recovery scenario: stall two ensemble bookies while entries are added,
 * open the ledger read-only, start recovery with the recoverer's metadata write held
 * back on a latch, optionally mutate the metadata from a second handle in the
 * meantime, then release the latch and assert recovery still converged on the
 * correct LAC, length, and entry contents.
 *
 * @param parallelRead   enable parallel recovery reads
 * @param batchSize      recovery read batch size
 * @param updateMetadata if true, a second client updates ledger metadata while the
 *                       recoverer's metadata write is stalled
 * @param close          if updating metadata, close the ledger (true) or merely
 *                       re-mark it "in recovery" to bump the version (false)
 */
private void rereadDuringRecovery(boolean parallelRead, int batchSize,
    boolean updateMetadata, boolean close) throws Exception {
    ClientConfiguration newConf = new ClientConfiguration();
    newConf.addConfiguration(baseClientConf);
    newConf.setEnableParallelRecoveryRead(parallelRead);
    newConf.setRecoveryReadBatchSize(batchSize);
    BookKeeper newBk = new BookKeeper(newConf);
    // The cluster installed TestLedgerManager, whose metadata writes we can stall.
    TestLedgerManager tlm = (TestLedgerManager) newBk.getUnderlyingLedgerManager();
    final LedgerHandle lh = newBk.createLedger(numBookies, 2, 2, digestType, "".getBytes());
    CountDownLatch latch1 = new CountDownLatch(1);
    CountDownLatch latch2 = new CountDownLatch(1);
    // Suspend the first two ensemble bookies so the adds below stay pending a while.
    sleepBookie(lh.getCurrentEnsemble().get(0), latch1);
    sleepBookie(lh.getCurrentEnsemble().get(1), latch2);
    int numEntries = (numBookies * 3) + 1;
    final AtomicInteger numPendingAdds = new AtomicInteger(numEntries);
    final CountDownLatch addDone = new CountDownLatch(1);
    for (int i = 0; i < numEntries; i++) {
        lh.asyncAddEntry(("" + i).getBytes(), new org.apache.bookkeeper.client.AsyncCallback.AddCallback() {
            @Override
            public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
                if (BKException.Code.OK != rc) {
                    // Fail fast: release the waiter with numPendingAdds still non-zero,
                    // so the assertEquals(0, ...) below reports the failure.
                    addDone.countDown();
                    return;
                }
                if (numPendingAdds.decrementAndGet() == 0) {
                    addDone.countDown();
                }
            }
        }, null);
    }
    // Wake the bookies and wait for all adds to complete.
    latch1.countDown();
    latch2.countDown();
    addDone.await(10, TimeUnit.SECONDS);
    assertEquals(0, numPendingAdds.get());
    LOG.info("Added {} entries to ledger {}.", numEntries, lh.getId());
    long ledgerLenth = lh.getLength();
    // Open read-only: no recovery yet, so the handle knows nothing about LAC/length.
    LedgerHandle recoverLh = newBk.openLedgerNoRecovery(lh.getId(), digestType, "".getBytes());
    assertEquals(BookieProtocol.INVALID_ENTRY_ID, recoverLh.getLastAddPushed());
    assertEquals(BookieProtocol.INVALID_ENTRY_ID, recoverLh.getLastAddConfirmed());
    assertEquals(0, recoverLh.getLength());
    LOG.info("OpenLedgerNoRecovery {}.", lh.getId());
    // Hold back the recoverer's metadata write until metadataLatch is released.
    final CountDownLatch metadataLatch = new CountDownLatch(1);
    tlm.setLatch(metadataLatch);
    final CountDownLatch recoverLatch = new CountDownLatch(1);
    final AtomicBoolean success = new AtomicBoolean(false);
    ((ReadOnlyLedgerHandle) recoverLh).recover(new GenericCallback<Void>() {
        @Override
        public void operationComplete(int rc, Void result) {
            LOG.info("Recovering ledger {} completed : {}.", lh.getId(), rc);
            success.set(BKException.Code.OK == rc);
            recoverLatch.countDown();
        }
    });
    // clear the metadata latch (only the already-submitted write remains gated)
    tlm.setLatch(null);
    if (updateMetadata) {
        if (close) {
            LOG.info("OpenLedger {} to close.", lh.getId());
            LedgerHandle newRecoverLh = newBk.openLedger(lh.getId(), digestType, "".getBytes());
            newRecoverLh.close();
        } else {
            LOG.info("OpenLedgerNoRecovery {} again.", lh.getId());
            LedgerHandle newRecoverLh = newBk.openLedgerNoRecovery(lh.getId(), digestType, "".getBytes());
            assertEquals(BookieProtocol.INVALID_ENTRY_ID, newRecoverLh.getLastAddPushed());
            assertEquals(BookieProtocol.INVALID_ENTRY_ID, newRecoverLh.getLastAddConfirmed());
            // mark the ledger as in recovery to update version.
            ClientUtil.transformMetadata(newBk.getClientCtx(), newRecoverLh.getId(),
                (metadata) -> LedgerMetadataBuilder.from(metadata).withInRecoveryState().build());
            newRecoverLh.close();
            LOG.info("Updated ledger manager {}.", newRecoverLh.getLedgerMetadata());
        }
    }
    // resume metadata operation on recoverLh
    metadataLatch.countDown();
    LOG.info("Resume metadata update.");
    // wait until recover completed
    recoverLatch.await(20, TimeUnit.SECONDS);
    assertTrue(success.get());
    assertEquals(numEntries - 1, recoverLh.getLastAddPushed());
    assertEquals(numEntries - 1, recoverLh.getLastAddConfirmed());
    assertEquals(ledgerLenth, recoverLh.getLength());
    assertTrue(recoverLh.getLedgerMetadata().isClosed());
    // Read everything back and verify ordering and payloads survived recovery.
    Enumeration<LedgerEntry> enumeration = recoverLh.readEntries(0, numEntries - 1);
    int numReads = 0;
    while (enumeration.hasMoreElements()) {
        LedgerEntry entry = enumeration.nextElement();
        assertEquals((long) numReads, entry.getEntryId());
        assertEquals(numReads, Integer.parseInt(new String(entry.getEntry())));
        ++numReads;
    }
    assertEquals(numEntries, numReads);
    recoverLh.close();
    newBk.close();
}
/**
 * Recovery must stop at the last contiguous entry. Entries 0..9 are written
 * normally, then entry 14 (carrying lac=8) is injected directly through the bookie
 * client, leaving a gap at 10..13. Recovery should close the ledger at LAC 9 — not
 * 14 — and the recovery callback must fire exactly once.
 */
@Test
public void testRecoveryOnEntryGap() throws Exception {
    byte[] passwd = "recovery-on-entry-gap".getBytes(UTF_8);
    LedgerHandle lh = bkc.createLedger(1, 1, 1, DigestType.CRC32, passwd);
    for (int i = 0; i < 10; i++) {
        lh.addEntry(("recovery-on-entry-gap-" + i).getBytes(UTF_8));
    }
    // simulate ledger writer failure on concurrent writes causing gaps
    long entryId = 14;
    long lac = 8;
    byte[] data = "recovery-on-entry-gap-gap".getBytes(UTF_8);
    // Package entry 14 exactly as a writer would (digest, lac, running length).
    ByteBufList toSend =
        lh.macManager.computeDigestAndPackageForSending(
            entryId, lac, lh.getLength() + 100, Unpooled.wrappedBuffer(data, 0, data.length));
    final CountDownLatch addLatch = new CountDownLatch(1);
    final AtomicBoolean addSuccess = new AtomicBoolean(false);
    LOG.info("Add entry {} with lac = {}", entryId, lac);
    bkc.getBookieClient().addEntry(lh.getCurrentEnsemble().get(0),
        lh.getId(), lh.ledgerKey, entryId, toSend,
        new WriteCallback() {
            @Override
            public void writeComplete(int rc, long ledgerId, long entryId,
                BookieId addr, Object ctx) {
                addSuccess.set(BKException.Code.OK == rc);
                addLatch.countDown();
            }
        }, 0, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
    addLatch.await();
    assertTrue("add entry 14 should succeed", addSuccess.get());
    ClientConfiguration newConf = new ClientConfiguration();
    newConf.addConfiguration(baseClientConf);
    newConf.setEnableParallelRecoveryRead(true);
    newConf.setRecoveryReadBatchSize(10);
    BookKeeper newBk = new BookKeeper(newConf);
    final LedgerHandle recoverLh =
        newBk.openLedgerNoRecovery(lh.getId(), DigestType.CRC32, passwd);
    // Pre-recovery, the handle only knows the injected lac of 8.
    assertEquals("wrong lac found", 8L, recoverLh.getLastAddConfirmed());
    final CountDownLatch recoverLatch = new CountDownLatch(1);
    final AtomicLong newLac = new AtomicLong(-1);
    final AtomicBoolean isMetadataClosed = new AtomicBoolean(false);
    final AtomicInteger numSuccessCalls = new AtomicInteger(0);
    final AtomicInteger numFailureCalls = new AtomicInteger(0);
    ((ReadOnlyLedgerHandle) recoverLh).recover(new GenericCallback<Void>() {
        @Override
        public void operationComplete(int rc, Void result) {
            if (BKException.Code.OK == rc) {
                newLac.set(recoverLh.getLastAddConfirmed());
                isMetadataClosed.set(recoverLh.getLedgerMetadata().isClosed());
                numSuccessCalls.incrementAndGet();
            } else {
                numFailureCalls.incrementAndGet();
            }
            recoverLatch.countDown();
        }
    });
    recoverLatch.await();
    // Recovery advances LAC only to entry 9 — the gap at 10..13 fences off entry 14.
    assertEquals("wrong lac found", 9L, newLac.get());
    assertTrue("metadata isn't closed after recovery", isMetadataClosed.get());
    // Give any stray duplicate callbacks time to fire before counting invocations.
    Thread.sleep(5000);
    assertEquals("recovery callback should be triggered only once", 1, numSuccessCalls.get());
    assertEquals("recovery callback should be triggered only once", 0, numFailureCalls.get());
}
/**
 * A bookie whose add responses can be parked in a queue (instead of delivered) and
 * whose read of one chosen entry can be blocked on a latch, giving tests fine-grained
 * control over response interleavings during recovery.
 */
static class DelayResponseBookie extends BookieImpl {

    /** A deferred write completion: the callback plus the arguments to replay it with. */
    static final class WriteCallbackEntry {
        private final WriteCallback cb;
        private final int rc;
        private final long ledgerId;
        private final long entryId;
        private final BookieId addr;
        private final Object ctx;

        WriteCallbackEntry(WriteCallback cb,
            int rc, long ledgerId, long entryId,
            BookieId addr, Object ctx) {
            this.cb = cb;
            this.rc = rc;
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.addr = addr;
            this.ctx = ctx;
        }

        /** Replays the captured completion to the original callback. */
        public void callback() {
            cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
        }
    }

    // When true, add responses are parked in delayQueue instead of being delivered.
    private final AtomicBoolean delayAddResponse = new AtomicBoolean(false);
    // When true, a read of entry delayReadOnEntry blocks on delayReadLatch.
    private final AtomicBoolean delayReadResponse = new AtomicBoolean(false);
    private final AtomicLong delayReadOnEntry = new AtomicLong(-1234L);
    private volatile CountDownLatch delayReadLatch = null;
    // Deferred add completions, inspected and drained by the tests.
    private final LinkedBlockingQueue<WriteCallbackEntry> delayQueue =
        new LinkedBlockingQueue<WriteCallbackEntry>();

    public DelayResponseBookie(ServerConfiguration conf)
        throws IOException, KeeperException, InterruptedException, BookieException {
        super(conf);
    }

    /**
     * Adds the entry normally but, while delayAddResponse is set, captures the
     * completion in delayQueue instead of invoking the caller's callback.
     */
    @Override
    public void addEntry(ByteBuf entry, boolean ackBeforeSync, final WriteCallback cb, Object ctx, byte[] masterKey)
        throws IOException, BookieException, InterruptedException {
        super.addEntry(entry, ackBeforeSync, new WriteCallback() {
            @Override
            public void writeComplete(int rc, long ledgerId, long entryId,
                BookieId addr, Object ctx) {
                if (delayAddResponse.get()) {
                    delayQueue.add(new WriteCallbackEntry(cb, rc, ledgerId, entryId, addr, ctx));
                } else {
                    cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
                }
            }
        }, ctx, masterKey);
    }

    /**
     * Reads the entry, first blocking on delayReadLatch when this read targets the
     * configured entry id and read delaying is enabled.
     */
    @Override
    public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedgerException {
        LOG.info("ReadEntry {} - {}", ledgerId, entryId);
        if (delayReadResponse.get() && delayReadOnEntry.get() == entryId) {
            CountDownLatch latch = delayReadLatch;
            if (null != latch) {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    // no-op
                }
            }
        }
        return super.readEntry(ledgerId, entryId);
    }

    /** Toggles parking of add responses in delayQueue. */
    void delayAdd(boolean delayed) {
        this.delayAddResponse.set(delayed);
    }

    /** Arms (or disarms) blocking of reads of {@code entryId} on the given latch. */
    void delayRead(boolean delayed, long entryId, CountDownLatch delayReadLatch) {
        this.delayReadResponse.set(delayed);
        this.delayReadOnEntry.set(entryId);
        this.delayReadLatch = delayReadLatch;
    }
}
/**
 * Verifies a reader never observes the LAC advancing past the closed value while one
 * client (bk0) closes the ledger concurrently with another client (bk1) running
 * recovery whose metadata update is artificially delayed via TestLedgerManager.
 * Steps are numbered inline.
 */
@Test
public void testRecoveryWhenClosingLedgerHandle() throws Exception {
    byte[] passwd = "recovery-when-closing-ledger-handle".getBytes(UTF_8);
    ClientConfiguration newConf = new ClientConfiguration();
    newConf.addConfiguration(baseClientConf);
    newConf.setEnableParallelRecoveryRead(true);
    newConf.setRecoveryReadBatchSize(1);
    // Effectively disable timeouts: this test stalls adds and reads deliberately.
    newConf.setAddEntryTimeout(9999999);
    newConf.setReadEntryTimeout(9999999);
    final BookKeeper newBk0 = new BookKeeper(newConf);
    final LedgerHandle lh0 = newBk0.createLedger(1, 1, 1, digestType, passwd);
    final BookKeeper newBk1 = new BookKeeper(newConf);
    final LedgerHandle lh1 = newBk1.openLedgerNoRecovery(lh0.getId(), digestType, passwd);
    final TestLedgerManager tlm1 = (TestLedgerManager) newBk1.getUnderlyingLedgerManager();
    final BookKeeper readBk = new BookKeeper(newConf);
    final LedgerHandle readLh = readBk.openLedgerNoRecovery(lh0.getId(), digestType, passwd);
    LOG.info("Create ledger {}", lh0.getId());
    // 0) place the bookie with a fake bookie
    BookieId address = lh0.getCurrentEnsemble().get(0);
    ServerConfiguration conf = killBookie(address);
    conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
    DelayResponseBookie fakeBookie = new DelayResponseBookie(conf);
    bs.add(startBookie(conf, fakeBookie));
    bsConfs.add(conf);
    // 1) bk0 write two entries
    lh0.addEntry("entry-0".getBytes(UTF_8));
    lh0.addEntry("entry-1".getBytes(UTF_8));
    // 2) readBk read last add confirmed
    long lac = readLh.readLastConfirmed();
    assertEquals(0L, lac);
    lac = lh1.readLastConfirmed();
    assertEquals(0L, lac);
    final CountDownLatch addLatch = new CountDownLatch(3);
    final AtomicInteger numAddFailures = new AtomicInteger(0);
    // 3) bk0 write more entries in parallel
    fakeBookie.delayAdd(true);
    for (int i = 2; i < 5; i++) {
        lh0.asyncAddEntry(("entry-" + i).getBytes(UTF_8), new AsyncCallback.AddCallback() {
            @Override
            public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
                if (BKException.Code.OK != rc) {
                    numAddFailures.incrementAndGet();
                }
                addLatch.countDown();
            }
        }, null);
    }
    while (fakeBookie.delayQueue.size() < 3) {
        // wait until all add requests are queued
        Thread.sleep(100);
    }
    // 4) lac moved to 1L
    lac = readLh.readLastConfirmed();
    assertEquals(1L, lac);
    lac = lh1.readLastConfirmed();
    assertEquals(1L, lac);
    // 5) bk1 is doing recovery, but the metadata update is delayed
    final CountDownLatch readLatch = new CountDownLatch(1);
    fakeBookie.delayAdd(false);
    fakeBookie.delayRead(true, 3L, readLatch);
    final CountDownLatch metadataLatch = new CountDownLatch(1);
    tlm1.setLatch(metadataLatch);
    final CountDownLatch recoverLatch = new CountDownLatch(1);
    final AtomicBoolean recoverSuccess = new AtomicBoolean(false);
    ((ReadOnlyLedgerHandle) lh1).recover(new GenericCallback<Void>() {
        @Override
        public void operationComplete(int rc, Void result) {
            LOG.info("Recovering ledger {} completed : {}", lh1.getId(), rc);
            recoverSuccess.set(BKException.Code.OK == rc);
            recoverLatch.countDown();
        }
    });
    // Let recovery run up to the delayed read, then release the read.
    Thread.sleep(2000);
    readLatch.countDown();
    // we don't expected lac being updated before we successfully marked the ledger in recovery
    lac = readLh.readLastConfirmed();
    assertEquals(1L, lac);
    // 6) bk0 closes ledger before bk1 marks in recovery
    lh0.close();
    assertEquals(1L, lh0.getLastAddConfirmed());
    // 7) bk1 proceed recovery and succeed
    metadataLatch.countDown();
    recoverLatch.await();
    assertTrue(recoverSuccess.get());
    assertEquals(1L, lh1.getLastAddConfirmed());
    // 8) make sure we won't see lac advanced during ledger is closed by bk0 and recovered by bk1
    final AtomicLong lacHolder = new AtomicLong(-1234L);
    final AtomicInteger rcHolder = new AtomicInteger(-1234);
    final CountDownLatch doneLatch = new CountDownLatch(1);
    new ReadLastConfirmedOp(bkc.getBookieClient(),
        readLh.distributionSchedule,
        readLh.macManager,
        readLh.ledgerId,
        readLh.getLedgerMetadata().getAllEnsembles().lastEntry().getValue(),
        readLh.ledgerKey,
        new ReadLastConfirmedOp.LastConfirmedDataCallback() {
            @Override
            public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
                rcHolder.set(rc);
                lacHolder.set(data.getLastAddConfirmed());
                doneLatch.countDown();
            }
        }).initiate();
    doneLatch.await();
    assertEquals(BKException.Code.OK, rcHolder.get());
    assertEquals(1L, lacHolder.get());
    newBk0.close();
    newBk1.close();
    readBk.close();
}
/**
 * Validate ledger can recover with responses from (Qw - Qa) + 1 bookies.
 *
 * <p>Synthetic per-bookie LAC response codes are fed to a {@code ReadLastConfirmedOp}
 * via {@link #readLACFromQuorum(LedgerHandle, int...)}; the test asserts whether the
 * quorum logic reports success or {@code BookieHandleNotAvailableException}.
 *
 * @throws Exception if the test fails unexpectedly
 */
@Test
public void testRecoveryWithUnavailableBookie() throws Exception {
    byte[] passwd = "".getBytes(UTF_8);
    ClientConfiguration newConf = new ClientConfiguration();
    newConf.addConfiguration(baseClientConf);
    final BookKeeper readBk = new BookKeeper(newConf);
    final BookKeeper newBk0 = new BookKeeper(newConf);
    /*
     * Test Group-1 : Expected responses for recovery: Qr = (Qw - Qa) + 1 = (3 - 2) + 1 = 2
     */
    int ensembleSize = 3;
    int writeQuorumSize = 3;
    int ackQuormSize = 2;
    LedgerHandle lh0 = newBk0.createLedger(ensembleSize, writeQuorumSize, ackQuormSize, DigestType.DUMMY, passwd);
    LedgerHandle readLh = readBk.openLedgerNoRecovery(lh0.getId(), DigestType.DUMMY, passwd);
    // Test 1: bookie response: OK, NO_SUCH_LEDGER_EXISTS, NOT_AVAILABLE
    // Expected: Recovery successful Q(response) = 2
    int responseCode = readLACFromQuorum(readLh, BKException.Code.BookieHandleNotAvailableException,
        BKException.Code.OK, BKException.Code.NoSuchLedgerExistsException);
    // assertEquals takes (expected, actual) — keep the expected constant first so
    // failure messages report the values the right way around.
    assertEquals(BKException.Code.OK, responseCode);
    // Test 2: bookie response: OK, NOT_AVAILABLE, NOT_AVAILABLE
    // Expected: Recovery fail Q(response) = 1
    responseCode = readLACFromQuorum(readLh, BKException.Code.BookieHandleNotAvailableException,
        BKException.Code.OK, BKException.Code.BookieHandleNotAvailableException);
    assertEquals(BKException.Code.BookieHandleNotAvailableException, responseCode);
    /*
     * Test Group-2 : Expected responses for recovery: Qr = (Qw - Qa) + 1 = (2 - 2) + 1 = 1
     */
    ensembleSize = 2;
    writeQuorumSize = 2;
    ackQuormSize = 2;
    lh0 = newBk0.createLedger(ensembleSize, writeQuorumSize, ackQuormSize, DigestType.DUMMY, passwd);
    readLh = readBk.openLedgerNoRecovery(lh0.getId(), DigestType.DUMMY, passwd);
    // Test 1: bookie response: OK, NOT_AVAILABLE
    // Expected: Recovery successful Q(response) = 1
    responseCode = readLACFromQuorum(readLh, BKException.Code.BookieHandleNotAvailableException,
        BKException.Code.OK);
    assertEquals(BKException.Code.OK, responseCode);
    // Test 2: bookie response: OK, NO_SUCH_LEDGER_EXISTS
    // Expected: Recovery successful Q(response) = 2
    responseCode = readLACFromQuorum(readLh, BKException.Code.NoSuchLedgerExistsException, BKException.Code.OK);
    assertEquals(BKException.Code.OK, responseCode);
    // Test 3: bookie response: NOT_AVAILABLE, NOT_AVAILABLE
    // Expected: Recovery fail Q(response) = 0
    responseCode = readLACFromQuorum(readLh, BKException.Code.BookieHandleNotAvailableException,
        BKException.Code.BookieHandleNotAvailableException);
    assertEquals(BKException.Code.BookieHandleNotAvailableException, responseCode);
    newBk0.close();
    readBk.close();
}
/**
 * Runs a {@code ReadLastConfirmedOp} against the given ledger's last ensemble,
 * feeding it one synthetic {@code readEntryComplete()} response per element of
 * {@code bookieLACResponse}, and returns the result code the operation delivers.
 *
 * @param ledger            ledger whose last ensemble the op targets
 * @param bookieLACResponse one {@code BKException.Code} per simulated bookie response
 * @return the rc delivered to the {@code LastConfirmedDataCallback}
 */
private int readLACFromQuorum(LedgerHandle ledger, int... bookieLACResponse) throws Exception {
    MutableInt responseCode = new MutableInt(100); // sentinel; overwritten by the callback
    CountDownLatch responseLatch = new CountDownLatch(1);
    ReadLastConfirmedOp readLCOp = new ReadLastConfirmedOp(
        bkc.getBookieClient(),
        ledger.getDistributionSchedule(),
        ledger.getDigestManager(),
        ledger.getId(),
        ledger.getLedgerMetadata().getAllEnsembles().lastEntry().getValue(),
        ledger.getLedgerKey(),
        new ReadLastConfirmedOp.LastConfirmedDataCallback() {
            @Override
            public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
                // Use the test logger instead of System.out so output honors log config.
                LOG.info("readLastConfirmedDataComplete response = {}", rc);
                responseCode.setValue(rc);
                responseLatch.countDown();
            }
        });
    // Build a fake LAC entry payload: ledgerId followed by entryId 0, rest zero padding.
    // NOTE(review): Long.SIZE is in bits, so this allocates 192 bytes where
    // Long.BYTES * 3 (24) would do; kept as-is since the extra padding is harmless here.
    byte[] lac = new byte[Long.SIZE * 3];
    ByteBuf data = Unpooled.wrappedBuffer(lac, 0, lac.length);
    int writerIndex = data.writerIndex();
    data.resetWriterIndex();
    data.writeLong(ledger.getId());
    data.writeLong(0L);
    data.writerIndex(writerIndex);
    // Replay the scripted response codes, one per simulated bookie index.
    for (int i = 0; i < bookieLACResponse.length; i++) {
        readLCOp.readEntryComplete(bookieLACResponse[i], 0, 0, data, i);
    }
    responseLatch.await();
    return responseCode.intValue();
}
}