// blob: ca3e332508849ad281ac3ddafdbd80a043f28455
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Sets;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.api.LogWriter;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.callback.NamespaceListener;
import org.apache.distributedlog.exceptions.AlreadyClosedException;
import org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.util.DLUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
/**
 * Tests namespace-level operations of the BookKeeper-backed DistributedLog
 * implementation: opening and creating logs, stream name validation,
 * namespace listeners, ZooKeeper ACL handling, ledger allocator pool
 * configuration validation and the behaviour of a namespace after it has
 * been closed.
 *
 * <p>Every test derives its namespace URI from the running test method name
 * (via {@link #runtime}) so that tests do not share namespace paths.
 */
public class TestBKDistributedLogNamespace extends TestDistributedLogBase {

    @Rule
    public TestName runtime = new TestName();

    static final Logger LOG = LoggerFactory.getLogger(TestBKDistributedLogNamespace.class);

    /** Base configuration shared by all tests; tests that need changes copy it first. */
    protected static DistributedLogConfiguration conf =
        new DistributedLogConfiguration().setLockTimeout(10)
            .setEnableLedgerAllocatorPool(true).setLedgerAllocatorPoolName("test");

    /** ZooKeeper client rooted at "/", rebuilt for every test. */
    private ZooKeeperClient zooKeeperClient;

    /** Builds the per-test ZooKeeper client. */
    @Before
    public void setup() throws Exception {
        zooKeeperClient =
            TestZooKeeperClientBuilder.newBuilder()
                .uri(createDLMURI("/"))
                .build();
    }

    /** Closes the per-test ZooKeeper client. */
    @After
    public void teardown() throws Exception {
        // JUnit runs @After even when @Before failed; avoid masking that failure with an NPE.
        if (zooKeeperClient != null) {
            zooKeeperClient.close();
        }
    }

    /** Runs {@link #createLogPathTest(String)} with a log name that starts with a slash. */
    @Test(timeout = 60000)
    public void testCreateLogPath0() throws Exception {
        createLogPathTest("/create/log/path/" + runtime.getMethodName());
    }

    /** Runs {@link #createLogPathTest(String)} with a log name that has no leading slash. */
    @Test(timeout = 60000)
    public void testCreateLogPath1() throws Exception {
        createLogPathTest("create/log/path/" + runtime.getMethodName());
    }

    /**
     * Opens {@code logName} in a namespace configured with
     * {@code createStreamIfNotExists = false} and verifies that writing to it
     * fails with an {@link IOException}.
     *
     * @param logName name of the log to open
     */
    private void createLogPathTest(String logName) throws Exception {
        URI uri = createDLMURI("/" + runtime.getMethodName());
        ensureURICreated(zooKeeperClient.get(), uri);
        DistributedLogConfiguration newConf = new DistributedLogConfiguration();
        newConf.addConfiguration(conf);
        newConf.setCreateStreamIfNotExists(false);
        Namespace namespace = NamespaceBuilder.newBuilder()
            .conf(newConf).uri(uri).build();
        try {
            DistributedLogManager dlm = namespace.openLog(logName);
            try {
                LogWriter writer = dlm.startLogSegmentNonPartitioned();
                writer.write(DLMTestUtil.getLogRecordInstance(1L));
                writer.commit();
                fail("Should fail to write data if stream doesn't exist.");
            } catch (IOException ioe) {
                // expected
            } finally {
                dlm.close();
            }
        } finally {
            // release the namespace even when an assertion above fails
            namespace.close();
        }
    }

    /**
     * Verifies that, with {@code createStreamIfNotExists = false}, writing to a
     * stream fails until the stream is created explicitly through
     * {@link Namespace#createLog(String)}.
     */
    @Test(timeout = 60000)
    public void testCreateIfNotExists() throws Exception {
        URI uri = createDLMURI("/" + runtime.getMethodName());
        ensureURICreated(zooKeeperClient.get(), uri);
        DistributedLogConfiguration newConf = new DistributedLogConfiguration();
        newConf.addConfiguration(conf);
        newConf.setCreateStreamIfNotExists(false);
        String streamName = "test-stream";
        Namespace namespace = NamespaceBuilder.newBuilder()
            .conf(newConf).uri(uri).build();
        try {
            DistributedLogManager dlm = namespace.openLog(streamName);
            try {
                LogWriter writer = dlm.startLogSegmentNonPartitioned();
                writer.write(DLMTestUtil.getLogRecordInstance(1L));
                fail("Should fail to write data if stream doesn't exist.");
            } catch (IOException ioe) {
                // expected
            } finally {
                dlm.close();
            }

            // create the stream
            namespace.createLog(streamName);

            DistributedLogManager newDLM = namespace.openLog(streamName);
            try {
                LogWriter newWriter = newDLM.startLogSegmentNonPartitioned();
                newWriter.write(DLMTestUtil.getLogRecordInstance(1L));
                newWriter.close();
            } finally {
                newDLM.close();
            }
        } finally {
            namespace.close();
        }
    }

    /**
     * Asserts that opening {@code streamName} is rejected with an
     * {@link InvalidStreamNameException}.
     *
     * @param namespace namespace used to open the log
     * @param streamName stream name that is expected to be rejected
     */
    private static void assertInvalidStreamName(Namespace namespace, String streamName) throws Exception {
        try {
            DistributedLogManager dlm = namespace.openLog(streamName);
            // the name was unexpectedly accepted; do not leak the manager before failing
            dlm.close();
            fail("Should fail to create invalid stream " + streamName);
        } catch (InvalidStreamNameException isne) {
            // expected
        }
    }

    /**
     * Writes a single record to {@code streamName}, which is expected to be a
     * valid stream name.
     *
     * @param namespace namespace used to open the log
     * @param streamName name of the stream to write to
     */
    private static void writeOneRecord(Namespace namespace, String streamName) throws Exception {
        DistributedLogManager dlm = namespace.openLog(streamName);
        try {
            LogWriter writer = dlm.startLogSegmentNonPartitioned();
            writer.write(DLMTestUtil.getLogRecordInstance(1));
            writer.close();
        } finally {
            dlm.close();
        }
    }

    /**
     * Verifies that reserved names (leading dot), names containing a space and
     * names containing control characters are rejected, while ordinary names
     * are accepted and show up in the namespace listing.
     */
    @Test(timeout = 60000)
    public void testInvalidStreamName() throws Exception {
        assertFalse(DLUtils.isReservedStreamName("test"));
        assertTrue(DLUtils.isReservedStreamName(".test"));

        URI uri = createDLMURI("/" + runtime.getMethodName());
        Namespace namespace = NamespaceBuilder.newBuilder()
            .conf(conf).uri(uri).build();
        try {
            assertInvalidStreamName(namespace, ".test1");

            writeOneRecord(namespace, "test1");

            assertInvalidStreamName(namespace, ".test2");
            assertInvalidStreamName(namespace, "/ test2");
            // NUL as the first character of a six character name
            assertInvalidStreamName(namespace, "\0aaaaa");
            // control character 0x10 at index 3 of a six character name
            assertInvalidStreamName(namespace, "aaa\u0010aa");

            writeOneRecord(namespace, "test_2-3");

            Iterator<String> streamIter = namespace.getLogs();
            Set<String> streamSet = Sets.newHashSet(streamIter);

            assertEquals(2, streamSet.size());
            assertTrue(streamSet.contains("test1"));
            assertTrue(streamSet.contains("test_2-3"));
        } finally {
            namespace.close();
        }
    }

    /**
     * Verifies that a registered {@link NamespaceListener} is notified once on
     * registration and once after each log creation, and that the n-th
     * notification carries exactly n - 1 streams.
     */
    @Test(timeout = 60000)
    public void testNamespaceListener() throws Exception {
        URI uri = createDLMURI("/" + runtime.getMethodName());
        zooKeeperClient.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Namespace namespace = NamespaceBuilder.newBuilder()
            .conf(conf).uri(uri).build();
        try {
            // one latch per expected notification: initial, after test1, after test2
            final CountDownLatch[] latches = new CountDownLatch[3];
            for (int i = 0; i < latches.length; i++) {
                latches[i] = new CountDownLatch(1);
            }
            final AtomicInteger numUpdates = new AtomicInteger(0);
            final AtomicInteger numFailures = new AtomicInteger(0);
            final AtomicReference<Collection<String>> receivedStreams =
                new AtomicReference<Collection<String>>(null);
            namespace.registerNamespaceListener(new NamespaceListener() {
                @Override
                public void onStreamsChanged(Iterator<String> streams) {
                    Set<String> streamSet = Sets.newHashSet(streams);
                    int updates = numUpdates.incrementAndGet();
                    if (streamSet.size() != updates - 1) {
                        numFailures.incrementAndGet();
                    }
                    receivedStreams.set(streamSet);
                    // An unexpected extra notification has no latch; it is already
                    // counted in numFailures above, so just avoid indexing past the array.
                    if (updates <= latches.length) {
                        latches[updates - 1].countDown();
                    }
                }
            });
            latches[0].await();
            namespace.createLog("test1");
            latches[1].await();
            namespace.createLog("test2");
            latches[2].await();
            assertEquals(0, numFailures.get());
            assertNotNull(receivedStreams.get());
            Set<String> streamSet = new HashSet<String>();
            streamSet.addAll(receivedStreams.get());
            assertEquals(2, receivedStreams.get().size());
            assertEquals(2, streamSet.size());
            assertTrue(streamSet.contains("test1"));
            assertTrue(streamSet.contains("test2"));
        } finally {
            namespace.close();
        }
    }

    /**
     * Opens {@code streamName} under {@code dlNamespace} using {@code un} as the
     * ZooKeeper ACL id (with stream auto-creation enabled) and writes ten
     * records to it.
     *
     * <p>The log manager and the namespace are released even when the write
     * fails, so callers that expect a failure do not leak them.
     *
     * @param dlNamespace namespace path passed to {@code createDLMURI}
     * @param un ZooKeeper ACL id to configure
     * @param streamName name of the stream to write to
     */
    private void initDlogMeta(String dlNamespace, String un, String streamName) throws Exception {
        URI uri = createDLMURI(dlNamespace);
        DistributedLogConfiguration newConf = new DistributedLogConfiguration();
        newConf.addConfiguration(conf);
        newConf.setCreateStreamIfNotExists(true);
        newConf.setZkAclId(un);
        Namespace namespace = NamespaceBuilder.newBuilder()
            .conf(newConf).uri(uri).build();
        try {
            DistributedLogManager dlm = namespace.openLog(streamName);
            boolean written = false;
            try {
                LogWriter writer = dlm.startLogSegmentNonPartitioned();
                for (int i = 0; i < 10; i++) {
                    writer.write(DLMTestUtil.getLogRecordInstance(1L));
                }
                writer.close();
                written = true;
            } finally {
                if (written) {
                    dlm.close();
                } else {
                    // A failure is already propagating; callers inspect its type, so a
                    // secondary close failure must not replace it.
                    try {
                        dlm.close();
                    } catch (IOException ioe) {
                        LOG.warn("Failed to close log manager of stream {} after a write failure",
                            streamName, ioe);
                    }
                }
            }
        } finally {
            namespace.close();
        }
    }

    /**
     * Verifies that a ZooKeeper client other than the one that created the
     * stream metadata cannot create children under, or set data on, the
     * stream znode.
     */
    @Test(timeout = 60000)
    public void testAclPermsZkAccessConflict() throws Exception {
        String namespace = "/" + runtime.getMethodName();
        initDlogMeta(namespace, "test-un", "test-stream");
        URI uri = createDLMURI(namespace);

        ZooKeeperClient zkc = TestZooKeeperClientBuilder.newBuilder()
            .name("unpriv")
            .uri(uri)
            .build();
        try {
            try {
                zkc.get().create(uri.getPath() + "/test-stream/test-garbage",
                    new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                fail("write should have failed due to perms");
            } catch (KeeperException.NoAuthException ex) {
                LOG.info("caught exception trying to write with no perms", ex);
            }

            try {
                zkc.get().setData(uri.getPath() + "/test-stream", new byte[0], 0);
                fail("write should have failed due to perms");
            } catch (KeeperException.NoAuthException ex) {
                LOG.info("caught exception trying to write with no perms", ex);
            }
        } finally {
            zkc.close();
        }
    }

    /**
     * Verifies that a ZooKeeper client other than the one that created the
     * stream metadata can still list the children of, and read the data of,
     * the stream znode.
     */
    @Test(timeout = 60000)
    public void testAclPermsZkAccessNoConflict() throws Exception {
        String namespace = "/" + runtime.getMethodName();
        initDlogMeta(namespace, "test-un", "test-stream");
        URI uri = createDLMURI(namespace);

        ZooKeeperClient zkc = TestZooKeeperClientBuilder.newBuilder()
            .name("unpriv")
            .uri(uri)
            .build();
        try {
            zkc.get().getChildren(uri.getPath() + "/test-stream", false, new Stat());
            zkc.get().getData(uri.getPath() + "/test-stream", false, new Stat());
        } finally {
            zkc.close();
        }
    }

    /**
     * Verifies that writing to an existing stream with a different ZooKeeper
     * ACL id fails with a {@link ZKException} carrying
     * {@link KeeperException.Code#NOAUTH}, and that writing with the original
     * ACL id keeps working afterwards. Any other exception type (for example
     * a {@link LockingException}) fails the test.
     */
    @Test(timeout = 60000)
    public void testAclModifyPermsDlmConflict() throws Exception {
        String streamName = "test-stream";

        // Establish the stream with the first un.
        initDlogMeta("/" + runtime.getMethodName(), "test-un", streamName);
        try {
            // Reopening and writing again with a different un will fail.
            initDlogMeta("/" + runtime.getMethodName(), "not-test-un", streamName);
            fail("write should have failed due to perms");
        } catch (ZKException ex) {
            LOG.info("caught exception trying to write with no perms", ex);
            assertEquals(KeeperException.Code.NOAUTH, ex.getKeeperExceptionCode());
        } catch (Exception ex) {
            LOG.info("caught wrong exception trying to write with no perms", ex);
            fail("wrong exception " + ex.getClass().getName() + " expected " + ZKException.class.getName());
        }

        // Should work again.
        initDlogMeta("/" + runtime.getMethodName(), "test-un", streamName);
    }

    /**
     * Verifies that reopening and writing to a stream with the same ZooKeeper
     * ACL id succeeds.
     */
    @Test(timeout = 60000)
    public void testAclModifyPermsDlmNoConflict() throws Exception {
        String streamName = "test-stream";

        // Establish the uri.
        initDlogMeta("/" + runtime.getMethodName(), "test-un", streamName);

        // Reopening and writing again with the same un will succeed.
        initDlogMeta("/" + runtime.getMethodName(), "test-un", streamName);
    }

    /**
     * Asserts that
     * {@link BKNamespaceDriver#validateAndGetFullLedgerAllocatorPoolPath} rejects
     * the given configuration with an {@link IOException}.
     *
     * @param conf configuration expected to be rejected
     * @param uri namespace uri to validate against
     */
    static void validateBadAllocatorConfiguration(DistributedLogConfiguration conf, URI uri) throws Exception {
        try {
            BKNamespaceDriver.validateAndGetFullLedgerAllocatorPoolPath(conf, uri);
            fail("Should throw exception when bad allocator configuration provided");
        } catch (IOException ioe) {
            // expected
        }
    }

    /**
     * Verifies that a set of ledger allocator pool paths, and a configuration
     * with a {@code null} pool name, are all rejected.
     */
    @Test(timeout = 60000)
    public void testValidateAndGetFullLedgerAllocatorPoolPath() throws Exception {
        DistributedLogConfiguration testConf = new DistributedLogConfiguration();
        testConf.setEnableLedgerAllocatorPool(true);

        String namespace = "/" + runtime.getMethodName();
        URI uri = createDLMURI(namespace);

        testConf.setLedgerAllocatorPoolName("test");

        String[] badPoolPaths = {"test", ".", "..", "./", ".test/"};
        for (String badPoolPath : badPoolPaths) {
            testConf.setLedgerAllocatorPoolPath(badPoolPath);
            validateBadAllocatorConfiguration(testConf, uri);
        }

        // this path is only checked together with a missing pool name
        testConf.setLedgerAllocatorPoolPath(".test");
        testConf.setLedgerAllocatorPoolName(null);
        validateBadAllocatorConfiguration(testConf, uri);
    }

    /**
     * Verifies that a namespace works normally before it is closed and that
     * every namespace operation throws {@link AlreadyClosedException} once it
     * has been closed.
     */
    @Test(timeout = 60000)
    public void testUseNamespaceAfterCloseShouldFailFast() throws Exception {
        URI uri = createDLMURI("/" + runtime.getMethodName());
        Namespace namespace = NamespaceBuilder.newBuilder()
            .conf(conf)
            .uri(uri)
            .build();
        String logName = "test-stream";
        try {
            // before closing the namespace, no exception should be thrown

            // create a log
            namespace.createLog(logName);
            // log exists
            Assert.assertTrue(namespace.logExists(logName));
            // create a dlm
            DistributedLogManager dlm = namespace.openLog(logName);
            try {
                // do some writes
                BKAsyncLogWriter writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());
                for (long i = 0; i < 3; i++) {
                    LogRecord record = DLMTestUtil.getLargeLogRecordInstance(i);
                    writer.write(record);
                }
                writer.closeAndComplete();
                // do some reads
                LogReader reader = dlm.getInputStream(0);
                try {
                    for (long i = 0; i < 3; i++) {
                        // expected value first, so a mismatch is reported the right way round
                        Assert.assertEquals(i, reader.readNext(false).getTransactionId());
                    }
                } finally {
                    reader.close();
                }
            } finally {
                dlm.close();
            }
            namespace.deleteLog(logName);
            Assert.assertFalse(namespace.logExists(logName));
        } finally {
            // now close the namespace (also on failure, so that it is not leaked)
            namespace.close();
        }

        // after closing the namespace, every operation should fail fast
        try {
            namespace.createLog(logName);
            fail("Should throw exception after namespace is closed");
        } catch (AlreadyClosedException e) {
            // expected
        }
        try {
            namespace.openLog(logName);
            fail("Should throw exception after namespace is closed");
        } catch (AlreadyClosedException e) {
            // expected
        }
        try {
            namespace.logExists(logName);
            fail("Should throw exception after namespace is closed");
        } catch (AlreadyClosedException e) {
            // expected
        }
        try {
            namespace.getLogs();
            fail("Should throw exception after namespace is closed");
        } catch (AlreadyClosedException e) {
            // expected
        }
        try {
            namespace.deleteLog(logName);
            fail("Should throw exception after namespace is closed");
        } catch (AlreadyClosedException e) {
            // expected
        }
        try {
            namespace.createAccessControlManager();
            fail("Should throw exception after namespace is closed");
        } catch (AlreadyClosedException e) {
            // expected
        }
    }
}