blob: 93714c53c7813233bdef27b2331748ce19766d8c [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.MetadataClientDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests publishing of under-replicated ledgers by the Auditor bookie node when
* the corresponding bookies are identified as not running.
*/
public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
// Logger used by every test in this class. The commented-out declaration
// below is a leftover alternative that would log through the root logger.
// private static final Logger LOG = Logger.getRootLogger();
private static final Logger LOG = LoggerFactory
.getLogger(AuditorLedgerCheckerTest.class);
// Password passed to createLedger() for the ledgers written by these tests.
private static final byte[] ledgerPassword = "aaa".getBytes();
// Random number generator, re-seeded with the current time in setUp().
private Random rng; // Random Number Generator
// Digest type used when creating ledgers; set to CRC32 by the constructor.
private DigestType digestType;
// "<ledgers root>/underreplication/ledgers" path, resolved in setUp() from the client configuration.
private String underreplicatedPath;
// Running auditor electors, keyed by the string form of the owning bookie's id.
private Map<String, AuditorElector> auditorElectors = new ConcurrentHashMap<>();
// Under-replication manager used to inspect and toggle replication state.
private ZkLedgerUnderreplicationManager urLedgerMgr;
// Ids of ledgers observed as under replicated; reset in setUp().
// NOTE(review): presumably filled by the ZooKeeper watcher registered in
// registerUrLedgerWatcher() - confirm against the watcher implementation.
private Set<Long> urLedgerList;
// "<ledgers root>/underreplication/auditorelection" path, resolved in setUp() from the server configuration.
private String electionPath;
// Ids of the ledgers each test wants watched for under-replication; reset in setUp().
private List<Long> ledgerList;
/**
 * Creates the test case using the hierarchical ledger manager factory by
 * delegating to the constructor that takes a factory class name.
 */
public AuditorLedgerCheckerTest()
throws IOException, KeeperException, InterruptedException,
CompatibilityException {
this("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
}
/**
 * Creates the test case for a specific ledger manager factory.
 *
 * <p>Passes {@code 3} to the cluster test-case base constructor (presumably
 * the number of bookies to start - confirm against the base class), selects
 * CRC32 as the digest type and applies the factory class name to both the
 * server and the client base configuration.
 *
 * @param ledgerManagerFactoryClass
 *            fully qualified class name of the ledger manager factory
 */
AuditorLedgerCheckerTest(String ledgerManagerFactoryClass)
        throws IOException, KeeperException, InterruptedException, CompatibilityException {
    super(3);
    LOG.info("Running test case using ledger manager : " + ledgerManagerFactoryClass);
    digestType = DigestType.CRC32;

    // use the same ledger manager on the server and the client side
    baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactoryClass);
    baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactoryClass);
}
/**
 * Runs the base set-up, resolves the under-replication and auditor-election
 * znode paths, creates the under-replication manager, starts one auditor
 * elector per bookie and resets the per-test bookkeeping collections.
 */
@Before
public void setUp() throws Exception {
    super.setUp();

    final String clientLedgersRoot = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf);
    underreplicatedPath = clientLedgersRoot + "/underreplication/ledgers";
    final String serverLedgersRoot = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseConf);
    electionPath = serverLedgersRoot + "/underreplication/auditorelection";

    urLedgerMgr = new ZkLedgerUnderreplicationManager(baseClientConf, zkc);
    final long checkAllLedgersCTime = System.currentTimeMillis();
    urLedgerMgr.setCheckAllLedgersCTime(checkAllLedgersCTime);

    startAuditorElectors();

    // fresh, time-seeded random source and empty tracking collections for every test
    rng = new Random(System.currentTimeMillis());
    urLedgerList = new HashSet<>();
    ledgerList = new ArrayList<>(2);

    baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
    baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
}
/**
 * Stops all auditor electors and then runs the base tear-down.
 *
 * <p>The base tear-down runs in a {@code finally} block so that it is not
 * skipped when stopping an elector throws.
 *
 * @throws Exception if stopping the electors or the base tear-down fails
 */
@Override
public void tearDown() throws Exception {
    try {
        stopAuditorElectors();
    } finally {
        super.tearDown();
    }
}
/**
 * Creates and starts one {@link AuditorElector} for every bookie server in
 * {@code bs}, registering each in {@code auditorElectors} under the string
 * form of the bookie's id.
 */
private void startAuditorElectors() throws Exception {
    for (BookieServer server : bs) {
        final String bookieId = server.getBookieId().toString();
        final AuditorElector elector = new AuditorElector(bookieId, baseConf);
        auditorElectors.put(bookieId, elector);
        elector.start();
        LOG.debug("Starting Auditor Elector");
    }
}
/**
 * Shuts down every elector currently registered in {@code auditorElectors}.
 * The map itself is left untouched.
 */
private void stopAuditorElectors() throws Exception {
    for (AuditorElector elector : auditorElectors.values()) {
        elector.shutdown();
        LOG.debug("Stopping Auditor Elector!");
    }
}
/**
 * Verifies that, after a bookie is shut down, the auditor publishes the
 * ledger as under replicated and names the shut-down bookie as the missing
 * replica.
 */
@Test
public void testSimpleLedger() throws Exception {
    final LedgerHandle handle = createAndAddEntriesToLedger();
    final long ledgerId = handle.getId();
    LOG.debug("Created ledger : " + ledgerId);
    ledgerList.add(ledgerId);
    handle.close();

    final int watchedLedgers = ledgerList.size();
    final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(watchedLedgers);

    // take down the last bookie of the cluster
    final int lastBookieIndex = bs.size() - 1;
    final String shutdownBookie = shutdownBookie(lastBookieIndex);

    // give the auditor time to publish the ledger
    LOG.debug("Waiting for ledgers to be marked as under replicated");
    waitForAuditToComplete();
    underReplicaLatch.await(5, TimeUnit.SECONDS);

    final Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
    assertEquals("Missed identifying under replicated ledgers", 1, urLedgerList.size());

    // Sample data stored under the under replicated ledger path:
    //   {4=replica: "10.18.89.153:5002"}
    assertTrue("Ledger is not marked as underreplicated:" + ledgerId, urLedgerList.contains(ledgerId));
    final String data = urLedgerData.get(ledgerId);
    assertTrue("Bookie " + shutdownBookie + "is not listed in the ledger as missing replica :" + data,
            data.contains(shutdownBookie));
}
/**
 * Test that a ledger, once published as under replicated, is still listed
 * as missing the replica on the failed bookie after that bookie has been
 * restarted.
 *
 * <p>The result of {@code waitForLedgerMissingReplicas} is asserted; without
 * the assertion the test could not fail on the condition it describes.
 */
@Test
public void testRestartBookie() throws Exception {
    LedgerHandle lh1 = createAndAddEntriesToLedger();
    LedgerHandle lh2 = createAndAddEntriesToLedger();
    LOG.debug("Created following ledgers : {}, {}", lh1, lh2);
    int bkShutdownIndex = bs.size() - 1;
    // remember the configuration before the bookie is removed by the shutdown
    ServerConfiguration bookieConf1 = bsConfs.get(bkShutdownIndex);
    String shutdownBookie = shutdownBookie(bkShutdownIndex);
    // restart the failed bookie
    bs.add(startBookie(bookieConf1));
    assertTrue("Ledger " + lh1.getId() + " should be missing replica " + shutdownBookie,
            waitForLedgerMissingReplicas(lh1.getId(), 10, shutdownBookie));
    assertTrue("Ledger " + lh2.getId() + " should be missing replica " + shutdownBookie,
            waitForLedgerMissingReplicas(lh2.getId(), 10, shutdownBookie));
}
/**
 * Verifies that under replicated ledgers are still published when bookies
 * fail one after another, with a simulated re-replication in between.
 */
@Test
public void testMultipleBookieFailures() throws Exception {
    final LedgerHandle handle = createAndAddEntriesToLedger();
    final Long ledgerId = handle.getId();

    // first failure
    shutdownBookie(bs.size() - 1);

    // pretend the ledger got re-replicated
    doLedgerRereplication(ledgerId);

    // second failure
    final String secondFailedBookie = shutdownBookie(bs.size() - 1);

    // give the auditor time to publish the ledger
    LOG.debug("Waiting for ledgers to be marked as under replicated");
    final boolean missingSecondReplica =
            waitForLedgerMissingReplicas(handle.getId(), 10, secondFailedBookie);
    assertTrue("Ledger should be missing second replica", missingSecondReplica);
}
/**
 * Verifies that no ledger is reported as under replicated while ledger
 * replication is disabled, and that the report arrives once replication is
 * enabled again.
 */
@Test
public void testToggleLedgerReplication() throws Exception {
    final LedgerHandle handle = createAndAddEntriesToLedger();
    ledgerList.add(handle.getId());
    LOG.debug("Created following ledgers : " + ledgerList);

    final CountDownLatch urReplicaLatch = registerUrLedgerWatcher(ledgerList.size());

    // switch replication off before any bookie fails
    urLedgerMgr.disableLedgerReplication();

    // fail two bookies, always taking the one currently last in the list
    for (int failures = 0; failures < 2; failures++) {
        shutdownBookie(bs.size() - 1);
    }

    final boolean reportedWhileDisabled = urReplicaLatch.await(1, TimeUnit.SECONDS);
    assertFalse("Ledger replication is not disabled!", reportedWhileDisabled);

    // switch replication back on
    urLedgerMgr.enableLedgerReplication();
    final boolean reportedAfterEnable = urReplicaLatch.await(5, TimeUnit.SECONDS);
    assertTrue("Ledger replication is not enabled!", reportedAfterEnable);
}
/**
 * Verifies that disabling an already disabled, or enabling an already
 * enabled, ledger replication fails with an {@link UnavailableException}
 * whose cause is the matching ZooKeeper exception.
 */
@Test
public void testDuplicateEnDisableAutoRecovery() throws Exception {
    urLedgerMgr.disableLedgerReplication();
    UnavailableException secondDisableFailure = null;
    try {
        urLedgerMgr.disableLedgerReplication();
    } catch (UnavailableException e) {
        secondDisableFailure = e;
    }
    if (secondDisableFailure == null) {
        fail("Must throw exception, since AutoRecovery is already disabled");
    }
    assertTrue("AutoRecovery is not disabled previously!",
            secondDisableFailure.getCause() instanceof KeeperException.NodeExistsException);

    urLedgerMgr.enableLedgerReplication();
    UnavailableException secondEnableFailure = null;
    try {
        urLedgerMgr.enableLedgerReplication();
    } catch (UnavailableException e) {
        secondEnableFailure = e;
    }
    if (secondEnableFailure == null) {
        fail("Must throw exception, since AutoRecovery is already enabled");
    }
    assertTrue("AutoRecovery is not enabled previously!",
            secondEnableFailure.getCause() instanceof KeeperException.NoNodeException);
}
/**
 * Verifies that the auditor treats a read-only bookie as available, i.e.
 * that no ledger is published as under replicated because a bookie turned
 * read-only.
 */
@Test
public void testReadOnlyBookieExclusionFromURLedgersCheck() throws Exception {
    final LedgerHandle handle = createAndAddEntriesToLedger();
    ledgerList.add(handle.getId());
    LOG.debug("Created following ledgers : " + ledgerList);

    final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());

    // move the bookie at index 2 into read-only mode
    final int readOnlyIndex = 2;
    final ServerConfiguration readOnlyConf = bsConfs.get(readOnlyIndex);
    final BookieServer readOnlyServer = bs.get(readOnlyIndex);
    readOnlyConf.setReadOnlyModeEnabled(true);
    final BookieImpl readOnlyBookie = (BookieImpl) readOnlyServer.getBookie();
    readOnlyBookie.getStateManager().doTransitionToReadOnlyMode();
    bkc.waitForReadOnlyBookie(BookieImpl.getBookieId(bsConfs.get(readOnlyIndex)))
            .get(30, TimeUnit.SECONDS);

    // give the auditor time to run; it must not publish anything
    LOG.debug("Waiting for Auditor to finish ledger check.");
    waitForAuditToComplete();
    final boolean published = underReplicaLatch.await(5, TimeUnit.SECONDS);
    assertFalse("latch should not have completed", published);
}
/**
 * Verifies that a read-only bookie which is subsequently shut down is
 * treated as failed: the ledger is published as under replicated with that
 * bookie named as the missing replica.
 */
@Test
public void testReadOnlyBookieShutdown() throws Exception {
    final LedgerHandle handle = createAndAddEntriesToLedger();
    final Long ledgerId = handle.getId();
    ledgerList.add(ledgerId);
    LOG.debug("Created following ledgers : " + ledgerList);

    final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());

    // step 1: move the last bookie into read-only mode
    final int targetIndex = bs.size() - 1;
    LOG.debug("Moving bookie {} {} to read only...", targetIndex, bs.get(targetIndex));
    final ServerConfiguration targetConf = bsConfs.get(targetIndex);
    final BookieServer targetServer = bs.get(targetIndex);
    targetConf.setReadOnlyModeEnabled(true);
    final BookieImpl targetBookie = (BookieImpl) targetServer.getBookie();
    targetBookie.getStateManager().doTransitionToReadOnlyMode();
    bkc.waitForReadOnlyBookie(BookieImpl.getBookieId(bsConfs.get(targetIndex)))
            .get(30, TimeUnit.SECONDS);

    // a read-only bookie alone must not cause any publication
    LOG.debug("Waiting for Auditor to finish ledger check.");
    waitForAuditToComplete();
    assertFalse("latch should not have completed", underReplicaLatch.await(1, TimeUnit.SECONDS));

    // step 2: shut the read-only bookie down
    final String shutdownBookie = shutdownBookie(targetIndex);

    // give the auditor time to publish the ledger
    LOG.debug("Waiting for ledgers to be marked as under replicated");
    waitForAuditToComplete();
    underReplicaLatch.await(5, TimeUnit.SECONDS);

    final Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
    assertEquals("Missed identifying under replicated ledgers", 1, urLedgerList.size());

    // Sample data stored under the under replicated ledger path:
    //   {4=replica: "10.18.89.153:5002"}
    assertTrue("Ledger is not marked as underreplicated:" + ledgerId, urLedgerList.contains(ledgerId));
    final String data = urLedgerData.get(ledgerId);
    assertTrue("Bookie " + shutdownBookie + "is not listed in the ledger as missing replica :" + data,
            data.contains(shutdownBookie));
}
/**
 * Shared body of the delayed-audit tests: sets a lost-bookie recovery delay
 * of 5, shuts down a non-auditor bookie, checks that nothing is published
 * during the first 4 seconds and that the ledger is published within the
 * following 2 seconds.
 */
public void testInnerDelayedAuditOfLostBookies() throws Exception {
    final LedgerHandle handle = createAndAddEntriesToLedger();
    final long ledgerId = handle.getId();
    LOG.debug("Created ledger : " + ledgerId);
    ledgerList.add(ledgerId);
    handle.close();

    final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());

    // recovery work must start only 5 seconds after a bookie failure
    urLedgerMgr.setLostBookieRecoveryDelay(5);

    // pick a non-auditor bookie so that no new election is triggered
    final String shutdownBookie = shutDownNonAuditorBookie();

    LOG.debug("Waiting for ledgers to be marked as under replicated");
    final boolean publishedEarly = underReplicaLatch.await(4, TimeUnit.SECONDS);
    assertFalse("audit of lost bookie isn't delayed", publishedEarly);
    assertEquals("under replicated ledgers identified when it was not expected", 0, urLedgerList.size());

    // 4 seconds have been spent above; allow up to 2 more for the ledger to be reported
    final boolean publishedAfterDelay = underReplicaLatch.await(2, TimeUnit.SECONDS);
    assertTrue("audit of lost bookie isn't delayed", publishedAfterDelay);

    assertTrue("Ledger is not marked as underreplicated:" + ledgerId, urLedgerList.contains(ledgerId));
    final Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
    final String data = urLedgerData.get(ledgerId);
    assertTrue("Bookie " + shutdownBookie + "is not listed in the ledger as missing replica :" + data,
            data.contains(shutdownBookie));
}
/**
 * Verifies that publishing of under replicated ledgers by the auditor is
 * delayed when the LostBookieRecoveryDelay option is set.
 */
@Test
public void testDelayedAuditOfLostBookies() throws Exception {
    // let the initial periodic check finish first
    TimeUnit.SECONDS.sleep(1);
    testInnerDelayedAuditOfLostBookies();
}
/**
 * Verifies that the LostBookieRecoveryDelay is honoured even when the
 * periodic bookie check is configured with an interval of 2, i.e. that the
 * periodic check does not override the delay.
 */
@Test
public void testDelayedAuditWithPeriodicBookieCheck() throws Exception {
    // The periodic check interval is applied by stopping the auditor
    // electors, changing the configuration and starting them again.
    stopAuditorElectors();
    baseConf.setAuditorPeriodicBookieCheckInterval(2);
    startAuditorElectors();

    // let the initial periodic check finish first
    TimeUnit.SECONDS.sleep(1);

    // the delayed audit must behave as usual with the periodic check enabled
    testInnerDelayedAuditOfLostBookies();
}
/**
 * Verifies that resetting the lost-bookie recovery delay to 0 while a
 * delayed audit is pending makes the ledger get published within a second.
 */
@Test
public void testRescheduleOfDelayedAuditOfLostBookiesToStartImmediately() throws Exception {
    // let the initial periodic check finish first
    TimeUnit.SECONDS.sleep(1);

    final LedgerHandle handle = createAndAddEntriesToLedger();
    final long ledgerId = handle.getId();
    LOG.debug("Created ledger : " + ledgerId);
    ledgerList.add(ledgerId);
    handle.close();

    final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());

    // recovery work must start only 50 seconds after a bookie failure
    urLedgerMgr.setLostBookieRecoveryDelay(50);

    // pick a non-auditor bookie so that no new election is triggered
    final String shutdownBookie = shutDownNonAuditorBookie();

    LOG.debug("Waiting for ledgers to be marked as under replicated");
    final boolean publishedEarly = underReplicaLatch.await(4, TimeUnit.SECONDS);
    assertFalse("audit of lost bookie isn't delayed", publishedEarly);
    assertEquals("under replicated ledgers identified when it was not expected", 0, urLedgerList.size());

    // dropping the delay to 0 should trigger the audit task right away
    urLedgerMgr.setLostBookieRecoveryDelay(0);

    // the ledger has to be reported within one second
    final boolean publishedAfterReset = underReplicaLatch.await(1, TimeUnit.SECONDS);
    assertTrue("audit of lost bookie isn't delayed", publishedAfterReset);

    assertTrue("Ledger is not marked as underreplicated:" + ledgerId, urLedgerList.contains(ledgerId));
    final Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
    final String data = urLedgerData.get(ledgerId);
    assertTrue("Bookie " + shutdownBookie + "is not listed in the ledger as missing replica :" + data,
            data.contains(shutdownBookie));
}
/**
 * Verifies that changing the lost-bookie recovery delay from 3 to 4 while a
 * delayed audit is pending postpones the publication: nothing is published
 * in the 2 seconds after the change, and the ledger is published within the
 * 3 seconds after that.
 */
@Test
public void testRescheduleOfDelayedAuditOfLostBookiesToStartLater() throws Exception {
    // let the initial periodic check finish first
    TimeUnit.SECONDS.sleep(1);

    final LedgerHandle handle = createAndAddEntriesToLedger();
    final long ledgerId = handle.getId();
    LOG.debug("Created ledger : " + ledgerId);
    ledgerList.add(ledgerId);
    handle.close();

    final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());

    // recovery work must start only 3 seconds after a bookie failure
    urLedgerMgr.setLostBookieRecoveryDelay(3);

    // pick a non-auditor bookie so that no new election is triggered
    final String shutdownBookie = shutDownNonAuditorBookie();

    LOG.debug("Waiting for ledgers to be marked as under replicated");
    final boolean publishedBeforeChange = underReplicaLatch.await(2, TimeUnit.SECONDS);
    assertFalse("audit of lost bookie isn't delayed", publishedBeforeChange);
    assertEquals("under replicated ledgers identified when it was not expected", 0, urLedgerList.size());

    // changing the delay to 4 reschedules the pending audit task
    urLedgerMgr.setLostBookieRecoveryDelay(4);

    // with the new delay of 4 the audit task must not have run yet
    LOG.debug("Waiting for ledgers to be marked as under replicated");
    final boolean publishedRightAfterChange = underReplicaLatch.await(2, TimeUnit.SECONDS);
    assertFalse("audit of lost bookie isn't delayed", publishedRightAfterChange);
    assertEquals("under replicated ledgers identified when it was not expected", 0, urLedgerList.size());

    // 2 seconds of the new delay are gone; allow 3 more for the ledger to be reported
    final boolean publishedEventually = underReplicaLatch.await(3, TimeUnit.SECONDS);
    assertTrue("audit of lost bookie isn't delayed", publishedEventually);

    assertTrue("Ledger is not marked as underreplicated:" + ledgerId, urLedgerList.contains(ledgerId));
    final Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
    final String data = urLedgerData.get(ledgerId);
    assertTrue("Bookie " + shutdownBookie + "is not listed in the ledger as missing replica :" + data,
            data.contains(shutdownBookie));
}
/**
 * Verifies that re-setting the lost-bookie recovery delay triggers an audit
 * even when the auditor has no pending audit task.
 *
 * <p>Ledger metadata with an ensemble of non-existing bookies is created for
 * a handful of random ledger ids; the audit is expected to mark all of them
 * as under replicated.
 *
 * <p>Ledger ids are derived with {@code Math.floorMod} rather than
 * {@code Math.abs(...) % n}: {@code Math.abs(Long.MIN_VALUE)} is negative
 * and would yield a negative ledger id. Ids already chosen are re-drawn so
 * that metadata creation cannot collide.
 */
@Test
public void testTriggerAuditorWithNoPendingAuditTask() throws Exception {
    // wait for a second so that the initial periodic check finishes
    Thread.sleep(1000);
    int lostBookieRecoveryDelayConfValue = baseConf.getLostBookieRecoveryDelay();
    Auditor auditorBookiesAuditor = getAuditorBookiesAuditor();
    Future<?> auditTask = auditorBookiesAuditor.getAuditTask();
    int lostBookieRecoveryDelayBeforeChange = auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange();
    Assert.assertEquals("auditTask is supposed to be null", null, auditTask);
    Assert.assertEquals(
            "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to BaseConf's lostBookieRecoveryDelay",
            lostBookieRecoveryDelayConfValue, lostBookieRecoveryDelayBeforeChange);

    @Cleanup("shutdown") OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
            .name("test-scheduler")
            .numThreads(1)
            .build();
    @Cleanup MetadataClientDriver driver =
            MetadataDrivers.getClientDriver(URI.create(baseClientConf.getMetadataServiceUri()));
    driver.initialize(baseClientConf, scheduler, NullStatsLogger.INSTANCE, Optional.of(zkc));

    // There is no easy way to validate that the Auditor has executed the audit process
    // (Auditor.startAudit) without shutting down a bookie. To test that resetting
    // LostBookieRecoveryDelay triggers an audit even when there is no pending AuditTask,
    // a few ledgers are created whose metadata lists non-existing bookies as the ensemble.
    // When the Auditor audits, it recognizes these ledgers as under replicated and marks
    // them accordingly, since these bookies are not available.
    int numofledgers = 5;
    Random rand = new Random();
    // one ledger manager is enough for all metadata writes
    try (LedgerManager lm = driver.getLedgerManagerFactory().newLedgerManager()) {
        for (int i = 0; i < numofledgers; i++) {
            ArrayList<BookieId> ensemble = new ArrayList<BookieId>();
            ensemble.add(new BookieSocketAddress("99.99.99.99:9999").toBookieId());
            ensemble.add(new BookieSocketAddress("11.11.11.11:1111").toBookieId());
            ensemble.add(new BookieSocketAddress("88.88.88.88:8888").toBookieId());

            // floorMod always yields a value in [0, 100000000), even for Long.MIN_VALUE
            long ledgerId;
            do {
                ledgerId = Math.floorMod(rand.nextLong(), 100000000L);
            } while (ledgerList.contains(ledgerId));

            LedgerMetadata metadata = LedgerMetadataBuilder.create()
                    .withId(ledgerId)
                    .withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(2)
                    .withPassword("passwd".getBytes())
                    .withDigestType(DigestType.CRC32.toApiDigestType())
                    .newEnsembleEntry(0L, ensemble).build();
            lm.createLedgerMetadata(ledgerId, metadata).get(2000, TimeUnit.MILLISECONDS);
            ledgerList.add(ledgerId);
        }
    }

    final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());
    // writing the unchanged delay value is what triggers the audit here
    urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelayBeforeChange);
    assertTrue("Audit should be triggered and created ledgers should be marked as underreplicated",
            underReplicaLatch.await(2, TimeUnit.SECONDS));
    assertEquals("All the ledgers should be marked as underreplicated", ledgerList.size(), urLedgerList.size());

    auditTask = auditorBookiesAuditor.getAuditTask();
    Assert.assertEquals("auditTask is supposed to be null", null, auditTask);
    Assert.assertEquals(
            "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to BaseConf's lostBookieRecoveryDelay",
            lostBookieRecoveryDelayBeforeChange, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange());
}
/**
 * Verifies that writing the same lost-bookie recovery delay again while an
 * audit task is pending triggers the audit without waiting for the delay,
 * and that the auditor afterwards has no pending task and still reports
 * the same delay value.
 */
@Test
public void testTriggerAuditorWithPendingAuditTask() throws Exception {
    // let the initial periodic check finish first
    TimeUnit.SECONDS.sleep(1);

    final Auditor auditor = getAuditorBookiesAuditor();
    final LedgerHandle handle = createAndAddEntriesToLedger();
    final long ledgerId = handle.getId();
    LOG.debug("Created ledger : " + ledgerId);
    ledgerList.add(ledgerId);
    handle.close();

    final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());

    // recovery work must start only 5 seconds after a bookie failure
    final int lostBookieRecoveryDelay = 5;
    urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);

    // pick a non-auditor bookie so that no new election is triggered
    shutDownNonAuditorBookie();

    LOG.debug("Waiting for ledgers to be marked as under replicated");
    final boolean publishedEarly = underReplicaLatch.await(2, TimeUnit.SECONDS);
    assertFalse("audit of lost bookie isn't delayed", publishedEarly);
    assertEquals("under replicated ledgers identified when it was not expected", 0, urLedgerList.size());

    // a delayed audit task has to be pending at this point
    final Future<?> pendingTask = auditor.getAuditTask();
    assertNotNull("auditTask is not supposed to be null", pendingTask);
    assertEquals("lostBookieRecoveryDelayBeforeChange of Auditor should be equal to what we set",
            lostBookieRecoveryDelay, auditor.getLostBookieRecoveryDelayBeforeChange());

    // writing the previous value (5) again should trigger the auditor immediately
    urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
    final boolean publishedAfterReset = underReplicaLatch.await(2, TimeUnit.SECONDS);
    assertTrue("audit of lost bookie shouldn't be delayed", publishedAfterReset);
    assertEquals("all under replicated ledgers should be identified", ledgerList.size(), urLedgerList.size());

    Thread.sleep(100);
    final Future<?> taskAfterAudit = auditor.getAuditTask();
    Assert.assertNull("auditTask is supposed to be null", taskAfterAudit);
    assertEquals("lostBookieRecoveryDelayBeforeChange of Auditor should be equal to previously set value",
            lostBookieRecoveryDelay, auditor.getLostBookieRecoveryDelayBeforeChange());
}
/**
 * Verifies that setting the lost-bookie recovery delay to 0 while an audit
 * task is pending triggers the audit without waiting for the original
 * delay, and that the auditor afterwards has no pending task and reports a
 * delay of 0.
 */
@Test
public void testTriggerAuditorBySettingDelayToZeroWithPendingAuditTask() throws Exception {
    // let the initial periodic check finish first
    TimeUnit.SECONDS.sleep(1);

    final Auditor auditor = getAuditorBookiesAuditor();
    final LedgerHandle handle = createAndAddEntriesToLedger();
    final long ledgerId = handle.getId();
    LOG.debug("Created ledger : " + ledgerId);
    ledgerList.add(ledgerId);
    handle.close();

    final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());

    // recovery work must start only 5 seconds after a bookie failure
    final int lostBookieRecoveryDelay = 5;
    urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);

    // pick a non-auditor bookie so that no new election is triggered
    shutDownNonAuditorBookie();

    LOG.debug("Waiting for ledgers to be marked as under replicated");
    final boolean publishedEarly = underReplicaLatch.await(2, TimeUnit.SECONDS);
    assertFalse("audit of lost bookie isn't delayed", publishedEarly);
    assertEquals("under replicated ledgers identified when it was not expected", 0, urLedgerList.size());

    // a delayed audit task has to be pending at this point
    final Future<?> pendingTask = auditor.getAuditTask();
    assertNotNull("auditTask is not supposed to be null", pendingTask);
    assertEquals("lostBookieRecoveryDelayBeforeChange of Auditor should be equal to what we set",
            lostBookieRecoveryDelay, auditor.getLostBookieRecoveryDelayBeforeChange());

    // dropping the delay to 0 should trigger the auditor immediately
    urLedgerMgr.setLostBookieRecoveryDelay(0);
    final boolean publishedAfterReset = underReplicaLatch.await(1, TimeUnit.SECONDS);
    assertTrue("audit of lost bookie shouldn't be delayed", publishedAfterReset);
    assertEquals("all under replicated ledgers should be identified", ledgerList.size(), urLedgerList.size());

    Thread.sleep(100);
    final Future<?> taskAfterAudit = auditor.getAuditTask();
    Assert.assertNull("auditTask is supposed to be null", taskAfterAudit);
    assertEquals("lostBookieRecoveryDelayBeforeChange of Auditor should be equal to previously set value",
            0, auditor.getLostBookieRecoveryDelayBeforeChange());
}
/**
 * Verifies that the audit is delayed while only one bookie is down, but is
 * started right away once a second bookie goes down.
 */
@Test
public void testDelayedAuditWithMultipleBookieFailures() throws Exception {
    // let the periodic bookie check finish first
    TimeUnit.SECONDS.sleep(1);

    // a ledger holding a batch of entries
    final LedgerHandle handle = createAndAddEntriesToLedger();
    final long ledgerId = handle.getId();
    LOG.debug("Created ledger : " + ledgerId);
    ledgerList.add(ledgerId);
    handle.close();

    final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());

    // recovery work must start only 10 seconds after a bookie failure
    urLedgerMgr.setLostBookieRecoveryDelay(10);

    // first failure: a non-auditor bookie, so that no election happens
    final String shutdownBookie1 = shutDownNonAuditorBookie();

    // with the audit delayed by 10 seconds nothing may show up within 3 seconds
    final boolean publishedEarly = underReplicaLatch.await(3, TimeUnit.SECONDS);
    assertFalse("audit of lost bookie isn't delayed", publishedEarly);
    assertEquals("under replicated ledgers identified when it was not expected", 0, urLedgerList.size());

    // Second failure: again a non-auditor bookie, so that the auditor keeps
    // its history about the delayed recovery. The audit is now expected to
    // run immediately instead of waiting for the remaining 7 seconds.
    final String shutdownBookie2 = shutDownNonAuditorBookie();

    // grace period of 2 seconds for the ledger to get reported
    TimeUnit.SECONDS.sleep(2);

    // Passing the checks below means the audit ran within 2 seconds of the
    // second failure, i.e. the second failure did not delay the audit.
    assertTrue("Ledger is not marked as underreplicated:" + ledgerId, urLedgerList.contains(ledgerId));
    final Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
    final String data = urLedgerData.get(ledgerId);
    final String failureMessage = "Bookie " + shutdownBookie1 + shutdownBookie2
            + " are not listed in the ledger as missing replicas :" + data;
    final boolean bothListed = data.contains(shutdownBookie1) && data.contains(shutdownBookie2);
    assertTrue(failureMessage, bothListed);
}
/**
 * Verifies that the audit stays delayed in a rolling-upgrade scenario, where
 * one bookie goes down and comes back up before the next one goes down, so
 * that at most one bookie is down at any time.
 */
@Test
public void testDelayedAuditWithRollingUpgrade() throws Exception {
    // let the periodic bookie check finish first
    TimeUnit.SECONDS.sleep(1);

    // a ledger holding a batch of entries
    final LedgerHandle handle = createAndAddEntriesToLedger();
    final long ledgerId = handle.getId();
    LOG.debug("Created ledger : " + ledgerId);
    ledgerList.add(ledgerId);
    handle.close();

    final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());

    // recovery work must start only 5 seconds after a bookie failure
    urLedgerMgr.setLostBookieRecoveryDelay(5);

    // first bookie down: a non-auditor bookie, so that no election happens
    final int firstIndex = getShutDownNonAuditorBookieIdx("");
    final ServerConfiguration firstConf = bsConfs.get(firstIndex);
    final String shutdownBookie1 = shutdownBookie(firstIndex);

    // with the audit delayed by 5 seconds nothing may show up within 2 seconds
    final boolean publishedAfterFirst = underReplicaLatch.await(2, TimeUnit.SECONDS);
    assertFalse("audit of lost bookie isn't delayed", publishedAfterFirst);
    assertEquals("under replicated ledgers identified when it was not expected", 0, urLedgerList.size());

    // first bookie up again
    bs.add(startBookie(firstConf));

    // second bookie down: a different one than the bookie restarted above
    final String shutdownBookie2 = shutDownNonAuditorBookie(shutdownBookie1);

    // Only one bookie is down now, so the lost bookie check must not start
    // right away; it is due 5 seconds after the second bookie went down.
    final boolean publishedAfterSecond = underReplicaLatch.await(2, TimeUnit.SECONDS);
    assertFalse("audit of lost bookie isn't delayed", publishedAfterSecond);
    assertEquals("under replicated ledgers identified when it was not expected", 0, urLedgerList.size());

    // 2 + 4 = 6 seconds in total for the ledger to get reported
    TimeUnit.SECONDS.sleep(4);

    // Passing the checks below means the audit ran after
    // lostBookieRecoveryDelay, as expected during a rolling upgrade.
    assertTrue("Ledger is not marked as underreplicated:" + ledgerId, urLedgerList.contains(ledgerId));
    final Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
    final String data = urLedgerData.get(ledgerId);
    assertFalse("Bookie " + shutdownBookie1 + "wrongly listed as missing the ledger: " + data,
            data.contains(shutdownBookie1));
    assertTrue("Bookie " + shutdownBookie2 + " is not listed in the ledger as missing replicas :" + data,
            data.contains(shutdownBookie2));
    LOG.info("*****************Test Complete");
}
/**
 * Polls every 100 ms, for up to 5 seconds, for an auditor; once one is
 * available an audit task is submitted and awaited for up to 5 seconds.
 *
 * @throws TimeoutException if no auditor became available within 5 seconds
 *             (the awaited task may raise its own exceptions as well)
 */
private void waitForAuditToComplete() throws Exception {
    final long deadline = System.currentTimeMillis() + 5_000;
    while (System.currentTimeMillis() < deadline) {
        final Auditor auditor = getAuditorBookiesAuditor();
        if (auditor == null) {
            // no auditor yet, try again shortly
            Thread.sleep(100);
            continue;
        }
        auditor.submitAuditTask().get(5, TimeUnit.SECONDS);
        return;
    }
    throw new TimeoutException("Could not find an audit within 5 seconds");
}
/**
 * Waits until the ledger is recorded as under replicated with every given
 * replica listed as missing, checking once per second.
 *
 * @param ledgerId id of the ledger to check
 * @param secondsToWait number of one-second attempts to make
 * @param replicas replicas that all have to be listed for the ledger
 * @return {@code true} as soon as all replicas are listed, {@code false}
 *         if that did not happen within the given number of attempts
 */
private boolean waitForLedgerMissingReplicas(Long ledgerId, long secondsToWait, String... replicas)
        throws Exception {
    for (int attempt = 0; attempt < secondsToWait; attempt++) {
        try {
            final UnderreplicatedLedger info = urLedgerMgr.getLedgerUnreplicationInfo(ledgerId);
            boolean allListed = true;
            for (String replica : replicas) {
                if (!info.getReplicaList().contains(replica)) {
                    allListed = false;
                    break;
                }
            }
            if (allListed) {
                return true;
            }
        } catch (Exception e) {
            // the node may not exist yet; retry on the next round
        }
        Thread.sleep(1000);
    }
    return false;
}
/**
 * Registers a {@code ChildWatcher} on the under-replication znode of every ledger in
 * {@code ledgerList}. All watchers share a single latch.
 *
 * @param count initial count of the shared latch
 * @return the latch that each watcher counts down when it is notified
 * @throws KeeperException if the ZooKeeper {@code exists} call fails
 * @throws InterruptedException if the ZooKeeper call is interrupted
 */
private CountDownLatch registerUrLedgerWatcher(int count)
        throws KeeperException, InterruptedException {
    final CountDownLatch sharedLatch = new CountDownLatch(count);
    for (Long id : ledgerList) {
        String urZnode = ZkLedgerUnderreplicationManager.getUrLedgerZnode(underreplicatedPath, id);
        // one watcher instance per ledger, all counting down the same latch
        zkc.exists(urZnode, new ChildWatcher(sharedLatch));
    }
    return sharedLatch;
}
/**
 * Pulls one ledger from the under-replication manager per supplied id, asserts that it
 * is one of the supplied ids, then marks it replicated and releases it.
 *
 * @param ledgerIds ids that are acceptable results of {@code getLedgerToRereplicate()}
 * @throws UnavailableException if the under-replication manager reports a failure
 */
private void doLedgerRereplication(Long... ledgerIds)
        throws UnavailableException {
    final List<Long> expectedIds = Arrays.asList(ledgerIds);
    for (int remaining = ledgerIds.length; remaining > 0; remaining--) {
        long next = urLedgerMgr.getLedgerToRereplicate();
        assertTrue("Received unexpected ledgerid", expectedIds.contains(next));
        urLedgerMgr.markLedgerReplicated(next);
        urLedgerMgr.releaseUnderreplicatedLedger(next);
    }
}
/**
 * Kills the bookie at the given index of {@code bs} and shuts down the auditor elector
 * registered under that bookie's address, removing it from {@code auditorElectors}.
 *
 * @param bkShutdownIndex index into {@code bs} of the bookie to stop
 * @return the string form of the stopped bookie's id
 * @throws Exception if killing the bookie or shutting down the elector fails
 */
private String shutdownBookie(int bkShutdownIndex) throws Exception {
    final String addr = bs.get(bkShutdownIndex).getBookieId().toString();
    LOG.debug("Shutting down bookie:" + addr);
    killBookie(bkShutdownIndex);
    // stop the elector first, then drop it from the registry
    auditorElectors.get(addr).shutdown();
    auditorElectors.remove(addr);
    return addr;
}
/**
 * Creates a ledger with the test's digest type and password and writes 100 entries to it.
 *
 * @return the handle of the newly created ledger
 * @throws BKException if ledger creation or any entry add fails
 * @throws InterruptedException if interrupted while creating the ledger or adding entries
 */
private LedgerHandle createAndAddEntriesToLedger() throws BKException,
        InterruptedException {
    final LedgerHandle handle = bkc.createLedger(digestType, ledgerPassword);
    LOG.info("Ledger ID: " + handle.getId());
    addEntry(100, handle);
    return handle;
}
/**
 * Asynchronously adds entries to the ledger and waits for every add callback to fire.
 *
 * <p>Each entry is a 4-byte buffer holding a random non-negative int. The first non-OK
 * return code seen by a callback is remembered and rethrown after all callbacks are in.
 *
 * @param numEntriesToWrite number of entries to add
 * @param lh ledger to write to
 * @throws InterruptedException if interrupted while waiting for the callbacks
 * @throws BKException built from the first non-OK return code, if any add failed
 */
private void addEntry(int numEntriesToWrite, LedgerHandle lh)
        throws InterruptedException, BKException {
    final CountDownLatch pendingAdds = new CountDownLatch(numEntriesToWrite);
    final AtomicInteger firstFailure = new AtomicInteger(BKException.Code.OK);
    // the callback holds no per-entry state, so one instance serves every add
    final AddCallback onAdded = new AddCallback() {
        @Override
        public void addComplete(int rc2, LedgerHandle handle, long entryId, Object ctx) {
            // only replaces the stored code while it is still OK
            firstFailure.compareAndSet(BKException.Code.OK, rc2);
            pendingAdds.countDown();
        }
    };
    for (int written = 0; written < numEntriesToWrite; written++) {
        byte[] payload = ByteBuffer.allocate(4).putInt(rng.nextInt(Integer.MAX_VALUE)).array();
        lh.asyncAddEntry(payload, onAdded, null);
    }
    pendingAdds.await();
    final int outcome = firstFailure.get();
    if (outcome != BKException.Code.OK) {
        throw BKException.create(outcome);
    }
}
/**
 * Reads the under-replication znode of every given ledger and returns its content as text.
 *
 * <p>The bytes are decoded explicitly as UTF-8 so the result does not depend on the
 * JVM's platform-default charset.
 * NOTE(review): assumes the znode payload is written as UTF-8 text - confirm against
 * the under-replication manager.
 *
 * @param urLedgerList ids of the ledgers whose znodes are read
 * @return map from ledger id to the decoded znode content
 * @throws KeeperException if reading a znode fails
 * @throws InterruptedException if the ZooKeeper call is interrupted
 */
private Map<Long, String> getUrLedgerData(Set<Long> urLedgerList)
        throws KeeperException, InterruptedException {
    Map<Long, String> urLedgerData = new HashMap<>();
    for (Long ledgerId : urLedgerList) {
        String znode = ZkLedgerUnderreplicationManager.getUrLedgerZnode(underreplicatedPath,
                ledgerId);
        byte[] data = zkc.getData(znode, false, null);
        // fully qualified so this block does not rely on an import that may be absent
        urLedgerData.put(ledgerId, new String(data, java.nio.charset.StandardCharsets.UTF_8));
    }
    return urLedgerData;
}
/**
 * ZooKeeper watcher that records which ledgers from {@code ledgerList} a notification
 * refers to and counts down a latch once per path-carrying notification.
 *
 * <p>A ledger is recorded in {@code urLedgerList} when the event path contains the
 * ledger id's decimal string.
 * NOTE(review): this is a substring match, so a short id can match the path of another
 * ledger - confirm that is acceptable for the ledger ids used by the tests.
 */
private class ChildWatcher implements Watcher {
    private final CountDownLatch underReplicaLatch;

    /**
     * @param underReplicaLatch latch counted down for each processed notification
     */
    public ChildWatcher(CountDownLatch underReplicaLatch) {
        this.underReplicaLatch = underReplicaLatch;
    }

    @Override
    public void process(WatchedEvent event) {
        final String path = event.getPath();
        LOG.info("Received notification for the ledger path : "
                + path);
        if (path == null) {
            // Connection/session state events carry no path. They say nothing about an
            // under-replicated ledger, so skip them instead of failing with a
            // NullPointerException inside the watcher callback.
            return;
        }
        for (Long ledgerId : ledgerList) {
            if (path.contains(ledgerId + "")) {
                urLedgerList.add(ledgerId);
            }
        }
        LOG.debug("Count down and waiting for next notification");
        // count down and waiting for next notification
        underReplicaLatch.countDown();
    }
}
/**
 * Finds the bookie server in {@code bs} whose id appears in the data stored at
 * {@code electionPath}.
 *
 * <p>The election data is decoded once, explicitly as UTF-8, so the comparison does not
 * depend on the JVM's platform-default charset.
 * NOTE(review): assumes the election znode payload is UTF-8 text - confirm against the
 * auditor elector.
 *
 * @return the single bookie server matching the election data
 * @throws Exception if the znode cannot be read; an assertion fails when the znode has
 *         no data or when the number of matching bookies is not exactly one
 */
private BookieServer getAuditorBookie() throws Exception {
    byte[] data = zkc.getData(electionPath, false, null);
    assertNotNull("Auditor election failed", data);
    // fully qualified so this block does not rely on an import that may be absent
    final String electionData = new String(data, java.nio.charset.StandardCharsets.UTF_8);
    List<BookieServer> auditors = new ArrayList<>();
    for (BookieServer bks : bs) {
        if (electionData.contains(String.valueOf(bks.getBookieId()))) {
            auditors.add(bks);
        }
    }
    assertEquals("Multiple Bookies acting as Auditor!", 1, auditors
            .size());
    return auditors.get(0);
}
/**
 * Returns the {@code auditor} field of the elector registered for the bookie that
 * {@code getAuditorBookie()} identifies.
 *
 * @return that elector's auditor, as currently stored on the elector
 * @throws Exception if the auditor bookie cannot be determined
 */
private Auditor getAuditorBookiesAuditor() throws Exception {
    final String auditorAddr = getAuditorBookie().getBookieId().toString();
    return auditorElectors.get(auditorAddr).auditor;
}
/**
 * Shuts down a bookie adjacent to the auditor bookie in {@code bs}: the next one, or
 * the previous one when the auditor is the last element.
 *
 * @return the string form of the stopped bookie's id
 * @throws Exception if the auditor cannot be determined or the shutdown fails
 */
private String shutDownNonAuditorBookie() throws Exception {
    final int auditorIdx = bs.indexOf(getAuditorBookie());
    final boolean auditorIsLast = auditorIdx >= bs.size() - 1;
    return shutdownBookie(auditorIsLast ? auditorIdx - 1 : auditorIdx + 1);
}
/**
 * Picks the index of the first bookie in {@code bs} that is neither the auditor bookie
 * nor the bookie whose id string equals {@code exclude}. Nothing is shut down here.
 *
 * @param exclude id string of a bookie that must not be chosen
 * @return index of the first eligible bookie, or 0 when none is eligible
 * @throws Exception if the auditor bookie cannot be determined
 */
private int getShutDownNonAuditorBookieIdx(String exclude) throws Exception {
    final int auditorIdx = bs.indexOf(getAuditorBookie());
    for (int idx = 0; idx < bs.size(); idx++) {
        boolean eligible = idx != auditorIdx
                && !bs.get(idx).getBookieId().toString().equals(exclude);
        if (eligible) {
            return idx;
        }
    }
    // no eligible bookie found: fall back to index 0
    return 0;
}
/**
 * Shuts down the bookie selected by {@code getShutDownNonAuditorBookieIdx(exclude)}.
 *
 * @param exclude id string of a bookie that must not be chosen
 * @return the string form of the stopped bookie's id
 * @throws Exception if selecting or shutting down the bookie fails
 */
private String shutDownNonAuditorBookie(String exclude) throws Exception {
    final int victimIdx = getShutDownNonAuditorBookieIdx(exclude);
    return shutdownBookie(victimIdx);
}
}