blob: 60c7d0baab14d3ac00e616f70605a4889f43fdaf [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.meta;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.Version;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
/**
* Test the ledger manager iterator.
*/
public class LedgerManagerIteratorTest extends LedgerManagerTestCase {
public LedgerManagerIteratorTest(Class<? extends LedgerManagerFactory> lmFactoryCls) {
super(lmFactoryCls);
}
/**
* Remove ledger using lm syncronously.
*
* @param lm
* @param ledgerId
* @throws InterruptedException
*/
void removeLedger(LedgerManager lm, Long ledgerId) throws Exception {
lm.removeLedgerMetadata(ledgerId, Version.ANY).get();
}
/**
* Create ledger using lm syncronously.
*
* @param lm
* @param ledgerId
* @throws InterruptedException
*/
void createLedger(LedgerManager lm, Long ledgerId) throws Exception {
List<BookieSocketAddress> ensemble = Lists.newArrayList(
new BookieSocketAddress("192.0.2.1", 1234),
new BookieSocketAddress("192.0.2.2", 1234),
new BookieSocketAddress("192.0.2.3", 1234));
LedgerMetadata meta = LedgerMetadataBuilder.create()
.withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
.withPassword("passwd".getBytes())
.withDigestType(BookKeeper.DigestType.CRC32.toApiDigestType())
.newEnsembleEntry(0L, ensemble)
.build();
lm.createLedgerMetadata(ledgerId, meta).get();
}
static Set<Long> ledgerRangeToSet(LedgerRangeIterator lri) throws IOException {
Set<Long> ret = new TreeSet<>();
long last = -1;
while (lri.hasNext()) {
LedgerManager.LedgerRange lr = lri.next();
assertFalse("ledger range must not be empty", lr.getLedgers().isEmpty());
assertTrue("ledger ranges must not overlap", last < lr.start());
ret.addAll(lr.getLedgers());
last = lr.end();
}
return ret;
}
static Set<Long> getLedgerIdsByUsingAsyncProcessLedgers(LedgerManager lm) throws InterruptedException{
Set<Long> ledgersReadAsync = ConcurrentHashMap.newKeySet();
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger finalRC = new AtomicInteger();
lm.asyncProcessLedgers((ledgerId, callback) -> {
ledgersReadAsync.add(ledgerId);
callback.processResult(BKException.Code.OK, null, null);
}, (rc, s, obj) -> {
finalRC.set(rc);
latch.countDown();
}, null, BKException.Code.OK, BKException.Code.ReadException);
latch.await();
assertEquals("Final RC of asyncProcessLedgers", BKException.Code.OK, finalRC.get());
return ledgersReadAsync;
}
@Test
public void testIterateNoLedgers() throws Exception {
LedgerManager lm = getLedgerManager();
LedgerRangeIterator lri = lm.getLedgerRanges(0);
assertNotNull(lri);
if (lri.hasNext()) {
lri.next();
}
assertEquals(false, lri.hasNext());
}
@Test
public void testSingleLedger() throws Throwable {
LedgerManager lm = getLedgerManager();
long id = 2020202;
createLedger(lm, id);
LedgerRangeIterator lri = lm.getLedgerRanges(0);
assertNotNull(lri);
Set<Long> lids = ledgerRangeToSet(lri);
assertEquals(lids.size(), 1);
assertEquals(lids.iterator().next().longValue(), id);
Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
assertEquals("Comparing LedgersIds read asynchronously", lids, ledgersReadAsync);
}
@Test
public void testTwoLedgers() throws Throwable {
LedgerManager lm = getLedgerManager();
Set<Long> ids = new TreeSet<>(Arrays.asList(101010101L, 2020340302L));
for (Long id: ids) {
createLedger(lm, id);
}
LedgerRangeIterator lri = lm.getLedgerRanges(0);
assertNotNull(lri);
Set<Long> returnedIds = ledgerRangeToSet(lri);
assertEquals(ids, returnedIds);
Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
assertEquals("Comparing LedgersIds read asynchronously", ids, ledgersReadAsync);
}
@Test
public void testSeveralContiguousLedgers() throws Throwable {
LedgerManager lm = getLedgerManager();
Set<Long> ids = new TreeSet<>();
for (long i = 0; i < 2000; ++i) {
createLedger(lm, i);
ids.add(i);
}
LedgerRangeIterator lri = lm.getLedgerRanges(0);
assertNotNull(lri);
Set<Long> returnedIds = ledgerRangeToSet(lri);
assertEquals(ids, returnedIds);
Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
assertEquals("Comparing LedgersIds read asynchronously", ids, ledgersReadAsync);
}
@Test
public void testRemovalOfNodeJustTraversed() throws Throwable {
if (baseConf.getLedgerManagerFactoryClass()
!= LongHierarchicalLedgerManagerFactory.class) {
return;
}
LedgerManager lm = getLedgerManager();
/* For LHLM, first two should be leaves on the same node, second should be on adjacent level 4 node
* Removing all 3 once the iterator hits the first should result in the whole tree path ending
* at that node disappearing. If this happens after the iterator stops at that leaf, it should
* result in a few NodeExists errors (handled silently) as the iterator fails back up the tree
* to the next path.
*/
Set<Long> toRemove = new TreeSet<>(
Arrays.asList(
3394498498348983841L,
3394498498348983842L,
3394498498348993841L));
long first = 2345678901234567890L;
// Nodes which should be listed anyway
Set<Long> mustHave = new TreeSet<>(
Arrays.asList(
first,
6334994393848474732L));
Set<Long> ids = new TreeSet<>();
ids.addAll(toRemove);
ids.addAll(mustHave);
for (Long id: ids) {
createLedger(lm, id);
}
Set<Long> found = new TreeSet<>();
LedgerRangeIterator lri = lm.getLedgerRanges(0);
while (lri.hasNext()) {
LedgerManager.LedgerRange lr = lri.next();
found.addAll(lr.getLedgers());
if (lr.getLedgers().contains(first)) {
for (long id: toRemove) {
removeLedger(lm, id);
}
toRemove.clear();
}
}
for (long id: mustHave) {
assertTrue(found.contains(id));
}
}
@Test
public void validateEmptyL4PathSkipped() throws Throwable {
if (baseConf.getLedgerManagerFactoryClass()
!= LongHierarchicalLedgerManagerFactory.class) {
return;
}
LedgerManager lm = getLedgerManager();
Set<Long> ids = new TreeSet<>(
Arrays.asList(
2345678901234567890L,
3394498498348983841L,
6334994393848474732L,
7349370101927398483L));
for (Long id: ids) {
createLedger(lm, id);
}
String[] paths = {
"/ledgers/633/4994/3938/4948", // Empty L4 path, must be skipped
};
for (String path : paths) {
ZkUtils.createFullPathOptimistic(
zkc,
path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
LedgerRangeIterator lri = lm.getLedgerRanges(0);
assertNotNull(lri);
Set<Long> returnedIds = ledgerRangeToSet(lri);
assertEquals(ids, returnedIds);
Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
assertEquals("Comparing LedgersIds read asynchronously", ids, ledgersReadAsync);
lri = lm.getLedgerRanges(0);
int emptyRanges = 0;
while (lri.hasNext()) {
if (lri.next().getLedgers().isEmpty()) {
emptyRanges++;
}
}
assertEquals(0, emptyRanges);
}
@Test
public void testWithSeveralIncompletePaths() throws Throwable {
if (baseConf.getLedgerManagerFactoryClass()
!= LongHierarchicalLedgerManagerFactory.class) {
return;
}
LedgerManager lm = getLedgerManager();
Set<Long> ids = new TreeSet<>(
Arrays.asList(
2345678901234567890L,
3394498498348983841L,
6334994393848474732L,
7349370101927398483L));
for (Long id: ids) {
createLedger(lm, id);
}
String[] paths = {
"/ledgers/000/0000/0000", // top level, W-4292762
"/ledgers/234/5678/9999", // shares two path segments with the first one, comes after
"/ledgers/339/0000/0000", // shares one path segment with the second one, comes first
"/ledgers/633/4994/3938/0000", // shares three path segments with the third one, comes first
"/ledgers/922/3372/0000/0000", // close to max long, at end
};
for (String path : paths) {
ZkUtils.createFullPathOptimistic(
zkc,
path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
LedgerRangeIterator lri = lm.getLedgerRanges(0);
assertNotNull(lri);
Set<Long> returnedIds = ledgerRangeToSet(lri);
assertEquals(ids, returnedIds);
Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
assertEquals("Comparing LedgersIds read asynchronously", ids, ledgersReadAsync);
}
@Test
public void checkConcurrentModifications() throws Throwable {
final int numWriters = 10;
final int numCheckers = 10;
final int numLedgers = 100;
final long runtime = TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS);
final boolean longRange =
baseConf.getLedgerManagerFactoryClass() == LongHierarchicalLedgerManagerFactory.class;
final Set<Long> mustExist = new TreeSet<>();
LedgerManager lm = getLedgerManager();
Random rng = new Random();
for (int i = 0; i < numLedgers; ++i) {
long lid = Math.abs(rng.nextLong());
if (!longRange) {
lid %= 1000000;
}
createLedger(lm, lid);
mustExist.add(lid);
}
final long start = MathUtils.nowInNano();
final CountDownLatch latch = new CountDownLatch(1);
ArrayList<Future<?>> futures = new ArrayList<>();
ExecutorService executor = Executors.newCachedThreadPool();
final ConcurrentSkipListSet<Long> createdLedgers = new ConcurrentSkipListSet<>();
for (int i = 0; i < numWriters; ++i) {
Future<?> f = executor.submit(() -> {
LedgerManager writerLM = getIndependentLedgerManager();
Random writerRNG = new Random(rng.nextLong());
latch.await();
while (MathUtils.elapsedNanos(start) < runtime) {
long candidate = 0;
do {
candidate = Math.abs(writerRNG.nextLong());
if (!longRange) {
candidate %= 1000000;
}
} while (mustExist.contains(candidate) || !createdLedgers.add(candidate));
createLedger(writerLM, candidate);
removeLedger(writerLM, candidate);
}
return null;
});
futures.add(f);
}
for (int i = 0; i < numCheckers; ++i) {
Future<?> f = executor.submit(() -> {
LedgerManager checkerLM = getIndependentLedgerManager();
latch.await();
while (MathUtils.elapsedNanos(start) < runtime) {
LedgerRangeIterator lri = checkerLM.getLedgerRanges(0);
Set<Long> returnedIds = ledgerRangeToSet(lri);
for (long id: mustExist) {
assertTrue(returnedIds.contains(id));
}
Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(checkerLM);
for (long id: mustExist) {
assertTrue(ledgersReadAsync.contains(id));
}
}
return null;
});
futures.add(f);
}
latch.countDown();
for (Future<?> f : futures) {
f.get();
}
executor.shutdownNow();
}
@SuppressWarnings("deprecation")
@Test
public void testLedgerParentNode() throws Throwable {
/*
* this testcase applies only ZK based ledgermanager so it doesnt work
* for MSLedgerManager
*/
Assume.assumeTrue(!baseConf.getLedgerManagerFactoryClass().equals(MSLedgerManagerFactory.class));
AbstractZkLedgerManager lm = (AbstractZkLedgerManager) getLedgerManager();
List<Long> ledgerIds;
if (baseConf.getLedgerManagerFactoryClass().equals(HierarchicalLedgerManagerFactory.class)
|| baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
ledgerIds = Arrays.asList(100L, (Integer.MAX_VALUE * 10L));
} else {
ledgerIds = Arrays.asList(100L, (Integer.MAX_VALUE - 10L));
}
for (long ledgerId : ledgerIds) {
String fullLedgerPath = lm.getLedgerPath(ledgerId);
String ledgerPath = fullLedgerPath.replaceAll(
ZKMetadataDriverBase.resolveZkLedgersRootPath(baseConf) + "/",
"");
String[] znodesOfLedger = ledgerPath.split("/");
Assert.assertTrue(znodesOfLedger[0] + " is supposed to be valid parent ",
lm.isLedgerParentNode(znodesOfLedger[0]));
}
}
@SuppressWarnings("deprecation")
@Test
public void testLedgerManagerFormat() throws Throwable {
String zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseConf);
/*
* this testcase applies only ZK based ledgermanager so it doesnt work
* for MSLedgerManager
*/
Assume.assumeTrue(!baseConf.getLedgerManagerFactoryClass().equals(MSLedgerManagerFactory.class));
AbstractZkLedgerManager lm = (AbstractZkLedgerManager) getLedgerManager();
Collection<Long> ids = Arrays.asList(1234567890L, 2L, 32345L, 23456789L);
if (baseConf.getLedgerManagerFactoryClass().equals(HierarchicalLedgerManagerFactory.class)
|| baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
ids = new ArrayList<Long>(ids);
ids.add(Integer.MAX_VALUE * 2L);
ids.add(1234567891234L);
}
for (Long id : ids) {
createLedger(lm, id);
}
// create some invalid nodes under zkLedgersRootPath
Collection<String> invalidZnodes = Arrays.asList("12345", "12345678901L", "abc", "123d");
for (String invalidZnode : invalidZnodes) {
ZkUtils.createFullPathOptimistic(zkc, zkLedgersRootPath + "/" + invalidZnode,
"data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
/*
* get the count of total children under zkLedgersRootPath and also
* count of the parent nodes of ledgers under zkLedgersRootPath
*/
List<String> childrenOfLedgersRootPath = zkc.getChildren(zkLedgersRootPath, false);
int totalChildrenOfLedgersRootPath = childrenOfLedgersRootPath.size();
int totalParentNodesOfLedgers = 0;
for (String childOfLedgersRootPath : childrenOfLedgersRootPath) {
if (lm.isLedgerParentNode(childOfLedgersRootPath)) {
totalParentNodesOfLedgers++;
}
}
/*
* after ledgermanagerfactory format only the znodes of created ledgers
* under zkLedgersRootPath should be deleted recursively but not
* specialnode or invalid nodes created above
*/
ledgerManagerFactory.format(baseConf,
new ZkLayoutManager(zkc, zkLedgersRootPath, ZkUtils.getACLs(baseConf)));
List<String> childrenOfLedgersRootPathAfterFormat = zkc.getChildren(zkLedgersRootPath, false);
int totalChildrenOfLedgersRootPathAfterFormat = childrenOfLedgersRootPathAfterFormat.size();
Assert.assertEquals("totalChildrenOfLedgersRootPathAfterFormat",
totalChildrenOfLedgersRootPath - totalParentNodesOfLedgers, totalChildrenOfLedgersRootPathAfterFormat);
Assert.assertTrue("ChildrenOfLedgersRootPathAfterFormat should contain all the invalid znodes created",
childrenOfLedgersRootPathAfterFormat.containsAll(invalidZnodes));
}
@Test
public void hierarchicalLedgerManagerAsyncProcessLedgersTest() throws Throwable {
Assume.assumeTrue(baseConf.getLedgerManagerFactoryClass().equals(HierarchicalLedgerManagerFactory.class));
LedgerManager lm = getLedgerManager();
LedgerRangeIterator lri = lm.getLedgerRanges(0);
Set<Long> ledgerIds = new TreeSet<>(Arrays.asList(1234L, 123456789123456789L));
for (Long ledgerId : ledgerIds) {
createLedger(lm, ledgerId);
}
Set<Long> ledgersReadThroughIterator = ledgerRangeToSet(lri);
assertEquals("Comparing LedgersIds read through Iterator", ledgerIds, ledgersReadThroughIterator);
Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
assertEquals("Comparing LedgersIds read asynchronously", ledgerIds, ledgersReadAsync);
}
}