blob: c81eb1da95adc8abe3d0b0e4653903589ef32328 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.impl.federated;
import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientUtils;
import org.apache.distributedlog.callback.NamespaceListener;
import org.apache.distributedlog.exceptions.LogExistsException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.metadata.LogMetadataStore;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Utils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Test cases for {@link FederatedZKLogMetadataStore}, the ZooKeeper based federated log metadata store.
*/
public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase {
/** ZooKeeper session timeout, in milliseconds, passed to every client builder in this class. */
private static final int zkSessionTimeoutMs = 2000;
/** Limit handed to {@code setFederatedMaxLogsPerSubnamespace} when building the base configuration. */
private static final int maxLogsPerSubnamespace = 10;
/**
 * Namespace listener that keeps the iterator from the most recent notification
 * and releases its latch as soon as a non-empty iterator is delivered.
 */
static class TestNamespaceListener implements NamespaceListener {

    final CountDownLatch doneLatch = new CountDownLatch(1);
    final AtomicReference<Iterator<String>> resultHolder = new AtomicReference<Iterator<String>>();

    /**
     * Stores {@code streams} as the latest result. The latch is only released
     * when the iterator has at least one element.
     *
     * @param streams iterator over the stream names reported by the namespace
     */
    @Override
    public void onStreamsChanged(Iterator<String> streams) {
        resultHolder.set(streams);
        if (!streams.hasNext()) {
            // empty notification: keep waiters blocked
            return;
        }
        doneLatch.countDown();
    }

    /**
     * Blocks until a non-empty notification has been received.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    void waitForDone() throws InterruptedException {
        doneLatch.await();
    }

    /**
     * @return the iterator from the most recent notification, or {@code null}
     *         when no notification has arrived yet
     */
    Iterator<String> getResult() {
        return resultHolder.get();
    }
}
/**
 * Namespace listener that ignores notifications carrying fewer than
 * {@code expectedSize} logs and captures the first one that is large enough
 * (later large-enough notifications overwrite the captured set).
 */
static class TestNamespaceListenerWithExpectedSize implements NamespaceListener {

    final int expectedSize;
    final CountDownLatch doneLatch = new CountDownLatch(1);
    final AtomicReference<Set<String>> resultHolder = new AtomicReference<Set<String>>();

    /**
     * @param expectedSize minimum number of logs a notification must contain
     *                     before it is accepted
     */
    TestNamespaceListenerWithExpectedSize(int expectedSize) {
        this.expectedSize = expectedSize;
    }

    /**
     * Drains the iterator and, when it yields at least {@code expectedSize}
     * entries, stores them as a sorted set and releases the latch.
     *
     * @param logsIter iterator over the log names reported by the namespace
     */
    @Override
    public void onStreamsChanged(Iterator<String> logsIter) {
        List<String> received = Lists.newArrayList(logsIter);
        if (received.size() >= expectedSize) {
            resultHolder.set(Sets.newTreeSet(received));
            doneLatch.countDown();
        }
    }

    /**
     * Blocks until a notification of sufficient size has been received.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    void waitForDone() throws InterruptedException {
        doneLatch.await();
    }

    /**
     * @return the captured set of log names, or {@code null} when no
     *         notification of sufficient size has arrived yet
     */
    Set<String> getResult() {
        return resultHolder.get();
    }
}
/** JUnit rule exposing the running test method's name; {@code setup()} uses it to build a per-test namespace URI. */
@Rule
public TestName runtime = new TestName();
/** Base configuration carrying the max-logs-per-subnamespace setting; added to each store's own configuration. */
protected final DistributedLogConfiguration baseConf =
new DistributedLogConfiguration()
.setFederatedMaxLogsPerSubnamespace(maxLogsPerSubnamespace);
/** ZooKeeper client built in {@code setup()} and closed in {@code teardown()}. */
protected ZooKeeperClient zkc;
/** Metadata store under test, created in {@code setup()} on top of {@link #zkc} and {@link #scheduler}. */
protected FederatedZKLogMetadataStore metadataStore;
/** Scheduler handed to the metadata store; built in {@code setup()} and shut down in {@code teardown()}. */
protected OrderedScheduler scheduler;
/** URI of the federated namespace used by the current test; derived from the test method name. */
protected URI uri;
/**
 * Builds the per-test fixture: a ZooKeeper client, a two-thread scheduler,
 * a federated namespace named after the running test method, and the
 * metadata store under test.
 *
 * @throws Exception if any part of the fixture cannot be created
 */
@Before
public void setup() throws Exception {
    this.zkc = TestZooKeeperClientBuilder.newBuilder()
            .uri(createDLMURI("/"))
            .sessionTimeoutMs(zkSessionTimeoutMs)
            .build();
    this.scheduler = OrderedScheduler.newBuilder()
            .name("test-zk-logmetadata-store")
            .corePoolSize(2)
            .build();

    // each store gets its own configuration object seeded from the shared base
    DistributedLogConfiguration storeConf = new DistributedLogConfiguration();
    storeConf.addConfiguration(baseConf);

    String namespacePath = "/" + runtime.getMethodName();
    this.uri = createDLMURI(namespacePath);
    FederatedZKLogMetadataStore.createFederatedNamespace(this.uri, this.zkc);
    this.metadataStore = new FederatedZKLogMetadataStore(storeConf, this.uri, this.zkc, this.scheduler);
}
/**
 * Releases the per-test fixture. The scheduler is shut down in a
 * {@code finally} block so that a failure while closing the ZooKeeper
 * client does not leave the scheduler running.
 *
 * @throws Exception if closing the ZooKeeper client or shutting down the scheduler fails
 */
@After
public void teardown() throws Exception {
    try {
        if (null != zkc) {
            zkc.close();
        }
    } finally {
        if (null != scheduler) {
            scheduler.shutdown();
        }
    }
}
/**
 * Deletes the ZooKeeper node of {@code logName} under the namespace the
 * metadata store reports as its location. Fails the test when the store
 * does not know the log.
 *
 * @param logName name of the log to delete
 * @throws Exception if the lookup or the delete fails
 */
private void deleteLog(String logName) throws Exception {
    Optional<URI> location = Utils.ioResult(metadataStore.getLogLocation(logName));
    assertTrue(location.isPresent());
    String logPath = location.get().getPath() + "/" + logName;
    // NOTE(review): -1 presumably means "match any node version" for the ZooKeeper delete call - confirm
    zkc.get().delete(logPath, -1);
}
/**
 * Creates a single log and checks that it is placed in the root namespace,
 * that its location can be looked up, that an unknown log has no location,
 * and that both the registered listener and {@code getLogs()} report
 * exactly that one log.
 */
@Test(timeout = 60000)
public void testBasicOperations() throws Exception {
    TestNamespaceListener listener = new TestNamespaceListener();
    metadataStore.registerNamespaceListener(listener);

    String logName = "test-log-1";
    URI createdUri = Utils.ioResult(metadataStore.createLog(logName));
    assertEquals(uri, createdUri);

    Optional<URI> existing = Utils.ioResult(metadataStore.getLogLocation(logName));
    assertTrue(existing.isPresent());
    assertEquals(uri, existing.get());

    Optional<URI> missing = Utils.ioResult(metadataStore.getLogLocation("non-existent-log"));
    assertFalse(missing.isPresent());

    // the listener must have been told about the new log
    listener.waitForDone();
    Iterator<String> notified = listener.getResult();
    assertTrue(notified.hasNext());
    assertEquals(logName, notified.next());
    assertFalse(notified.hasNext());

    // a fresh listing must contain the log and nothing else
    Iterator<String> listed = Utils.ioResult(metadataStore.getLogs());
    assertTrue(listed.hasNext());
    assertEquals(logName, listed.next());
    assertFalse(listed.hasNext());
}
/**
 * Registers two listeners on the same store, creates one log, and checks
 * that both listeners end up with element-wise equal results.
 */
@Test(timeout = 60000)
public void testMultipleListeners() throws Exception {
    TestNamespaceListener first = new TestNamespaceListener();
    TestNamespaceListener second = new TestNamespaceListener();
    metadataStore.registerNamespaceListener(first);
    metadataStore.registerNamespaceListener(second);

    String logName = "test-multiple-listeners";
    URI createdUri = Utils.ioResult(metadataStore.createLog(logName));
    assertEquals(uri, createdUri);

    first.waitForDone();
    second.waitForDone();

    Iterator<String> firstResult = first.getResult();
    Iterator<String> secondResult = second.getResult();
    assertTrue(Iterators.elementsEqual(firstResult, secondResult));
}
/**
 * Alternates log creation between two metadata stores that share the same
 * namespace URI and checks that a log created through one store is visible,
 * at the same location, through the other. After creating twice the
 * per-subnamespace limit, both stores must report two sub namespaces.
 *
 * <p>The second ZooKeeper client is created by this test, so it is closed
 * in a {@code finally} block; {@code teardown()} only closes the shared one.
 */
@Test(timeout = 60000)
public void testCreateLog() throws Exception {
    DistributedLogConfiguration conf = new DistributedLogConfiguration();
    conf.addConfiguration(baseConf);
    ZooKeeperClient anotherZkc = TestZooKeeperClientBuilder.newBuilder()
            .uri(uri)
            .sessionTimeoutMs(zkSessionTimeoutMs)
            .build();
    try {
        FederatedZKLogMetadataStore anotherMetadataStore =
                new FederatedZKLogMetadataStore(conf, uri, anotherZkc, scheduler);
        for (int i = 0; i < 2 * maxLogsPerSubnamespace; i++) {
            // even iterations create via the primary store, odd ones via the secondary
            LogMetadataStore createStore, checkStore;
            if (i % 2 == 0) {
                createStore = metadataStore;
                checkStore = anotherMetadataStore;
            } else {
                createStore = anotherMetadataStore;
                checkStore = metadataStore;
            }
            String logName = "test-create-log-" + i;
            URI logUri = Utils.ioResult(createStore.createLog(logName));
            Optional<URI> logLocation = Utils.ioResult(checkStore.getLogLocation(logName));
            assertTrue("Log " + logName + " doesn't exist", logLocation.isPresent());
            assertEquals("Different log location " + logLocation.get() + " is found",
                    logUri, logLocation.get());
        }
        assertEquals(2, metadataStore.getSubnamespaces().size());
        assertEquals(2, anotherMetadataStore.getSubnamespaces().size());
    } finally {
        // release the extra client even when an assertion above fails
        anotherZkc.close();
    }
}
/**
 * Creates the same log name in two different sub namespaces and checks that
 * every subsequent store operation ({@code createLog}, {@code getLogLocation}
 * for existing, unknown and duplicated names, and {@code getLogs}) fails with
 * {@link UnexpectedException} while the store's {@code duplicatedLogFound}
 * flag is set.
 */
@Test(timeout = 60000)
public void testDuplicatedLogs() throws Exception {
    String logName = "test-log";
    Utils.ioResult(metadataStore.createLog(logName));

    URI subNs1 = Utils.ioResult(metadataStore.createSubNamespace());
    URI subNs2 = Utils.ioResult(metadataStore.createSubNamespace());

    String duplicatedLogName = "test-duplicated-logs";
    // Create same log in different sub namespaces
    metadataStore.createLogInNamespaceSync(subNs1, duplicatedLogName);
    metadataStore.createLogInNamespaceSync(subNs2, duplicatedLogName);

    try {
        Utils.ioResult(metadataStore.createLog("non-existent-log"));
        fail("should throw exception when duplicated log found");
    } catch (UnexpectedException ue) {
        // should throw unexpected exception
        assertTrue(metadataStore.duplicatedLogFound.get());
    }
    try {
        Utils.ioResult(metadataStore.getLogLocation(logName));
        fail("should throw exception when duplicated log found");
    } catch (UnexpectedException ue) {
        // should throw unexpected exception
        assertTrue(metadataStore.duplicatedLogFound.get());
    }
    try {
        Utils.ioResult(metadataStore.getLogLocation("non-existent-log"));
        fail("should throw exception when duplicated log found");
    } catch (UnexpectedException ue) {
        // should throw unexpected exception
        assertTrue(metadataStore.duplicatedLogFound.get());
    }
    try {
        Utils.ioResult(metadataStore.getLogLocation(duplicatedLogName));
        fail("should throw exception when duplicated log found");
    } catch (UnexpectedException ue) {
        // should throw unexpected exception
        assertTrue(metadataStore.duplicatedLogFound.get());
    }
    try {
        Utils.ioResult(metadataStore.getLogs());
        fail("should throw exception when duplicated log found");
    } catch (UnexpectedException ue) {
        // should throw unexpected exception
        assertTrue(metadataStore.duplicatedLogFound.get());
    }
}
/**
 * Checks that a log's location can still be resolved after the log has been
 * removed from the store's cache.
 */
@Test(timeout = 60000)
public void testGetLogLocationWhenCacheMissed() throws Exception {
    String logName = "test-get-location-when-cache-missed";
    URI createdUri = Utils.ioResult(metadataStore.createLog(logName));
    assertEquals(uri, createdUri);

    // drop the cached entry so the lookup below cannot be served from it
    metadataStore.removeLogFromCache(logName);

    Optional<URI> resolved = Utils.ioResult(metadataStore.getLogLocation(logName));
    assertTrue(resolved.isPresent());
    assertEquals(createdUri, resolved.get());
}
/**
 * Checks that re-creating an existing log is rejected with
 * {@link LogExistsException} even after the log has been removed from the
 * store's cache.
 */
@Test(timeout = 60000, expected = LogExistsException.class)
public void testCreateLogWhenCacheMissed() throws Exception {
    String logName = "test-create-log-when-cache-missed";
    URI createdUri = Utils.ioResult(metadataStore.createLog(logName));
    assertEquals(uri, createdUri);

    metadataStore.removeLogFromCache(logName);

    // second creation is expected to throw
    Utils.ioResult(metadataStore.createLog(logName));
}
/**
 * Checks that creating a log twice through the same store is rejected with
 * {@link LogExistsException}.
 */
@Test(timeout = 60000, expected = LogExistsException.class)
public void testCreateLogWhenLogExists() throws Exception {
    String logName = "test-create-log-when-log-exists";
    URI createdUri = Utils.ioResult(metadataStore.createLog(logName));
    assertEquals(uri, createdUri);

    // second creation is expected to throw
    Utils.ioResult(metadataStore.createLog(logName));
}
/**
 * Creates {@code numLogs} logs named {@code prefix + index} through the
 * metadata store, letting the store choose the namespace.
 *
 * @param numLogs number of logs to create
 * @param prefix  name prefix; the zero-based index is appended to it
 * @return sorted set of the created log names
 * @throws Exception if any creation fails
 */
private Set<String> createLogs(int numLogs, String prefix) throws Exception {
    Set<String> created = Sets.newTreeSet();
    int index = 0;
    while (index < numLogs) {
        String name = prefix + index;
        Utils.ioResult(metadataStore.createLog(name));
        created.add(name);
        index++;
    }
    return created;
}
/**
 * Creates {@code numLogs} logs named {@code prefix + index} directly in the
 * given namespace, using the store's synchronous creation call.
 *
 * @param uri     namespace in which the logs are created
 * @param numLogs number of logs to create
 * @param prefix  name prefix; the zero-based index is appended to it
 * @return sorted set of the created log names
 * @throws Exception if any creation fails
 */
private Set<String> createLogs(URI uri, int numLogs, String prefix) throws Exception {
    Set<String> created = Sets.newTreeSet();
    for (int index = 0; index != numLogs && index < numLogs; index++) {
        String name = prefix + index;
        metadataStore.createLogInNamespaceSync(uri, name);
        created.add(name);
    }
    return created;
}
/**
 * Creates three times the per-subnamespace limit of logs and polls
 * {@code getLogs()} until the listing contains all of them, then checks
 * that every created log is present.
 */
@Test(timeout = 60000)
public void testGetLogs() throws Exception {
    int numLogs = 3 * maxLogsPerSubnamespace;
    Set<String> expectedLogs = createLogs(numLogs, "test-get-logs");

    // poll every 20ms; the test timeout bounds the loop
    Set<String> receivedLogs;
    do {
        TimeUnit.MILLISECONDS.sleep(20);
        Iterator<String> listed = Utils.ioResult(metadataStore.getLogs());
        receivedLogs = new TreeSet<String>(Lists.newArrayList(listed));
    } while (receivedLogs.size() < numLogs);

    assertEquals(numLogs, receivedLogs.size());
    assertTrue(Sets.difference(expectedLogs, receivedLogs).isEmpty());
}
/**
 * Checks namespace notifications for both creation and deletion:
 * first a size-aware listener must observe all created logs, then, after
 * one randomly chosen log is deleted, a second listener must observe the
 * remaining logs.
 *
 * <p>The final assertion is made against the set reported after the
 * deletion, so it actually verifies the post-deletion notification.
 */
@Test(timeout = 60000)
public void testNamespaceListener() throws Exception {
    int numLogs = 3 * maxLogsPerSubnamespace;
    TestNamespaceListenerWithExpectedSize listener = new TestNamespaceListenerWithExpectedSize(numLogs);
    metadataStore.registerNamespaceListener(listener);
    Set<String> expectedLogs = createLogs(numLogs, "test-namespace-listener");
    listener.waitForDone();
    Set<String> receivedLogs = listener.getResult();
    assertEquals(numLogs, receivedLogs.size());
    assertTrue(Sets.difference(expectedLogs, receivedLogs).isEmpty());

    // pick one of the created logs at random and delete it
    Random r = new Random(System.currentTimeMillis());
    int logId = r.nextInt(numLogs);
    String logName = "test-namespace-listener" + logId;
    TestNamespaceListener deleteListener = new TestNamespaceListener();
    metadataStore.registerNamespaceListener(deleteListener);
    deleteLog(logName);
    deleteListener.waitForDone();
    Set<String> logsAfterDeleted = Sets.newTreeSet(Lists.newArrayList(deleteListener.getResult()));
    assertEquals(numLogs - 1, logsAfterDeleted.size());
    expectedLogs.remove(logName);
    // compare against the post-deletion notification, not the stale pre-deletion set
    assertTrue(Sets.difference(expectedLogs, logsAfterDeleted).isEmpty());
}
/**
 * Pre-populates the root namespace with one log fewer than the limit and
 * two extra sub namespaces with exactly the limit, then checks that the
 * next created log lands in the root namespace and the one after that
 * lands in a newly created fourth sub namespace. Finally the listener must
 * report every log created by the test.
 */
@Test(timeout = 60000)
public void testCreateLogPickingFirstAvailableSubNamespace() throws Exception {
    URI subNs1 = Utils.ioResult(metadataStore.createSubNamespace());
    URI subNs2 = Utils.ioResult(metadataStore.createSubNamespace());

    Set<String> allLogs = Sets.newTreeSet();
    allLogs.addAll(createLogs(uri, maxLogsPerSubnamespace - 1, "test-ns0-"));
    allLogs.addAll(createLogs(subNs1, maxLogsPerSubnamespace, "test-ns1-"));
    allLogs.addAll(createLogs(subNs2, maxLogsPerSubnamespace, "test-ns2-"));

    // make sure the metadata store saw all 29 logs
    int preCreatedCount = 3 * maxLogsPerSubnamespace - 1;
    Set<String> receivedLogs;
    do {
        TimeUnit.MILLISECONDS.sleep(20);
        Iterator<String> listed = Utils.ioResult(metadataStore.getLogs());
        receivedLogs = new TreeSet<String>(Lists.newArrayList(listed));
    } while (receivedLogs.size() < preCreatedCount);

    int finalCount = 3 * maxLogsPerSubnamespace + 1;
    TestNamespaceListenerWithExpectedSize listener =
            new TestNamespaceListenerWithExpectedSize(finalCount);
    metadataStore.registerNamespaceListener(listener);

    Set<URI> knownNamespaces = Utils.ioResult(metadataStore.fetchSubNamespaces(null));
    assertEquals(3, knownNamespaces.size());

    // the root namespace still has room, so it must be picked first
    String firstLogName = "test-pick-first-available-ns";
    URI firstLogUri = Utils.ioResult(metadataStore.createLog(firstLogName));
    allLogs.add(firstLogName);
    assertEquals(uri, firstLogUri);
    knownNamespaces = Utils.ioResult(metadataStore.fetchSubNamespaces(null));
    assertEquals(3, knownNamespaces.size());

    // every known namespace is now at the limit, so a new one must be created
    String secondLogName = "test-create-new-ns";
    URI secondLogUri = Utils.ioResult(metadataStore.createLog(secondLogName));
    allLogs.add(secondLogName);
    assertFalse(knownNamespaces.contains(secondLogUri));
    knownNamespaces = Utils.ioResult(metadataStore.fetchSubNamespaces(null));
    assertEquals(4, knownNamespaces.size());

    listener.waitForDone();
    receivedLogs = listener.getResult();
    assertEquals(finalCount, receivedLogs.size());
    assertEquals(allLogs, receivedLogs);
}
/**
 * Expires the ZooKeeper session of the primary client, then creates one
 * more log through a second store backed by its own client, and checks
 * that the listener registered on the primary store still receives the
 * complete set of logs.
 *
 * <p>The second ZooKeeper client is created by this test, so it is closed
 * in a {@code finally} block; {@code teardown()} only closes the shared one.
 */
@Test(timeout = 60000)
public void testZooKeeperSessionExpired() throws Exception {
    Set<String> allLogs = createLogs(2 * maxLogsPerSubnamespace, "test-zookeeper-session-expired-");
    TestNamespaceListenerWithExpectedSize listener =
            new TestNamespaceListenerWithExpectedSize(2 * maxLogsPerSubnamespace + 1);
    metadataStore.registerNamespaceListener(listener);
    ZooKeeperClientUtils.expireSession(zkc, BKNamespaceDriver.getZKServersFromDLUri(uri), zkSessionTimeoutMs);
    String testLogName = "test-log-name";
    allLogs.add(testLogName);
    DistributedLogConfiguration anotherConf = new DistributedLogConfiguration();
    anotherConf.addConfiguration(baseConf);
    ZooKeeperClient anotherZkc = TestZooKeeperClientBuilder.newBuilder()
            .uri(uri)
            .sessionTimeoutMs(zkSessionTimeoutMs)
            .build();
    try {
        FederatedZKLogMetadataStore anotherMetadataStore =
                new FederatedZKLogMetadataStore(anotherConf, uri, anotherZkc, scheduler);
        Utils.ioResult(anotherMetadataStore.createLog(testLogName));
        listener.waitForDone();
        Set<String> receivedLogs = listener.getResult();
        assertEquals(2 * maxLogsPerSubnamespace + 1, receivedLogs.size());
        assertEquals(allLogs, receivedLogs);
    } finally {
        // release the extra client even when an assertion above fails
        anotherZkc.close();
    }
}
}