DL-199: Be able to support filesystem-path like name
In order to support hierarchical namespace, we need to be able to support filesystem path like log name.
Author: Sijie Guo <sijie@apache.org>
Reviewers: Jia Zhai <None>, Leigh Stewart <lstewart@apache.org>
Closes #130 from sijie/DL_199
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
index 0a4608e..adb591f 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
@@ -49,7 +49,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER;
-import static org.apache.distributedlog.util.DLUtils.validateName;
+import static org.apache.distributedlog.util.DLUtils.validateAndNormalizeName;
/**
* BKDistributedLogNamespace is the default implementation of {@link DistributedLogNamespace}. It uses
@@ -148,7 +148,7 @@
public void createLog(String logName)
throws InvalidStreamNameException, IOException {
checkState();
- validateName(logName);
+ logName = validateAndNormalizeName(logName);
URI uri = FutureUtils.result(driver.getLogMetadataStore().createLog(logName));
FutureUtils.result(driver.getLogStreamMetadataStore(WRITER).getLog(uri, logName, true, true));
}
@@ -157,7 +157,7 @@
public void deleteLog(String logName)
throws InvalidStreamNameException, LogNotFoundException, IOException {
checkState();
- validateName(logName);
+ logName = validateAndNormalizeName(logName);
Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
if (!uri.isPresent()) {
throw new LogNotFoundException("Log " + logName + " isn't found.");
@@ -186,7 +186,7 @@
Optional<StatsLogger> perStreamStatsLogger)
throws InvalidStreamNameException, IOException {
checkState();
- validateName(logName);
+ logName = validateAndNormalizeName(logName);
Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
if (!uri.isPresent()) {
throw new LogNotFoundException("Log " + logName + " isn't found.");
@@ -256,7 +256,7 @@
throws InvalidStreamNameException, IOException {
// Make sure the name is well formed
checkState();
- validateName(nameOfLogStream);
+ nameOfLogStream = validateAndNormalizeName(nameOfLogStream);
DistributedLogConfiguration mergedConfiguration = new DistributedLogConfiguration();
mergedConfiguration.addConfiguration(conf);
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
index e132b64..dbe5400 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
@@ -74,7 +74,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.distributedlog.util.DLUtils.isReservedStreamName;
-import static org.apache.distributedlog.util.DLUtils.validateName;
+import static org.apache.distributedlog.util.DLUtils.validateAndNormalizeName;
/**
* Manager for ZooKeeper/BookKeeper based namespace
@@ -504,7 +504,7 @@
throw new UnsupportedOperationException();
}
checkState();
- validateName(streamName);
+ streamName = validateAndNormalizeName(streamName);
return new ZKMetadataAccessor(
streamName,
conf,
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/DLUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/DLUtils.java
index 7b7e0f7..7231105 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/DLUtils.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/DLUtils.java
@@ -281,41 +281,89 @@
}
/**
- * Validate the stream name.
+ * Validate the log name.
*
- * @param nameOfStream
- * name of stream
+ * @param logName
+ * name of log
* @throws InvalidStreamNameException
*/
- public static void validateName(String nameOfStream)
+ public static String validateAndNormalizeName(String logName)
throws InvalidStreamNameException {
- String reason = null;
- char chars[] = nameOfStream.toCharArray();
- char c;
- // validate the stream to see if meet zookeeper path's requirement
- for (int i = 0; i < chars.length; i++) {
- c = chars[i];
+ if (isReservedStreamName(logName)) {
+ throw new InvalidStreamNameException(logName, "Log Name is reserved");
+ }
- if (c == 0) {
- reason = "null character not allowed @" + i;
- break;
- } else if (c == '/') {
- reason = "'/' not allowed @" + i;
- break;
- } else if (c > '\u0000' && c < '\u001f'
- || c > '\u007f' && c < '\u009F'
- || c > '\ud800' && c < '\uf8ff'
- || c > '\ufff0' && c < '\uffff') {
- reason = "invalid charater @" + i;
- break;
+ if (logName.charAt(0) == '/') {
+ validatePathName(logName);
+ return logName.substring(1);
+ } else {
+ validatePathName("/" + logName);
+ return logName;
+ }
+ }
+
+ private static void validatePathName(String logName) throws InvalidStreamNameException {
+ if (logName == null) {
+ throw new InvalidStreamNameException("Log name cannot be null");
+ } else if (logName.length() == 0) {
+ throw new InvalidStreamNameException("Log name length must be > 0");
+ } else if (logName.charAt(0) != '/') {
+ throw new InvalidStreamNameException("Log name must start with / character");
+ } else if (logName.length() != 1) {
+ if (logName.charAt(logName.length() - 1) == '/') {
+ throw new InvalidStreamNameException("Log name must not end with / character");
+ } else {
+ String reason = null;
+ char lastc = '/';
+ char[] chars = logName.toCharArray();
+
+ for (int i = 1; i < chars.length; ++i) {
+ char c = chars[i];
+ if (c == 0) {
+ reason = "null character not allowed @" + i;
+ break;
+ }
+
+ if (c == '<' || c == '>') {
+ reason = "< or > specified @" + i;
+ break;
+ }
+
+ if (c == ' ') {
+ reason = "empty space specified @" + i;
+ break;
+ }
+
+ if (c == '/' && lastc == '/') {
+ reason = "empty node name specified @" + i;
+ break;
+ }
+
+ if (c == '.' && lastc == '.') {
+ if (chars[i - 2] == '/' && (i + 1 == chars.length || chars[i + 1] == '/')) {
+ reason = "relative paths not allowed @" + i;
+ break;
+ }
+ } else if (c == '.') {
+ if (chars[i - 1] == '/' && (i + 1 == chars.length || chars[i + 1] == '/')) {
+ reason = "relative paths not allowed @" + i;
+ break;
+ }
+ } else if (c > '\u0000' && c < '\u001f'
+ || c > '\u007f' && c < '\u009F'
+ || c > '\ud800' && c < '\uf8ff'
+ || c > '\ufff0' && c < '\uffff') {
+ reason = "invalid character @" + i;
+ break;
+ }
+ lastc = chars[i];
+ }
+
+ if (reason != null) {
+ throw new InvalidStreamNameException("Invalid log name \"" + logName + "\" caused by " + reason);
+ }
}
}
- if (null != reason) {
- throw new InvalidStreamNameException(nameOfStream, reason);
- }
- if (isReservedStreamName(nameOfStream)) {
- throw new InvalidStreamNameException(nameOfStream,
- "Stream Name is reserved");
- }
+
}
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java
index 7a09eeb..347f041 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java
@@ -32,6 +32,7 @@
import org.apache.distributedlog.DistributedLogConstants;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.function.VoidFunctions;
import org.apache.distributedlog.io.AsyncCloseable;
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java
index 43d1008..e0f2bab 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java
@@ -22,7 +22,6 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -52,7 +51,6 @@
import org.slf4j.LoggerFactory;
-import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.*;
public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
@@ -82,6 +80,37 @@
}
@Test(timeout = 60000)
+ public void testCreateLogPath0() throws Exception {
+ createLogPathTest("/create/log/path/" + runtime.getMethodName());
+ }
+
+ @Test(timeout = 60000)
+ public void testCreateLogPath1() throws Exception {
+ createLogPathTest("create/log/path/" + runtime.getMethodName());
+ }
+
+ private void createLogPathTest(String logName) throws Exception {
+ URI uri = createDLMURI("/" + runtime.getMethodName());
+ ensureURICreated(zooKeeperClient.get(), uri);
+ DistributedLogConfiguration newConf = new DistributedLogConfiguration();
+ newConf.addConfiguration(conf);
+ newConf.setCreateStreamIfNotExists(false);
+ DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ .conf(newConf).uri(uri).build();
+ DistributedLogManager dlm = namespace.openLog(logName);
+ LogWriter writer;
+ try {
+ writer = dlm.startLogSegmentNonPartitioned();
+ writer.write(DLMTestUtil.getLogRecordInstance(1L));
+ writer.flushAndSync();
+ fail("Should fail to write data if stream doesn't exist.");
+ } catch (IOException ioe) {
+ // expected
+ }
+ dlm.close();
+ }
+
+ @Test(timeout = 60000)
public void testCreateIfNotExists() throws Exception {
URI uri = createDLMURI("/" + runtime.getMethodName());
ensureURICreated(zooKeeperClient.get(), uri);
@@ -143,8 +172,8 @@
}
try {
- namespace.openLog("/test2");
- fail("should fail to create invalid stream /test2");
+ namespace.openLog("/ test2");
+ fail("should fail to create invalid stream / test2");
} catch (InvalidStreamNameException isne) {
// expected
}
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestDLUtils.java b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestDLUtils.java
index df49d49..92bb6f9 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestDLUtils.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestDLUtils.java
@@ -21,12 +21,14 @@
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion;
+import org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.junit.Test;
import java.util.List;
import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.util.DLUtils.validateAndNormalizeName;
import static org.junit.Assert.*;
/**
@@ -270,4 +272,36 @@
DLUtils.bytes2LogSegmentId(corruptedData);
}
+ @Test(timeout = 10000)
+ public void testValidateLogName() throws Exception {
+ String logName = "test-validate-log-name";
+ validateAndNormalizeName(logName);
+ }
+
+ @Test(timeout = 10000, expected = InvalidStreamNameException.class)
+ public void testValidateBadLogName0() throws Exception {
+ String logName = " test-bad-log-name";
+ validateAndNormalizeName(logName);
+ }
+
+ @Test(timeout = 10000, expected = InvalidStreamNameException.class)
+ public void testValidateBadLogName1() throws Exception {
+ String logName = "test-bad-log-name/";
+ validateAndNormalizeName(logName);
+ }
+
+ @Test(timeout = 10000, expected = InvalidStreamNameException.class)
+ public void testValidateBadLogName2() throws Exception {
+ String logName = "../test-bad-log-name/";
+ validateAndNormalizeName(logName);
+ }
+
+ @Test(timeout = 10000)
+ public void testValidateSameStreamPath() throws Exception {
+ String logName1 = "/test-resolve-log";
+ String logName2 = "test-resolve-log";
+ validateAndNormalizeName(logName1);
+ validateAndNormalizeName(logName2);
+ }
+
}