blob: 3403d7f4d616a64f75d7c8cc0702f5fd57fd7748 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.protobuf.TextFormat;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.ZkLayoutManager;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.DNS;
import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.test.ZooKeeperUtil;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.lang.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test the zookeeper implementation of the ledger replication manager.
*/
public class TestLedgerUnderreplicationManager {
static final Logger LOG = LoggerFactory.getLogger(TestLedgerUnderreplicationManager.class);
ZooKeeperUtil zkUtil = null;
ServerConfiguration conf = null;
ExecutorService executor = null;
LedgerManagerFactory lmf1 = null;
LedgerManagerFactory lmf2 = null;
ZooKeeper zkc1 = null;
ZooKeeper zkc2 = null;
String basePath;
String urLedgerPath;
boolean isLedgerReplicationDisabled = true;
@Before
public void setupZooKeeper() throws Exception {
zkUtil = new ZooKeeperUtil();
zkUtil.startCluster();
conf = TestBKConfiguration.newServerConfiguration();
conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
executor = Executors.newCachedThreadPool();
zkc1 = ZooKeeperClient.newBuilder()
.connectString(zkUtil.getZooKeeperConnectString())
.sessionTimeoutMs(10000)
.build();
zkc2 = ZooKeeperClient.newBuilder()
.connectString(zkUtil.getZooKeeperConnectString())
.sessionTimeoutMs(10000)
.build();
String zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
basePath = zkLedgersRootPath + '/'
+ BookKeeperConstants.UNDER_REPLICATION_NODE;
urLedgerPath = basePath
+ BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
lmf1 = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
conf,
new ZkLayoutManager(
zkc1,
zkLedgersRootPath,
ZkUtils.getACLs(conf)));
lmf2 = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
conf,
new ZkLayoutManager(
zkc2,
zkLedgersRootPath,
ZkUtils.getACLs(conf)));
}
@After
public void teardownZooKeeper() throws Exception {
if (zkUtil != null) {
zkUtil.killCluster();
zkUtil = null;
}
if (executor != null) {
executor = null;
}
if (zkc1 != null) {
zkc1.close();
zkc1 = null;
}
if (zkc2 != null) {
zkc2.close();
zkc2 = null;
}
if (lmf1 != null) {
lmf1.close();
lmf1 = null;
}
if (lmf2 != null) {
lmf2.close();
lmf2 = null;
}
}
private Future<Long> getLedgerToReplicate(final LedgerUnderreplicationManager m) {
return executor.submit(new Callable<Long>() {
public Long call() {
try {
return m.getLedgerToRereplicate();
} catch (Exception e) {
LOG.error("Error getting ledger id", e);
return -1L;
}
}
});
}
/**
* Test basic interactions with the ledger underreplication
* manager.
* Mark some ledgers as underreplicated.
* Ensure that getLedgerToReplicate will block until it a ledger
* becomes available.
*/
@Test
public void testBasicInteraction() throws Exception {
Set<Long> ledgers = new HashSet<Long>();
ledgers.add(0xdeadbeefL);
ledgers.add(0xbeefcafeL);
ledgers.add(0xffffbeefL);
ledgers.add(0xfacebeefL);
String missingReplica = "localhost:3181";
int count = 0;
LedgerUnderreplicationManager m = lmf1.newLedgerUnderreplicationManager();
Iterator<Long> iter = ledgers.iterator();
while (iter.hasNext()) {
m.markLedgerUnderreplicated(iter.next(), missingReplica);
count++;
}
List<Future<Long>> futures = new ArrayList<Future<Long>>();
for (int i = 0; i < count; i++) {
futures.add(getLedgerToReplicate(m));
}
for (Future<Long> f : futures) {
Long l = f.get(5, TimeUnit.SECONDS);
assertTrue(ledgers.remove(l));
}
Future<Long> f = getLedgerToReplicate(m);
try {
f.get(1, TimeUnit.SECONDS);
fail("Shouldn't be able to find a ledger to replicate");
} catch (TimeoutException te) {
// correct behaviour
}
Long newl = 0xfefefefefefeL;
m.markLedgerUnderreplicated(newl, missingReplica);
assertEquals("Should have got the one just added", newl, f.get(5, TimeUnit.SECONDS));
}
/**
* Test locking for ledger unreplication manager.
* If there's only one ledger marked for rereplication,
* and one client has it, it should be locked; another
* client shouldn't be able to get it. If the first client dies
* however, the second client should be able to get it.
*/
@Test
public void testLocking() throws Exception {
String missingReplica = "localhost:3181";
LedgerUnderreplicationManager m1 = lmf1.newLedgerUnderreplicationManager();
LedgerUnderreplicationManager m2 = lmf2.newLedgerUnderreplicationManager();
Long ledger = 0xfeadeefdacL;
m1.markLedgerUnderreplicated(ledger, missingReplica);
Future<Long> f = getLedgerToReplicate(m1);
Long l = f.get(5, TimeUnit.SECONDS);
assertEquals("Should be the ledger I just marked", ledger, l);
f = getLedgerToReplicate(m2);
try {
f.get(1, TimeUnit.SECONDS);
fail("Shouldn't be able to find a ledger to replicate");
} catch (TimeoutException te) {
// correct behaviour
}
zkc1.close(); // should kill the lock
zkc1 = null;
l = f.get(5, TimeUnit.SECONDS);
assertEquals("Should be the ledger I marked", ledger, l);
}
/**
* Test that when a ledger has been marked as replicated, it
* will not be offered to anther client.
* This test checked that by marking two ledgers, and acquiring
* them on a single client. It marks one as replicated and then
* the client is killed. We then check that another client can
* acquire a ledger, and that it's not the one that was previously
* marked as replicated.
*/
@Test
public void testMarkingAsReplicated() throws Exception {
String missingReplica = "localhost:3181";
LedgerUnderreplicationManager m1 = lmf1.newLedgerUnderreplicationManager();
LedgerUnderreplicationManager m2 = lmf2.newLedgerUnderreplicationManager();
Long ledgerA = 0xfeadeefdacL;
Long ledgerB = 0xdefadebL;
m1.markLedgerUnderreplicated(ledgerA, missingReplica);
m1.markLedgerUnderreplicated(ledgerB, missingReplica);
Future<Long> fA = getLedgerToReplicate(m1);
Future<Long> fB = getLedgerToReplicate(m1);
Long lA = fA.get(5, TimeUnit.SECONDS);
Long lB = fB.get(5, TimeUnit.SECONDS);
assertTrue("Should be the ledgers I just marked",
(lA.equals(ledgerA) && lB.equals(ledgerB))
|| (lA.equals(ledgerB) && lB.equals(ledgerA)));
Future<Long> f = getLedgerToReplicate(m2);
try {
f.get(1, TimeUnit.SECONDS);
fail("Shouldn't be able to find a ledger to replicate");
} catch (TimeoutException te) {
// correct behaviour
}
m1.markLedgerReplicated(lA);
zkc1.close(); // should kill the lock
zkc1 = null;
Long l = f.get(5, TimeUnit.SECONDS);
assertEquals("Should be the ledger I marked", lB, l);
}
/**
* Test releasing of a ledger
* A ledger is released when a client decides it does not want
* to replicate it (or cannot at the moment).
* When a client releases a previously acquired ledger, another
* client should then be able to acquire it.
*/
@Test
public void testRelease() throws Exception {
String missingReplica = "localhost:3181";
LedgerUnderreplicationManager m1 = lmf1.newLedgerUnderreplicationManager();
LedgerUnderreplicationManager m2 = lmf2.newLedgerUnderreplicationManager();
Long ledgerA = 0xfeadeefdacL;
Long ledgerB = 0xdefadebL;
m1.markLedgerUnderreplicated(ledgerA, missingReplica);
m1.markLedgerUnderreplicated(ledgerB, missingReplica);
Future<Long> fA = getLedgerToReplicate(m1);
Future<Long> fB = getLedgerToReplicate(m1);
Long lA = fA.get(5, TimeUnit.SECONDS);
Long lB = fB.get(5, TimeUnit.SECONDS);
assertTrue("Should be the ledgers I just marked",
(lA.equals(ledgerA) && lB.equals(ledgerB))
|| (lA.equals(ledgerB) && lB.equals(ledgerA)));
Future<Long> f = getLedgerToReplicate(m2);
try {
f.get(1, TimeUnit.SECONDS);
fail("Shouldn't be able to find a ledger to replicate");
} catch (TimeoutException te) {
// correct behaviour
}
m1.markLedgerReplicated(lA);
m1.releaseUnderreplicatedLedger(lB);
Long l = f.get(5, TimeUnit.SECONDS);
assertEquals("Should be the ledger I marked", lB, l);
}
/**
* Test that when a failure occurs on a ledger, while the ledger
* is already being rereplicated, the ledger will still be in the
* under replicated ledger list when first rereplicating client marks
* it as replicated.
*/
@Test
public void testManyFailures() throws Exception {
String missingReplica1 = "localhost:3181";
String missingReplica2 = "localhost:3182";
LedgerUnderreplicationManager m1 = lmf1.newLedgerUnderreplicationManager();
Long ledgerA = 0xfeadeefdacL;
m1.markLedgerUnderreplicated(ledgerA, missingReplica1);
Future<Long> fA = getLedgerToReplicate(m1);
Long lA = fA.get(5, TimeUnit.SECONDS);
m1.markLedgerUnderreplicated(ledgerA, missingReplica2);
assertEquals("Should be the ledger I just marked",
lA, ledgerA);
m1.markLedgerReplicated(lA);
Future<Long> f = getLedgerToReplicate(m1);
lA = f.get(5, TimeUnit.SECONDS);
assertEquals("Should be the ledger I had marked previously",
lA, ledgerA);
}
/**
* If replicationworker has acquired lock on it, then
* getReplicationWorkerIdRereplicatingLedger should return
* ReplicationWorkerId (BookieId) of the ReplicationWorker that is holding
* lock. If lock for the underreplicated ledger is not yet acquired or if it
* is released then it is supposed to return null.
*
* @throws Exception
*/
@Test
public void testGetReplicationWorkerIdRereplicatingLedger() throws Exception {
String missingReplica1 = "localhost:3181";
String missingReplica2 = "localhost:3182";
LedgerUnderreplicationManager m1 = lmf1.newLedgerUnderreplicationManager();
Long ledgerA = 0xfeadeefdacL;
m1.markLedgerUnderreplicated(ledgerA, missingReplica1);
m1.markLedgerUnderreplicated(ledgerA, missingReplica2);
// lock is not yet acquired so replicationWorkerIdRereplicatingLedger
// should
assertEquals("ReplicationWorkerId of the lock", null, m1.getReplicationWorkerIdRereplicatingLedger(ledgerA));
Future<Long> fA = getLedgerToReplicate(m1);
Long lA = fA.get(5, TimeUnit.SECONDS);
assertEquals("Should be the ledger that was just marked", lA, ledgerA);
/*
* ZkLedgerUnderreplicationManager.getLockData uses
* DNS.getDefaultHost("default") as the bookieId.
*
*/
assertEquals("ReplicationWorkerId of the lock", DNS.getDefaultHost("default"),
m1.getReplicationWorkerIdRereplicatingLedger(ledgerA));
m1.markLedgerReplicated(lA);
assertEquals("ReplicationWorkerId of the lock", null, m1.getReplicationWorkerIdRereplicatingLedger(ledgerA));
}
/**
* Test that when a ledger is marked as underreplicated with
* the same missing replica twice, only marking as replicated
* will be enough to remove it from the list.
*/
@Test
public void test2reportSame() throws Exception {
String missingReplica1 = "localhost:3181";
LedgerUnderreplicationManager m1 = lmf1.newLedgerUnderreplicationManager();
LedgerUnderreplicationManager m2 = lmf2.newLedgerUnderreplicationManager();
Long ledgerA = 0xfeadeefdacL;
m1.markLedgerUnderreplicated(ledgerA, missingReplica1);
m2.markLedgerUnderreplicated(ledgerA, missingReplica1);
// verify duplicate missing replica
UnderreplicatedLedgerFormat.Builder builderA = UnderreplicatedLedgerFormat
.newBuilder();
String znode = getUrLedgerZnode(ledgerA);
byte[] data = zkc1.getData(znode, false, null);
TextFormat.merge(new String(data, Charset.forName("UTF-8")), builderA);
List<String> replicaList = builderA.getReplicaList();
assertEquals("Published duplicate missing replica : " + replicaList, 1,
replicaList.size());
assertTrue("Published duplicate missing replica : " + replicaList,
replicaList.contains(missingReplica1));
Future<Long> fA = getLedgerToReplicate(m1);
Long lA = fA.get(5, TimeUnit.SECONDS);
assertEquals("Should be the ledger I just marked",
lA, ledgerA);
m1.markLedgerReplicated(lA);
Future<Long> f = getLedgerToReplicate(m2);
try {
f.get(1, TimeUnit.SECONDS);
fail("Shouldn't be able to find a ledger to replicate");
} catch (TimeoutException te) {
// correct behaviour
}
}
/**
* Test that multiple LedgerUnderreplicationManagers should be able to take
* lock and release for same ledger.
*/
@Test
public void testMultipleManagersShouldBeAbleToTakeAndReleaseLock()
throws Exception {
String missingReplica1 = "localhost:3181";
final LedgerUnderreplicationManager m1 = lmf1
.newLedgerUnderreplicationManager();
final LedgerUnderreplicationManager m2 = lmf2
.newLedgerUnderreplicationManager();
Long ledgerA = 0xfeadeefdacL;
m1.markLedgerUnderreplicated(ledgerA, missingReplica1);
final int iterationCount = 100;
final CountDownLatch latch1 = new CountDownLatch(iterationCount);
final CountDownLatch latch2 = new CountDownLatch(iterationCount);
Thread thread1 = new Thread() {
@Override
public void run() {
takeLedgerAndRelease(m1, latch1, iterationCount);
}
};
Thread thread2 = new Thread() {
@Override
public void run() {
takeLedgerAndRelease(m2, latch2, iterationCount);
}
};
thread1.start();
thread2.start();
// wait until at least one thread completed
while (!latch1.await(50, TimeUnit.MILLISECONDS)
&& !latch2.await(50, TimeUnit.MILLISECONDS)) {
Thread.sleep(50);
}
m1.close();
m2.close();
// After completing 'lock acquire,release' job, it should notify below
// wait
latch1.await();
latch2.await();
}
/**
* Test verifies failures of bookies which are resembling each other.
*
* <p>BK servers named like*********************************************
* 1.cluster.com, 2.cluster.com, 11.cluster.com, 12.cluster.com
* *******************************************************************
*
* <p>BKserver IP:HOST like*********************************************
* localhost:3181, localhost:318, localhost:31812
* *******************************************************************
*/
@Test
public void testMarkSimilarMissingReplica() throws Exception {
List<String> missingReplica = new ArrayList<String>();
missingReplica.add("localhost:3181");
missingReplica.add("localhost:318");
missingReplica.add("localhost:31812");
missingReplica.add("1.cluster.com");
missingReplica.add("2.cluster.com");
missingReplica.add("11.cluster.com");
missingReplica.add("12.cluster.com");
verifyMarkLedgerUnderreplicated(missingReplica);
}
/**
* Test multiple bookie failures for a ledger and marked as underreplicated
* one after another.
*/
@Test
public void testManyFailuresInAnEnsemble() throws Exception {
List<String> missingReplica = new ArrayList<String>();
missingReplica.add("localhost:3181");
missingReplica.add("localhost:3182");
verifyMarkLedgerUnderreplicated(missingReplica);
}
/**
* Test disabling the ledger re-replication. After disabling, it will not be
* able to getLedgerToRereplicate(). This calls will enter into infinite
* waiting until enabling rereplication process
*/
@Test
public void testDisableLedegerReplication() throws Exception {
final LedgerUnderreplicationManager replicaMgr = lmf1
.newLedgerUnderreplicationManager();
// simulate few urLedgers before disabling
final Long ledgerA = 0xfeadeefdacL;
final String missingReplica = "localhost:3181";
// disabling replication
replicaMgr.disableLedgerReplication();
LOG.info("Disabled Ledeger Replication");
try {
replicaMgr.markLedgerUnderreplicated(ledgerA, missingReplica);
} catch (UnavailableException e) {
LOG.debug("Unexpected exception while marking urLedger", e);
fail("Unexpected exception while marking urLedger" + e.getMessage());
}
Future<Long> fA = getLedgerToReplicate(replicaMgr);
try {
fA.get(1, TimeUnit.SECONDS);
fail("Shouldn't be able to find a ledger to replicate");
} catch (TimeoutException te) {
// expected behaviour, as the replication is disabled
isLedgerReplicationDisabled = false;
}
assertTrue("Ledger replication is not disabled!",
!isLedgerReplicationDisabled);
}
/**
* Test enabling the ledger re-replication. After enableLedegerReplication,
* should continue getLedgerToRereplicate() task
*/
@Test
public void testEnableLedgerReplication() throws Exception {
isLedgerReplicationDisabled = true;
final LedgerUnderreplicationManager replicaMgr = lmf1
.newLedgerUnderreplicationManager();
// simulate few urLedgers before disabling
final Long ledgerA = 0xfeadeefdacL;
final String missingReplica = "localhost:3181";
try {
replicaMgr.markLedgerUnderreplicated(ledgerA, missingReplica);
} catch (UnavailableException e) {
LOG.debug("Unexpected exception while marking urLedger", e);
fail("Unexpected exception while marking urLedger" + e.getMessage());
}
// disabling replication
replicaMgr.disableLedgerReplication();
LOG.debug("Disabled Ledeger Replication");
String znodeA = getUrLedgerZnode(ledgerA);
final CountDownLatch znodeLatch = new CountDownLatch(2);
String urledgerA = StringUtils.substringAfterLast(znodeA, "/");
String urLockLedgerA = basePath + "/locks/" + urledgerA;
zkc1.exists(urLockLedgerA, new Watcher(){
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeCreated) {
znodeLatch.countDown();
LOG.debug("Recieved node creation event for the zNodePath:"
+ event.getPath());
}
}});
// getLedgerToRereplicate is waiting until enable rereplication
Thread thread1 = new Thread() {
@Override
public void run() {
try {
Long lA = replicaMgr.getLedgerToRereplicate();
assertEquals("Should be the ledger I just marked", lA,
ledgerA);
isLedgerReplicationDisabled = false;
znodeLatch.countDown();
} catch (UnavailableException e) {
LOG.debug("Unexpected exception while marking urLedger", e);
isLedgerReplicationDisabled = false;
}
}
};
thread1.start();
try {
assertFalse("shouldn't complete", znodeLatch.await(1, TimeUnit.SECONDS));
assertTrue("Ledger replication is not disabled!",
isLedgerReplicationDisabled);
assertEquals("Failed to disable ledger replication!", 2, znodeLatch
.getCount());
replicaMgr.enableLedgerReplication();
znodeLatch.await(5, TimeUnit.SECONDS);
LOG.debug("Enabled Ledeger Replication");
assertTrue("Ledger replication is not disabled!",
!isLedgerReplicationDisabled);
assertEquals("Failed to disable ledger replication!", 0, znodeLatch
.getCount());
} finally {
thread1.interrupt();
}
}
/**
* Test that the hierarchy gets cleaned up as ledgers
* are marked as fully replicated.
*/
@Test
public void testHierarchyCleanup() throws Exception {
final LedgerUnderreplicationManager replicaMgr = lmf1
.newLedgerUnderreplicationManager();
// 4 ledgers, 2 in the same hierarchy
long[] ledgers = { 0x00000000deadbeefL, 0x00000000deadbeeeL,
0x00000000beefcafeL, 0x00000000cafed00dL };
for (long l : ledgers) {
replicaMgr.markLedgerUnderreplicated(l, "localhost:3181");
}
// can't simply test top level as we are limited to ledger
// ids no larger than an int
String testPath = urLedgerPath + "/0000/0000";
List<String> children = zkc1.getChildren(testPath, false);
assertEquals("Wrong number of hierarchies", 3, children.size());
int marked = 0;
while (marked < 3) {
long l = replicaMgr.getLedgerToRereplicate();
if (l != ledgers[0]) {
replicaMgr.markLedgerReplicated(l);
marked++;
} else {
replicaMgr.releaseUnderreplicatedLedger(l);
}
}
children = zkc1.getChildren(testPath, false);
assertEquals("Wrong number of hierarchies", 1, children.size());
long l = replicaMgr.getLedgerToRereplicate();
assertEquals("Got wrong ledger", ledgers[0], l);
replicaMgr.markLedgerReplicated(l);
children = zkc1.getChildren(urLedgerPath, false);
assertEquals("All hierarchies should be cleaned up", 0, children.size());
}
/**
* Test that as the hierarchy gets cleaned up, it doesn't interfere
* with the marking of other ledgers as underreplicated.
*/
@Test
public void testHierarchyCleanupInterference() throws Exception {
final LedgerUnderreplicationManager replicaMgr1 = lmf1
.newLedgerUnderreplicationManager();
final LedgerUnderreplicationManager replicaMgr2 = lmf2
.newLedgerUnderreplicationManager();
final int iterations = 100;
final AtomicBoolean threadFailed = new AtomicBoolean(false);
Thread markUnder = new Thread() {
public void run() {
long l = 1;
try {
for (int i = 0; i < iterations; i++) {
replicaMgr1.markLedgerUnderreplicated(l, "localhost:3181");
l += 10000;
}
} catch (Exception e) {
LOG.error("markUnder Thread failed with exception", e);
threadFailed.set(true);
return;
}
}
};
final AtomicInteger processed = new AtomicInteger(0);
Thread markRepl = new Thread() {
public void run() {
try {
for (int i = 0; i < iterations; i++) {
long l = replicaMgr2.getLedgerToRereplicate();
replicaMgr2.markLedgerReplicated(l);
processed.incrementAndGet();
}
} catch (Exception e) {
LOG.error("markRepl Thread failed with exception", e);
threadFailed.set(true);
return;
}
}
};
markRepl.setDaemon(true);
markUnder.setDaemon(true);
markRepl.start();
markUnder.start();
markUnder.join();
assertFalse("Thread failed to complete", threadFailed.get());
int lastProcessed = 0;
while (true) {
markRepl.join(10000);
if (!markRepl.isAlive()) {
break;
}
assertFalse("markRepl thread not progressing", lastProcessed == processed.get());
}
assertFalse("Thread failed to complete", threadFailed.get());
List<String> children = zkc1.getChildren(urLedgerPath, false);
for (String s : children) {
LOG.info("s: {}", s);
}
assertEquals("All hierarchies should be cleaned up", 0, children.size());
}
@Test
public void testCheckAllLedgersCTime() throws Exception {
@Cleanup
LedgerUnderreplicationManager underReplicaMgr1 = lmf1.newLedgerUnderreplicationManager();
@Cleanup
LedgerUnderreplicationManager underReplicaMgr2 = lmf2.newLedgerUnderreplicationManager();
assertEquals(-1, underReplicaMgr1.getCheckAllLedgersCTime());
long curTime = System.currentTimeMillis();
underReplicaMgr2.setCheckAllLedgersCTime(curTime);
assertEquals(curTime, underReplicaMgr1.getCheckAllLedgersCTime());
curTime = System.currentTimeMillis();
underReplicaMgr2.setCheckAllLedgersCTime(curTime);
assertEquals(curTime, underReplicaMgr1.getCheckAllLedgersCTime());
}
@Test
public void testPlacementPolicyCheckCTime() throws Exception {
@Cleanup
LedgerUnderreplicationManager underReplicaMgr1 = lmf1.newLedgerUnderreplicationManager();
@Cleanup
LedgerUnderreplicationManager underReplicaMgr2 = lmf2.newLedgerUnderreplicationManager();
assertEquals(-1, underReplicaMgr1.getPlacementPolicyCheckCTime());
long curTime = System.currentTimeMillis();
underReplicaMgr2.setPlacementPolicyCheckCTime(curTime);
assertEquals(curTime, underReplicaMgr1.getPlacementPolicyCheckCTime());
curTime = System.currentTimeMillis();
underReplicaMgr2.setPlacementPolicyCheckCTime(curTime);
assertEquals(curTime, underReplicaMgr1.getPlacementPolicyCheckCTime());
}
@Test
public void testReplicasCheckCTime() throws Exception {
@Cleanup
LedgerUnderreplicationManager underReplicaMgr1 = lmf1.newLedgerUnderreplicationManager();
@Cleanup
LedgerUnderreplicationManager underReplicaMgr2 = lmf2.newLedgerUnderreplicationManager();
assertEquals(-1, underReplicaMgr1.getReplicasCheckCTime());
long curTime = System.currentTimeMillis();
underReplicaMgr2.setReplicasCheckCTime(curTime);
assertEquals(curTime, underReplicaMgr1.getReplicasCheckCTime());
curTime = System.currentTimeMillis();
underReplicaMgr2.setReplicasCheckCTime(curTime);
assertEquals(curTime, underReplicaMgr1.getReplicasCheckCTime());
}
private void verifyMarkLedgerUnderreplicated(Collection<String> missingReplica)
throws KeeperException, InterruptedException, ReplicationException {
Long ledgerA = 0xfeadeefdacL;
String znodeA = getUrLedgerZnode(ledgerA);
LedgerUnderreplicationManager replicaMgr = lmf1
.newLedgerUnderreplicationManager();
for (String replica : missingReplica) {
replicaMgr.markLedgerUnderreplicated(ledgerA, replica);
}
String urLedgerA = getData(znodeA);
UnderreplicatedLedgerFormat.Builder builderA = UnderreplicatedLedgerFormat
.newBuilder();
for (String replica : missingReplica) {
builderA.addReplica(replica);
}
List<String> replicaList = builderA.getReplicaList();
for (String replica : missingReplica) {
assertTrue("UrLedger:" + urLedgerA
+ " doesn't contain failed bookie :" + replica, replicaList
.contains(replica));
}
}
private String getData(String znode) {
try {
byte[] data = zkc1.getData(znode, false, null);
return new String(data);
} catch (KeeperException e) {
LOG.error("Exception while reading data from znode :" + znode);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Exception while reading data from znode :" + znode);
}
return "";
}
private String getUrLedgerZnode(long ledgerId) {
return ZkLedgerUnderreplicationManager.getUrLedgerZnode(urLedgerPath, ledgerId);
}
private void takeLedgerAndRelease(final LedgerUnderreplicationManager m,
final CountDownLatch latch, int numberOfIterations) {
for (int i = 0; i < numberOfIterations; i++) {
try {
long ledgerToRereplicate = m.getLedgerToRereplicate();
m.releaseUnderreplicatedLedger(ledgerToRereplicate);
} catch (UnavailableException e) {
LOG.error("UnavailableException when "
+ "taking or releasing lock", e);
}
latch.countDown();
}
}
}