blob: e8f83d15c4e6596ea676bc22c36fd23f5b96db78 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Enumeration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This unit test tests ledger recovery.
 *
 * <p>Each test writes entries to a ledger, perturbs the bookie cluster (killing bookies or
 * replacing them with bookies that drop or reject writes), then re-opens the ledger with
 * {@code openLedger} and checks the recovered last-add-confirmed.
 */
public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(LedgerRecoveryTest.class);

    private final DigestType digestType;

    public LedgerRecoveryTest() {
        super(3);
        this.digestType = DigestType.CRC32;
        // NOTE(review): presumably disabled so that a bookie restarted from a saved
        // ServerConfiguration comes back on the same address — confirm.
        this.baseConf.setAllowEphemeralPorts(false);
    }

    /**
     * Writes {@code numEntries} entries to a fresh ledger, re-opens it and checks that the
     * recovered last-add-confirmed and length match what was written.
     *
     * @param numEntries number of entries to write before re-opening; may be 0
     */
    private void testInternal(int numEntries) throws Exception {
        /*
         * Create ledger.
         */
        LedgerHandle beforelh = bkc.createLedger(digestType, "".getBytes());
        byte[] entry = "BookKeeper is cool!".getBytes();
        for (int i = 0; i < numEntries; i++) {
            beforelh.addEntry(entry);
        }
        // Widen before multiplying so the expected value cannot overflow int, and use the
        // encoded byte count rather than the char count (the ledger length is presumably the
        // sum of payload sizes in bytes).
        long length = (long) numEntries * entry.length;
        /*
         * Try to open ledger.
         */
        LedgerHandle afterlh = bkc.openLedger(beforelh.getId(), digestType, "".getBytes());
        /*
         * Check if has recovered properly.
         */
        assertEquals("Has not recovered correctly", numEntries - 1, afterlh.getLastAddConfirmed());
        assertEquals("Has not set the length correctly", length, afterlh.getLength());
    }

    @Test
    public void testLedgerRecovery() throws Exception {
        testInternal(100);
    }

    @Test
    public void testEmptyLedgerRecoveryOne() throws Exception {
        testInternal(1);
    }

    @Test
    public void testEmptyLedgerRecovery() throws Exception {
        testInternal(0);
    }

    /**
     * Re-opening a ledger with a password different from the one it was created with must fail.
     */
    @Test
    public void testLedgerRecoveryWithWrongPassword() throws Exception {
        // Create a ledger
        byte[] ledgerPassword = "aaaa".getBytes();
        LedgerHandle lh = bkc.createLedger(digestType, ledgerPassword);
        long ledgerId = lh.getId();
        LOG.info("Ledger ID: {}", ledgerId);
        byte[] entry = "BookKeeper is cool!".getBytes();
        int numEntries = 30;
        for (int i = 0; i < numEntries; i++) {
            lh.addEntry(entry);
        }
        // Using wrong password
        byte[] wrongPassword = "bbbb".getBytes();
        try {
            bkc.openLedger(ledgerId, digestType, wrongPassword);
            fail("Opening ledger with wrong password should fail");
        } catch (BKException e) {
            // expected: opening with the wrong password is rejected
        }
    }

    /**
     * Recovery of a ledger written to all three bookies (ensemble 3, quorum 3) fails while one
     * bookie is down, and succeeds once a new bookie joins the cluster.
     */
    @Test
    public void testLedgerRecoveryWithNotEnoughBookies() throws Exception {
        int numEntries = 3;
        // Create a ledger
        LedgerHandle beforelh = bkc.createLedger(3, 3, digestType, "".getBytes());
        byte[] entry = "BookKeeper is cool!".getBytes();
        for (int i = 0; i < numEntries; i++) {
            beforelh.addEntry(entry);
        }
        // shutdown first bookie server
        bs.get(0).shutdown();
        bs.remove(0);
        /*
         * Try to open ledger.
         */
        try {
            bkc.openLedger(beforelh.getId(), digestType, "".getBytes());
            fail("should not reach here!");
        } catch (BKException e) {
            // expected: recovery fails with only two bookies left. Catching BKException
            // (not Exception) lets an InterruptedException propagate instead of being swallowed.
        }
        // start a new bookie server
        startNewBookie();
        LedgerHandle afterlh = bkc.openLedger(beforelh.getId(), digestType, "".getBytes());
        /*
         * Check if has recovered properly.
         */
        assertEquals(numEntries - 1, afterlh.getLastAddConfirmed());
    }

    @Test
    public void testLedgerRecoveryWithSlowBookie() throws Exception {
        for (int i = 0; i < 3; i++) {
            LOG.info("TestLedgerRecoveryWithAckQuorum @ slow bookie {}", i);
            ledgerRecoveryWithSlowBookie(3, 3, 2, 1, i);
        }
    }

    /**
     * Writes entries while the bookie at {@code slowBookieIdx} silently drops every add. It then
     * restarts that bookie normally, re-opens the ledger and checks the recovered
     * last-add-confirmed.
     *
     * @param ensembleSize ensemble size of the ledger
     * @param writeQuorumSize write quorum size of the ledger
     * @param ackQuorumSize ack quorum size of the ledger
     * @param numEntries number of entries to write
     * @param slowBookieIdx index in the current ensemble of the bookie to replace with a
     *        request-dropping one
     */
    private void ledgerRecoveryWithSlowBookie(int ensembleSize, int writeQuorumSize,
            int ackQuorumSize, int numEntries, int slowBookieIdx) throws Exception {
        // Create a ledger
        LedgerHandle beforelh = bkc.createLedger(ensembleSize, writeQuorumSize, ackQuorumSize,
                digestType, "".getBytes());
        // Kill the bookie at slowBookieIdx and restart it as a fake bookie that drops add
        // requests, simulating a slow bookie that fails to add entries.
        BookieId host = beforelh.getCurrentEnsemble().get(slowBookieIdx);
        ServerConfiguration conf = killBookie(host);
        Bookie fakeBookie = new BookieImpl(conf) {
            @Override
            public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx,
                                 byte[] masterKey)
                    throws IOException, BookieException {
                // drop request to simulate a slow and failed bookie
            }
        };
        bsConfs.add(conf);
        bs.add(startBookie(conf, fakeBookie));
        // avoid not-enough-bookies case
        startNewBookie();
        // writes still succeed as long as ackQuorumSize bookies acknowledge them
        byte[] entry = "BookKeeper is cool!".getBytes();
        for (int i = 0; i < numEntries; i++) {
            beforelh.addEntry(entry);
        }
        conf = killBookie(host);
        bsConfs.add(conf);
        // restart the bookie normally, without the request-dropping override
        bs.add(startBookie(conf));
        /*
         * Try to open ledger.
         */
        LedgerHandle afterlh = bkc.openLedger(beforelh.getId(), digestType, "".getBytes());
        /*
         * Check if has recovered properly.
         */
        assertEquals(numEntries - 1, afterlh.getLastAddConfirmed());
    }

    /**
     * Regression test for
     * <a href="https://issues.apache.org/jira/browse/BOOKKEEPER-355">BOOKKEEPER-355</a>:
     * a recovery during a rolling restart shouldn't affect the ability to recover the ledger
     * later.
     *
     * <p>We have a ledger on ensemble B1, B2, B3. The sequence of events is:
     * <ol>
     * <li>B1 brought down for maintenance.</li>
     * <li>Ledger recovery started.</li>
     * <li>B2 answers read last confirmed.</li>
     * <li>B1 replaced in ensemble by B4.</li>
     * <li>Write to B4 fails for some reason.</li>
     * <li>B1 comes back up.</li>
     * <li>B2 goes down for maintenance.</li>
     * <li>Ledger recovery starts (ledger is now unavailable).</li>
     * </ol>
     */
    @Test
    public void testLedgerRecoveryWithRollingRestart() throws Exception {
        LedgerHandle lhbefore = bkc.createLedger(numBookies, 2, digestType, "".getBytes());
        for (int i = 0; i < (numBookies * 3) + 1; i++) {
            lhbefore.addEntry("data".getBytes());
        }
        // Add a dead bookie to the cluster
        ServerConfiguration conf = newServerConfiguration();
        Bookie deadBookie1 = new BookieImpl(conf) {
            @Override
            public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
                    throws IOException, BookieException {
                // drop request to simulate a slow and failed bookie
                throw new IOException("Couldn't write for some reason");
            }
        };
        bsConfs.add(conf);
        bs.add(startBookie(conf, deadBookie1));
        // kill first bookie server
        BookieId bookie1 = lhbefore.getCurrentEnsemble().get(0);
        ServerConfiguration conf1 = killBookie(bookie1);
        // Try to recover and fence the ledger after killing one bookie in the
        // ensemble, while another bookie is available in zk but not writable
        try {
            bkc.openLedger(lhbefore.getId(), digestType, "".getBytes());
            fail("Shouldn't be able to open ledger, there should be entries missing");
        } catch (BKException.BKLedgerRecoveryException e) {
            // expected
        }
        // restart the first server, kill the second
        bsConfs.add(conf1);
        bs.add(startBookie(conf1));
        BookieId bookie2 = lhbefore.getCurrentEnsemble().get(1);
        ServerConfiguration conf2 = killBookie(bookie2);
        // using async, because this could trigger an assertion
        final AtomicInteger returnCode = new AtomicInteger(0);
        final CountDownLatch openLatch = new CountDownLatch(1);
        bkc.asyncOpenLedger(lhbefore.getId(), digestType, "".getBytes(),
                new AsyncCallback.OpenCallback() {
                    @Override
                    public void openComplete(int rc, LedgerHandle lh, Object ctx) {
                        returnCode.set(rc);
                        openLatch.countDown();
                        if (rc == BKException.Code.OK) {
                            try {
                                lh.close();
                            } catch (Exception e) {
                                LOG.error("Exception closing ledger handle", e);
                            }
                        }
                    }
                }, null);
        assertTrue("Open call should have completed", openLatch.await(5, TimeUnit.SECONDS));
        assertFalse("Open should not have succeeded", returnCode.get() == BKException.Code.OK);
        bsConfs.add(conf2);
        bs.add(startBookie(conf2));
        LedgerHandle lhafter = bkc.openLedger(lhbefore.getId(), digestType,
                "".getBytes());
        assertEquals("Fenced ledger should have correct lastAddConfirmed",
                lhbefore.getLastAddConfirmed(), lhafter.getLastAddConfirmed());
    }

    /**
     * Regression test for
     * <a href="https://issues.apache.org/jira/browse/BOOKKEEPER-355">BOOKKEEPER-355</a>:
     * verify that if a recovery happens with 1 replica missing, and it's replaced with a faulty
     * bookie, it doesn't break future recovery from happening.
     * <ol>
     * <li>Ledger is created with quorum size 2, and entries are written.</li>
     * <li>The first bookie in the ensemble is brought down.</li>
     * <li>Another client fences and tries to recover the same ledger.</li>
     * <li>During this time an ensemble change happens and a new bookie is added, but this bookie
     * is not able to write.</li>
     * <li>This recovery fails.</li>
     * <li>A new non-faulty bookie comes up.</li>
     * <li>Another client tries to recover the same ledger, which now succeeds.</li>
     * </ol>
     */
    @Test
    public void testBookieFailureDuringRecovery() throws Exception {
        LedgerHandle lhbefore = bkc.createLedger(numBookies, 2, digestType, "".getBytes());
        for (int i = 0; i < (numBookies * 3) + 1; i++) {
            lhbefore.addEntry("data".getBytes());
        }
        // Add a dead bookie to the cluster
        ServerConfiguration conf = newServerConfiguration();
        Bookie deadBookie1 = new BookieImpl(conf) {
            @Override
            public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
                    throws IOException, BookieException {
                // drop request to simulate a slow and failed bookie
                throw new IOException("Couldn't write for some reason");
            }
        };
        bsConfs.add(conf);
        bs.add(startBookie(conf, deadBookie1));
        // kill first bookie server
        BookieId bookie1 = lhbefore.getCurrentEnsemble().get(0);
        killBookie(bookie1);
        // Try to recover and fence the ledger after killing one bookie in the
        // ensemble, while another bookie is available in zk but not writable
        try {
            bkc.openLedger(lhbefore.getId(), digestType, "".getBytes());
            fail("Shouldn't be able to open ledger, there should be entries missing");
        } catch (BKException.BKLedgerRecoveryException e) {
            // expected
        }
        // start a new good server
        startNewBookie();
        LedgerHandle lhafter = bkc.openLedger(lhbefore.getId(), digestType,
                "".getBytes());
        assertEquals("Fenced ledger should have correct lastAddConfirmed",
                lhbefore.getLastAddConfirmed(), lhafter.getLastAddConfirmed());
    }

    /**
     * Verify that it doesn't break the recovery when changing ensemble in
     * recovery add.
     */
    @Test
    public void testEnsembleChangeDuringRecovery() throws Exception {
        LedgerHandle lh = bkc.createLedger(numBookies, 2, 2, digestType, "".getBytes());
        int numEntries = (numBookies * 3) + 1;
        final AtomicInteger numPendingAdds = new AtomicInteger(numEntries);
        final CountDownLatch addDone = new CountDownLatch(1);
        for (int i = 0; i < numEntries; i++) {
            lh.asyncAddEntry("data".getBytes(), new AddCallback() {
                @Override
                public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
                    if (BKException.Code.OK != rc) {
                        // release the waiter early; the pending-count check below reports it
                        addDone.countDown();
                        return;
                    }
                    if (numPendingAdds.decrementAndGet() == 0) {
                        addDone.countDown();
                    }
                }
            }, null);
        }
        assertTrue("Timed out waiting for adds to ledger " + lh.getId(),
                addDone.await(10, TimeUnit.SECONDS));
        if (numPendingAdds.get() > 0) {
            fail("Failed to add " + numEntries + " to ledger handle " + lh.getId());
        }
        // kill first 2 bookies to replace bookies
        BookieId bookie1 = lh.getCurrentEnsemble().get(0);
        ServerConfiguration conf1 = killBookie(bookie1);
        BookieId bookie2 = lh.getCurrentEnsemble().get(1);
        ServerConfiguration conf2 = killBookie(bookie2);
        // replace these two bookies
        startDeadBookie(conf1);
        startDeadBookie(conf2);
        // kick in two brand new bookies
        startNewBookie();
        startNewBookie();
        // two dead bookies are put in the ensemble which would cause ensemble
        // change
        LedgerHandle recoveredLh = bkc.openLedger(lh.getId(), digestType, "".getBytes());
        assertEquals("Fenced ledger should have correct lastAddConfirmed", lh.getLastAddConfirmed(),
                recoveredLh.getLastAddConfirmed());
    }

    /**
     * Starts a bookie from {@code conf} whose {@code recoveryAddEntry} always throws
     * {@link IOException}. Only recovery writes are overridden; every other operation uses the
     * default {@link BookieImpl} behaviour. The configuration and server are registered in
     * {@code bsConfs}/{@code bs}.
     */
    private void startDeadBookie(ServerConfiguration conf) throws Exception {
        Bookie rBookie = new BookieImpl(conf) {
            @Override
            public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
                    throws IOException, BookieException {
                // drop request to simulate a dead bookie
                throw new IOException("Couldn't write entries for some reason");
            }
        };
        bsConfs.add(conf);
        bs.add(startBookie(conf, rBookie));
    }

    @Test
    public void testBatchRecoverySize3() throws Exception {
        batchRecovery(3);
    }

    @Test
    public void testBatchRecoverySize13() throws Exception {
        batchRecovery(13);
    }

    /**
     * Writes entries while two ensemble bookies are slept, then runs a {@link LedgerRecoveryOp}
     * with the given recovery read batch size. It checks that every entry was read and rewritten
     * exactly once and that the recovered content is intact.
     *
     * @param batchSize value passed to {@code setRecoveryReadBatchSize}
     */
    private void batchRecovery(int batchSize) throws Exception {
        ClientConfiguration newConf = new ClientConfiguration()
            .setReadEntryTimeout(60000)
            .setAddEntryTimeout(60000)
            .setEnableParallelRecoveryRead(false)
            .setRecoveryReadBatchSize(batchSize);
        newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
        BookKeeper newBk = new BookKeeper(newConf);
        // Declared outside the try so the finally block can always release the sleeping bookies.
        CountDownLatch latch1 = new CountDownLatch(1);
        CountDownLatch latch2 = new CountDownLatch(1);
        try {
            LedgerHandle lh = newBk.createLedger(numBookies, 2, 2, digestType, "".getBytes());
            // NOTE(review): presumably sleepBookie stalls the bookie until its latch is released.
            sleepBookie(lh.getCurrentEnsemble().get(0), latch1);
            sleepBookie(lh.getCurrentEnsemble().get(1), latch2);
            int numEntries = (numBookies * 3) + 1;
            final AtomicInteger numPendingAdds = new AtomicInteger(numEntries);
            final CountDownLatch addDone = new CountDownLatch(1);
            for (int i = 0; i < numEntries; i++) {
                lh.asyncAddEntry(("" + i).getBytes(), new AddCallback() {
                    @Override
                    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
                        if (BKException.Code.OK != rc) {
                            // release the waiter early; the pending-count assertion reports it
                            addDone.countDown();
                            return;
                        }
                        if (numPendingAdds.decrementAndGet() == 0) {
                            addDone.countDown();
                        }
                    }
                }, null);
            }
            latch1.countDown();
            latch2.countDown();
            assertTrue("Timed out waiting for adds to ledger " + lh.getId(),
                    addDone.await(10, TimeUnit.SECONDS));
            assertEquals(0, numPendingAdds.get());
            LedgerHandle recoverLh = newBk.openLedgerNoRecovery(lh.getId(), digestType, "".getBytes());
            assertEquals(BookieProtocol.INVALID_ENTRY_ID, recoverLh.getLastAddConfirmed());
            MockClientContext parallelReadCtx = MockClientContext.copyOf(bkc.getClientCtx())
                    .setConf(ClientInternalConf.fromConfig(newConf.setEnableParallelRecoveryRead(true)));
            LedgerRecoveryOp recoveryOp = new LedgerRecoveryOp(recoverLh, parallelReadCtx);
            CompletableFuture<LedgerHandle> f = recoveryOp.initiate();
            f.get(10, TimeUnit.SECONDS);
            assertEquals(numEntries, recoveryOp.readCount.get());
            assertEquals(numEntries, recoveryOp.writeCount.get());
            Enumeration<LedgerEntry> enumeration = recoverLh.readEntries(0, numEntries - 1);
            int numReads = 0;
            while (enumeration.hasMoreElements()) {
                LedgerEntry entry = enumeration.nextElement();
                assertEquals((long) numReads, entry.getEntryId());
                assertEquals(numReads, Integer.parseInt(new String(entry.getEntry())));
                ++numReads;
            }
            assertEquals(numEntries, numReads);
        } finally {
            // Counting down an already-released latch is a no-op, so this is safe on success too.
            latch1.countDown();
            latch2.countDown();
            newBk.close();
        }
    }
}