blob: 4c34c53af3d64f05e7dba5aee3b80243c5909f34 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.replication;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerHandleAdapter;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Integration tests verifies the complete functionality of the
* Auditor-rereplication process: Auditor will publish the bookie failures,
* consequently ReplicationWorker will get the notifications and act on it.
*/
public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase {
private static final Logger LOG = LoggerFactory
.getLogger(BookieAutoRecoveryTest.class);
private static final byte[] PASSWD = "admin".getBytes();
private static final byte[] data = "TESTDATA".getBytes();
private static final String openLedgerRereplicationGracePeriod = "3000"; // milliseconds
private DigestType digestType;
private LedgerManagerFactory mFactory;
private LedgerUnderreplicationManager underReplicationManager;
private LedgerManager ledgerManager;
private final String UNDERREPLICATED_PATH = baseClientConf
.getZkLedgersRootPath() + "/underreplication/ledgers";
public BookieAutoRecoveryTest() throws IOException, KeeperException,
InterruptedException, UnavailableException, CompatibilityException {
super(3);
baseConf.setLedgerManagerFactoryClassName(
"org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
baseConf.setOpenLedgerRereplicationGracePeriod(openLedgerRereplicationGracePeriod);
baseClientConf.setLedgerManagerFactoryClassName(
"org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
this.digestType = DigestType.MAC;
setAutoRecoveryEnabled(true);
}
@Override
public void setUp() throws Exception {
super.setUp();
baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
// initialize urReplicationManager
mFactory = LedgerManagerFactory.newLedgerManagerFactory(baseClientConf,
zkc);
underReplicationManager = mFactory.newLedgerUnderreplicationManager();
LedgerManagerFactory newLedgerManagerFactory = LedgerManagerFactory
.newLedgerManagerFactory(baseClientConf, zkc);
ledgerManager = newLedgerManagerFactory.newLedgerManager();
}
@Override
public void tearDown() throws Exception {
super.tearDown();
if (null != mFactory) {
mFactory.uninitialize();
mFactory = null;
}
if (null != underReplicationManager) {
underReplicationManager.close();
underReplicationManager = null;
}
if (null != ledgerManager) {
ledgerManager.close();
ledgerManager = null;
}
}
/**
* Test verifies publish urLedger by Auditor and replication worker is
* picking up the entries and finishing the rereplication of open ledger
*/
@Test(timeout = 90000)
public void testOpenLedgers() throws Exception {
List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 5);
LedgerHandle lh = listOfLedgerHandle.get(0);
int ledgerReplicaIndex = 0;
InetSocketAddress replicaToKillAddr = LedgerHandleAdapter
.getLedgerMetadata(lh).getEnsembles().get(0L).get(0);
final String urLedgerZNode = getUrLedgerZNode(lh);
ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr);
CountDownLatch latch = new CountDownLatch(1);
assertNull("UrLedger already exists!",
watchUrLedgerNode(urLedgerZNode, latch));
LOG.info("Killing Bookie :" + replicaToKillAddr);
killBookie(replicaToKillAddr);
// waiting to publish urLedger znode by Auditor
latch.await();
latch = new CountDownLatch(1);
LOG.info("Watching on urLedgerPath:" + urLedgerZNode
+ " to know the status of rereplication process");
assertNotNull("UrLedger doesn't exists!",
watchUrLedgerNode(urLedgerZNode, latch));
// starting the replication service, so that he will be able to act as
// target bookie
startNewBookie();
int newBookieIndex = bs.size() - 1;
BookieServer newBookieServer = bs.get(newBookieIndex);
LOG.debug("Waiting to finish the replication of failed bookie : "
+ replicaToKillAddr);
latch.await();
// grace period to update the urledger metadata in zookeeper
LOG.info("Waiting to update the urledger metadata in zookeeper");
verifyLedgerEnsembleMetadataAfterReplication(newBookieServer,
listOfLedgerHandle.get(0), ledgerReplicaIndex);
}
/**
* Test verifies publish urLedger by Auditor and replication worker is
* picking up the entries and finishing the rereplication of closed ledgers
*/
@Test(timeout = 90000)
public void testClosedLedgers() throws Exception {
List<Integer> listOfReplicaIndex = new ArrayList<Integer>();
List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 5);
closeLedgers(listOfLedgerHandle);
LedgerHandle lhandle = listOfLedgerHandle.get(0);
int ledgerReplicaIndex = 0;
InetSocketAddress replicaToKillAddr = LedgerHandleAdapter
.getLedgerMetadata(lhandle).getEnsembles().get(0L).get(0);
CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size());
for (LedgerHandle lh : listOfLedgerHandle) {
ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr);
listOfReplicaIndex.add(ledgerReplicaIndex);
assertNull("UrLedger already exists!",
watchUrLedgerNode(getUrLedgerZNode(lh), latch));
}
LOG.info("Killing Bookie :" + replicaToKillAddr);
killBookie(replicaToKillAddr);
// waiting to publish urLedger znode by Auditor
latch.await();
// Again watching the urLedger znode to know the replication status
latch = new CountDownLatch(listOfLedgerHandle.size());
for (LedgerHandle lh : listOfLedgerHandle) {
String urLedgerZNode = getUrLedgerZNode(lh);
LOG.info("Watching on urLedgerPath:" + urLedgerZNode
+ " to know the status of rereplication process");
assertNotNull("UrLedger doesn't exists!",
watchUrLedgerNode(urLedgerZNode, latch));
}
// starting the replication service, so that he will be able to act as
// target bookie
startNewBookie();
int newBookieIndex = bs.size() - 1;
BookieServer newBookieServer = bs.get(newBookieIndex);
LOG.debug("Waiting to finish the replication of failed bookie : "
+ replicaToKillAddr);
// waiting to finish replication
latch.await();
// grace period to update the urledger metadata in zookeeper
LOG.info("Waiting to update the urledger metadata in zookeeper");
for (int index = 0; index < listOfLedgerHandle.size(); index++) {
verifyLedgerEnsembleMetadataAfterReplication(newBookieServer,
listOfLedgerHandle.get(index),
listOfReplicaIndex.get(index));
}
}
/**
* Test stopping replica service while replication in progress. Considering
* when there is an exception will shutdown Auditor and RW processes. After
* restarting should be able to finish the re-replication activities
*/
@Test(timeout = 90000)
public void testStopWhileReplicationInProgress() throws Exception {
int numberOfLedgers = 2;
List<Integer> listOfReplicaIndex = new ArrayList<Integer>();
List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(
numberOfLedgers, 5);
closeLedgers(listOfLedgerHandle);
LedgerHandle handle = listOfLedgerHandle.get(0);
InetSocketAddress replicaToKillAddr = LedgerHandleAdapter
.getLedgerMetadata(handle).getEnsembles().get(0L).get(0);
LOG.info("Killing Bookie:" + replicaToKillAddr);
// Each ledger, there will be two events : create urLedger and after
// rereplication delete urLedger
CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size());
for (int i = 0; i < listOfLedgerHandle.size(); i++) {
final String urLedgerZNode = getUrLedgerZNode(listOfLedgerHandle
.get(i));
assertNull("UrLedger already exists!",
watchUrLedgerNode(urLedgerZNode, latch));
int replicaIndexInLedger = getReplicaIndexInLedger(
listOfLedgerHandle.get(i), replicaToKillAddr);
listOfReplicaIndex.add(replicaIndexInLedger);
}
LOG.info("Killing Bookie :" + replicaToKillAddr);
killBookie(replicaToKillAddr);
// waiting to publish urLedger znode by Auditor
latch.await();
// Again watching the urLedger znode to know the replication status
latch = new CountDownLatch(listOfLedgerHandle.size());
for (LedgerHandle lh : listOfLedgerHandle) {
String urLedgerZNode = getUrLedgerZNode(lh);
LOG.info("Watching on urLedgerPath:" + urLedgerZNode
+ " to know the status of rereplication process");
assertNotNull("UrLedger doesn't exists!",
watchUrLedgerNode(urLedgerZNode, latch));
}
// starting the replication service, so that he will be able to act as
// target bookie
startNewBookie();
int newBookieIndex = bs.size() - 1;
BookieServer newBookieServer = bs.get(newBookieIndex);
LOG.debug("Waiting to finish the replication of failed bookie : "
+ replicaToKillAddr);
while (true) {
if (latch.getCount() < numberOfLedgers || latch.getCount() <= 0) {
stopReplicationService();
LOG.info("Latch Count is:" + latch.getCount());
break;
}
// grace period to take breath
Thread.sleep(1000);
}
startReplicationService();
LOG.info("Waiting to finish rereplication processes");
latch.await();
// grace period to update the urledger metadata in zookeeper
LOG.info("Waiting to update the urledger metadata in zookeeper");
for (int index = 0; index < listOfLedgerHandle.size(); index++) {
verifyLedgerEnsembleMetadataAfterReplication(newBookieServer,
listOfLedgerHandle.get(index),
listOfReplicaIndex.get(index));
}
}
/**
* Verify the published urledgers of deleted ledgers(those ledgers where
* deleted after publishing as urledgers by Auditor) should be cleared off
* by the newly selected replica bookie
*/
@Test(timeout = 30000)
public void testNoSuchLedgerExists() throws Exception {
List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(2, 5);
CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size());
for (LedgerHandle lh : listOfLedgerHandle) {
assertNull("UrLedger already exists!",
watchUrLedgerNode(getUrLedgerZNode(lh), latch));
}
InetSocketAddress replicaToKillAddr = LedgerHandleAdapter
.getLedgerMetadata(listOfLedgerHandle.get(0)).getEnsembles()
.get(0L).get(0);
killBookie(replicaToKillAddr);
replicaToKillAddr = LedgerHandleAdapter
.getLedgerMetadata(listOfLedgerHandle.get(0)).getEnsembles()
.get(0L).get(0);
killBookie(replicaToKillAddr);
// waiting to publish urLedger znode by Auditor
latch.await();
latch = new CountDownLatch(listOfLedgerHandle.size());
for (LedgerHandle lh : listOfLedgerHandle) {
assertNotNull("UrLedger doesn't exists!",
watchUrLedgerNode(getUrLedgerZNode(lh), latch));
}
// delete ledgers
for (LedgerHandle lh : listOfLedgerHandle) {
bkc.deleteLedger(lh.getId());
}
startNewBookie();
// waiting to delete published urledgers, since it doesn't exists
latch.await();
for (LedgerHandle lh : listOfLedgerHandle) {
assertNull("UrLedger still exists after rereplication",
watchUrLedgerNode(getUrLedgerZNode(lh), latch));
}
}
/**
* Test that if a empty ledger loses the bookie not in the quorum for entry 0, it will
* still be openable when it loses enough bookies to lose a whole quorum.
*/
@Test(timeout=10000)
public void testEmptyLedgerLosesQuorumEventually() throws Exception {
LedgerHandle lh = bkc.createLedger(3, 2, 2, DigestType.CRC32, PASSWD);
CountDownLatch latch = new CountDownLatch(1);
String urZNode = getUrLedgerZNode(lh);
watchUrLedgerNode(urZNode, latch);
InetSocketAddress replicaToKill = LedgerHandleAdapter
.getLedgerMetadata(lh).getEnsembles().get(0L).get(2);
LOG.info("Killing last bookie, {}, in ensemble {}", replicaToKill,
LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().get(0L));
killBookie(replicaToKill);
getAuditor(10, TimeUnit.SECONDS).submitAuditTask().get(); // ensure auditor runs
assertTrue("Should be marked as underreplicated", latch.await(5, TimeUnit.SECONDS));
latch = new CountDownLatch(1);
Stat s = watchUrLedgerNode(urZNode, latch); // should be marked as replicated
if (s != null) {
assertTrue("Should be marked as replicated", latch.await(10, TimeUnit.SECONDS));
}
replicaToKill = LedgerHandleAdapter
.getLedgerMetadata(lh).getEnsembles().get(0L).get(1);
LOG.info("Killing second bookie, {}, in ensemble {}", replicaToKill,
LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().get(0L));
killBookie(replicaToKill);
getAuditor(10, TimeUnit.SECONDS).submitAuditTask().get(); // ensure auditor runs
assertTrue("Should be marked as underreplicated", latch.await(5, TimeUnit.SECONDS));
latch = new CountDownLatch(1);
s = watchUrLedgerNode(urZNode, latch); // should be marked as replicated
if (s != null) {
assertTrue("Should be marked as replicated", latch.await(5, TimeUnit.SECONDS));
}
// should be able to open ledger without issue
bkc.openLedger(lh.getId(), DigestType.CRC32, PASSWD);
}
private int getReplicaIndexInLedger(LedgerHandle lh,
InetSocketAddress replicaToKill) {
SortedMap<Long, ArrayList<InetSocketAddress>> ensembles = LedgerHandleAdapter
.getLedgerMetadata(lh).getEnsembles();
int ledgerReplicaIndex = -1;
for (InetSocketAddress addr : ensembles.get(0L)) {
++ledgerReplicaIndex;
if (addr.equals(replicaToKill)) {
break;
}
}
return ledgerReplicaIndex;
}
private void verifyLedgerEnsembleMetadataAfterReplication(
BookieServer newBookieServer, LedgerHandle lh,
int ledgerReplicaIndex) throws Exception {
LedgerHandle openLedger = bkc
.openLedger(lh.getId(), digestType, PASSWD);
InetSocketAddress inetSocketAddress = LedgerHandleAdapter
.getLedgerMetadata(openLedger).getEnsembles().get(0L)
.get(ledgerReplicaIndex);
assertEquals("Rereplication has been failed and ledgerReplicaIndex :"
+ ledgerReplicaIndex, newBookieServer.getLocalAddress(),
inetSocketAddress);
}
private void closeLedgers(List<LedgerHandle> listOfLedgerHandle)
throws InterruptedException, BKException {
for (LedgerHandle lh : listOfLedgerHandle) {
lh.close();
}
}
private List<LedgerHandle> createLedgersAndAddEntries(int numberOfLedgers,
int numberOfEntries) throws InterruptedException, BKException {
List<LedgerHandle> listOfLedgerHandle = new ArrayList<LedgerHandle>(
numberOfLedgers);
for (int index = 0; index < numberOfLedgers; index++) {
LedgerHandle lh = bkc.createLedger(3, 3, digestType, PASSWD);
listOfLedgerHandle.add(lh);
for (int i = 0; i < numberOfEntries; i++) {
lh.addEntry(data);
}
}
return listOfLedgerHandle;
}
private String getUrLedgerZNode(LedgerHandle lh) {
return ZkLedgerUnderreplicationManager.getUrLedgerZnode(
UNDERREPLICATED_PATH, lh.getId());
}
private Stat watchUrLedgerNode(final String znode,
final CountDownLatch latch) throws KeeperException,
InterruptedException {
return zkc.exists(znode, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeDeleted) {
LOG.info("Recieved Ledger rereplication completion event :"
+ event.getType());
latch.countDown();
}
if (event.getType() == EventType.NodeCreated) {
LOG.info("Recieved urLedger publishing event :"
+ event.getType());
latch.countDown();
}
}
});
}
}