Add TTL and Container Modes to ZkClient and BaseDataAccessor APIs (#2138)
Add TTL and Container Modes to ZkClient and BaseDataAccessor APIs
diff --git a/helix-core/helix-core-1.0.5-SNAPSHOT.ivy b/helix-core/helix-core-1.0.5-SNAPSHOT.ivy
index 630ab43..0a3ff82 100755
--- a/helix-core/helix-core-1.0.5-SNAPSHOT.ivy
+++ b/helix-core/helix-core-1.0.5-SNAPSHOT.ivy
@@ -52,7 +52,7 @@
<dependency org="org.apache.logging.log4j" name="log4j-slf4j-impl" rev="2.17.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
<artifact name="log4j-slf4j-impl" ext="jar"/>
</dependency>
- <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.4.13" conf="compile->compile(default);runtime->runtime(default);default->default"/>
+ <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.5.9" conf="compile->compile(default);runtime->runtime(default);default->default"/>
<dependency org="com.fasterxml.jackson.core" name="jackson-databind" rev="2.12.6.1" conf="compile->compile(default);runtime->runtime(default);default->default"/>
<dependency org="com.fasterxml.jackson.core" name="jackson-core" rev="2.12.6" conf="compile->compile(default);runtime->runtime(default);default->default"/>
<dependency org="commons-io" name="commons-io" rev="2.11.0" conf="compile->compile(default);runtime->runtime(default);default->default"/>
diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
index e556ccf..9c639d8 100644
--- a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
@@ -44,6 +44,18 @@
boolean create(String path, T record, int options);
/**
+ * This will always attempt to create the znode, if it exists it will return false. Will
+ * create parents if they do not exist. For performance reasons, it may try to create
+ * child first and only if it fails it will try to create parents
+ * @param path path to the ZNode to create
+ * @param record the data to write to the ZNode
+ * @param options Set the type of ZNode see the valid values in {@link AccessOption}
+ * @param ttl TTL of the node in milliseconds, if options supports it
+ * @return true if creation succeeded, false otherwise (e.g. if the ZNode exists)
+ */
+ boolean create(String path, T record, int options, long ttl);
+
+ /**
* This will always attempt to set the data on existing node. If the ZNode does not
* exist it will create it and all its parents ZNodes if necessary
* @param path path to the ZNode to set
@@ -96,6 +108,17 @@
boolean[] createChildren(List<String> paths, List<T> records, int options);
/**
+ * Use it when creating children under a parent node. This will use async api for better
+ * performance. If the child already exists it will return false.
+ * @param paths the paths to the children ZNodes
+ * @param records List of data to write to each of the path
+ * @param options Set the type of ZNode see the valid values in {@link AccessOption}
+ * @param ttl TTL of the node in milliseconds, if options supports it
+ * @return For each child: true if creation succeeded, false otherwise (e.g. if the child exists)
+ */
+ boolean[] createChildren(List<String> paths, List<T> records, int options, long ttl);
+
+ /**
* can set multiple children under a parent node. This will use async api for better
* performance. If this child does not exist it will create it.
* @param paths the paths to the children ZNodes
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 4e40d41..0c7a94f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -47,6 +47,7 @@
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
@@ -249,7 +250,15 @@
*/
@Override
public boolean create(String path, T record, int options) {
- AccessResult result = doCreate(path, record, options);
+ return create(path, record, options, ZkClient.TTL_NOT_SET);
+ }
+
+ /**
+ * sync create with TTL
+ */
+ @Override
+ public boolean create(String path, T record, int options, long ttl) {
+ AccessResult result = doCreate(path, record, options, ttl);
return result._retCode == RetCode.OK;
}
@@ -257,6 +266,13 @@
* sync create
*/
public AccessResult doCreate(String path, T record, int options) {
+ return doCreate(path, record, options, ZkClient.TTL_NOT_SET);
+ }
+
+ /**
+ * sync create with TTL
+ */
+ public AccessResult doCreate(String path, T record, int options, long ttl) {
AccessResult result = new AccessResult();
CreateMode mode = AccessOption.getMode(options);
if (mode == null) {
@@ -269,7 +285,7 @@
do {
retry = false;
try {
- _zkClient.create(path, record, mode);
+ _zkClient.create(path, record, mode, ttl);
result._pathCreated.add(path);
result._retCode = RetCode.OK;
@@ -278,7 +294,14 @@
// this will happen if parent node does not exist
String parentPath = HelixUtil.getZkParentPath(path);
try {
- AccessResult res = doCreate(parentPath, null, AccessOption.PERSISTENT);
+ AccessResult res;
+ if (mode.isTTL()) {
+ res = doCreate(parentPath, null, options, ttl);
+ } else if (mode.isContainer()) {
+ res = doCreate(parentPath, null, AccessOption.CONTAINER);
+ } else {
+ res = doCreate(parentPath, null, AccessOption.PERSISTENT);
+ }
result._pathCreated.addAll(res._pathCreated);
RetCode rc = res._retCode;
if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS) {
@@ -720,6 +743,14 @@
*/
ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records,
boolean[] needCreate, List<List<String>> pathsCreated, int options) {
+ return create(paths, records, needCreate, pathsCreated, options, ZkClient.TTL_NOT_SET);
+ }
+
+ /**
+ * async create with TTL. give up on error other than NONODE
+ */
+ ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records,
+ boolean[] needCreate, List<List<String>> pathsCreated, int options, long ttl) {
if ((records != null && records.size() != paths.size()) || needCreate.length != paths.size()
|| (pathsCreated != null && pathsCreated.size() != paths.size())) {
throw new IllegalArgumentException(
@@ -747,7 +778,11 @@
String path = paths.get(i);
T record = records == null ? null : records.get(i);
cbList[i] = new ZkAsyncCallbacks.CreateCallbackHandler();
- _zkClient.asyncCreate(path, record, mode, cbList[i]);
+ if (mode.isTTL()) {
+ _zkClient.asyncCreate(path, record, mode, ttl, cbList[i]);
+ } else {
+ _zkClient.asyncCreate(path, record, mode, cbList[i]);
+ }
}
List<String> parentPaths = new ArrayList<>(Collections.<String>nCopies(paths.size(), null));
@@ -784,8 +819,16 @@
if (failOnNoNode) {
boolean[] needCreateParent = Arrays.copyOf(needCreate, needCreate.length);
- ZkAsyncCallbacks.CreateCallbackHandler[] parentCbList =
- create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
+ ZkAsyncCallbacks.CreateCallbackHandler[] parentCbList;
+ if (mode.isTTL()) {
+ parentCbList = create(parentPaths, null, needCreateParent, pathsCreated, options, ttl);
+ } else if (mode.isContainer()) {
+ parentCbList =
+ create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.CONTAINER);
+ } else {
+ parentCbList =
+ create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
+ }
for (int i = 0; i < parentCbList.length; i++) {
ZkAsyncCallbacks.CreateCallbackHandler parentCb = parentCbList[i];
if (parentCb == null) {
@@ -812,6 +855,15 @@
*/
@Override
public boolean[] createChildren(List<String> paths, List<T> records, int options) {
+ return createChildren(paths, records, options, ZkClient.TTL_NOT_SET);
+ }
+
+ /**
+ * async create with TTL
+ * TODO: rename to create
+ */
+ @Override
+ public boolean[] createChildren(List<String> paths, List<T> records, int options, long ttl) {
boolean[] success = new boolean[paths.size()];
CreateMode mode = AccessOption.getMode(options);
@@ -829,7 +881,7 @@
try {
ZkAsyncCallbacks.CreateCallbackHandler[] cbList =
- create(paths, records, needCreate, pathsCreated, options);
+ create(paths, records, needCreate, pathsCreated, options, ttl);
for (int i = 0; i < cbList.length; i++) {
ZkAsyncCallbacks.CreateCallbackHandler cb = cbList[i];
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index e165199..6a635a5 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -38,6 +38,7 @@
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
@@ -225,6 +226,11 @@
@Override
public boolean create(String path, T data, int options) {
+ return create(path, data, options, ZkClient.TTL_NOT_SET);
+ }
+
+ @Override
+ public boolean create(String path, T data, int options, long ttl) {
String clientPath = path;
String serverPath = prependChroot(clientPath);
@@ -233,7 +239,7 @@
try {
cache.lockWrite();
ZkBaseDataAccessor<T>.AccessResult result =
- _baseAccessor.doCreate(serverPath, data, options);
+ _baseAccessor.doCreate(serverPath, data, options, ttl);
boolean success = (result._retCode == RetCode.OK);
updateCache(cache, result._pathCreated, success, serverPath, data, ZNode.ZERO_STAT);
@@ -245,7 +251,7 @@
}
// no cache
- return _baseAccessor.create(serverPath, data, options);
+ return _baseAccessor.create(serverPath, data, options, ttl);
}
@Override
@@ -426,6 +432,11 @@
@Override
public boolean[] createChildren(List<String> paths, List<T> records, int options) {
+ return createChildren(paths, records, options, ZkClient.TTL_NOT_SET);
+ }
+
+ @Override
+ public boolean[] createChildren(List<String> paths, List<T> records, int options, long ttl) {
final int size = paths.size();
List<String> serverPaths = prependChroot(paths);
@@ -438,7 +449,7 @@
List<List<String>> pathsCreatedList =
new ArrayList<List<String>>(Collections.<List<String>>nCopies(size, null));
ZkAsyncCallbacks.CreateCallbackHandler[] createCbList =
- _baseAccessor.create(serverPaths, records, needCreate, pathsCreatedList, options);
+ _baseAccessor.create(serverPaths, records, needCreate, pathsCreatedList, options, ttl);
boolean[] success = new boolean[size];
for (int i = 0; i < size; i++) {
@@ -456,7 +467,7 @@
}
// no cache
- return _baseAccessor.createChildren(serverPaths, records, options);
+ return _baseAccessor.createChildren(serverPaths, records, options, ttl);
}
@Override
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
index f473be8..0ce99d5 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
@@ -211,6 +211,69 @@
}
@Test
+ public void testSyncCreateWithTTL() {
+ System.setProperty("zookeeper.extendedTypesEnabled", "true");
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String testName = className + "_" + methodName;
+
+ System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+ String path = String.format("/%s/%s", _rootPath, "msg_0");
+ ZNRecord record = new ZNRecord("msg_0");
+ ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<>(_gZkClient);
+
+ boolean success = accessor.create(path, record, AccessOption.PERSISTENT_WITH_TTL);
+ Assert.assertFalse(success);
+ long ttl = 1L;
+ success = accessor.create(path, record, AccessOption.PERSISTENT_WITH_TTL, ttl);
+ Assert.assertTrue(success);
+ ZNRecord getRecord = _gZkClient.readData(path);
+ Assert.assertNotNull(getRecord);
+ Assert.assertEquals(getRecord.getId(), "msg_0");
+
+ record.setSimpleField("key0", "value0");
+ success = accessor.create(path, record, AccessOption.PERSISTENT_WITH_TTL, ttl);
+ Assert.assertFalse(success, "Should fail since node already exists");
+ getRecord = _gZkClient.readData(path);
+ Assert.assertNotNull(getRecord);
+ Assert.assertEquals(getRecord.getSimpleFields().size(), 0);
+
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+ System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testSyncCreateContainer() {
+ System.setProperty("zookeeper.extendedTypesEnabled", "true");
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String testName = className + "_" + methodName;
+
+ System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+ String path = String.format("/%s/%s", _rootPath, "msg_0");
+ ZNRecord record = new ZNRecord("msg_0");
+ ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<>(_gZkClient);
+
+ boolean success = accessor.create(path, record, AccessOption.CONTAINER);
+ Assert.assertTrue(success);
+ ZNRecord getRecord = _gZkClient.readData(path);
+ Assert.assertNotNull(getRecord);
+ Assert.assertEquals(getRecord.getId(), "msg_0");
+
+ record.setSimpleField("key0", "value0");
+ success = accessor.create(path, record, AccessOption.CONTAINER);
+ Assert.assertFalse(success, "Should fail since node already exists");
+ getRecord = _gZkClient.readData(path);
+ Assert.assertNotNull(getRecord);
+ Assert.assertEquals(getRecord.getSimpleFields().size(), 0);
+
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+ System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
public void testDefaultAccessorCreateCustomData() {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
@@ -513,6 +576,52 @@
Assert.assertEquals(record.getId(), msgId, "Should get what we created");
}
+ // test async createChildren with TTL
+ System.setProperty("zookeeper.extendedTypesEnabled", "true");
+ records = new ArrayList<>();
+ paths = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String msgId = "msg_" + i;
+ paths.add(PropertyPathBuilder.instanceMessage(root, "host_2", msgId));
+ records.add(new ZNRecord(msgId));
+ }
+ success = accessor.createChildren(paths, records, AccessOption.PERSISTENT_WITH_TTL, 1L);
+ for (int i = 0; i < 10; i++) {
+ String msgId = "msg_" + i;
+ Assert.assertTrue(success[i], "Should succeed in create " + msgId);
+ }
+
+ // test get what we created
+ for (int i = 0; i < 10; i++) {
+ String msgId = "msg_" + i;
+ String path = PropertyPathBuilder.instanceMessage(root, "host_2", msgId);
+ ZNRecord record = _gZkClient.readData(path);
+ Assert.assertEquals(record.getId(), msgId, "Should get what we created");
+ }
+
+ // test async createChildren with Container mode
+ records = new ArrayList<>();
+ paths = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String msgId = "msg_" + i;
+ paths.add(PropertyPathBuilder.instanceMessage(root, "host_3", msgId));
+ records.add(new ZNRecord(msgId));
+ }
+ success = accessor.createChildren(paths, records, AccessOption.CONTAINER);
+ for (int i = 0; i < 10; i++) {
+ String msgId = "msg_" + i;
+ Assert.assertTrue(success[i], "Should succeed in create " + msgId);
+ }
+
+ // test get what we created
+ for (int i = 0; i < 10; i++) {
+ String msgId = "msg_" + i;
+ String path = PropertyPathBuilder.instanceMessage(root, "host_3", msgId);
+ ZNRecord record = _gZkClient.readData(path);
+ Assert.assertEquals(record.getId(), msgId, "Should get what we created");
+ }
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+
// test async setChildren
records = new ArrayList<>();
paths = new ArrayList<>();
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
index e22fcc2..1567b98 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
@@ -68,6 +68,11 @@
}
@Override
+ public boolean create(String path, ZNRecord record, int options, long ttl) {
+ return set(path, record, options);
+ }
+
+ @Override
public boolean set(String path, ZNRecord record, int options) {
ZNode zNode = _recordMap.get(path);
if (zNode == null) {
@@ -113,6 +118,12 @@
}
@Override
+ public boolean[] createChildren(List<String> paths, List<ZNRecord> records,
+ int options, long ttl) {
+ return setChildren(paths, records, options);
+ }
+
+ @Override
public boolean[] setChildren(List<String> paths, List<ZNRecord> records, int options) {
boolean [] ret = new boolean[paths.size()];
for (int i = 0; i < paths.size(); i++) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
index d006420..99874c7 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
@@ -175,10 +175,34 @@
void createPersistent(String path, Object data, List<ACL> acl);
+ void createPersistentWithTTL(String path, long ttl);
+
+ void createPersistentWithTTL(String path, boolean createParents, long ttl);
+
+ void createPersistentWithTTL(String path, boolean createParents, List<ACL> acl, long ttl);
+
+ void createPersistentWithTTL(String path, Object data, long ttl);
+
+ void createPersistentWithTTL(String path, Object data, List<ACL> acl, long ttl);
+
String createPersistentSequential(String path, Object data);
String createPersistentSequential(String path, Object data, List<ACL> acl);
+ String createPersistentSequentialWithTTL(String path, Object data, long ttl);
+
+ String createPersistentSequentialWithTTL(String path, Object data, List<ACL> acl, long ttl);
+
+ void createContainer(String path);
+
+ void createContainer(String path, boolean createParents);
+
+ void createContainer(String path, boolean createParents, List<ACL> acl);
+
+ void createContainer(String path, Object data);
+
+ void createContainer(String path, Object data, List<ACL> acl);
+
void createEphemeral(final String path);
void createEphemeral(final String path, final String sessionId);
@@ -189,8 +213,13 @@
String create(final String path, Object data, final CreateMode mode);
+ String create(final String path, Object data, final CreateMode mode, long ttl);
+
String create(final String path, Object datat, final List<ACL> acl, final CreateMode mode);
+ String create(final String path, Object datat, final List<ACL> acl, final CreateMode mode,
+ long ttl);
+
void createEphemeral(final String path, final Object data);
void createEphemeral(final String path, final Object data, final String sessionId);
@@ -246,6 +275,9 @@
void asyncCreate(final String path, Object datat, final CreateMode mode,
final ZkAsyncCallbacks.CreateCallbackHandler cb);
+ void asyncCreate(final String path, Object datat, final CreateMode mode, long ttl,
+ final ZkAsyncCallbacks.CreateCallbackHandler cb);
+
void asyncSetData(final String path, Object datat, final int version,
final ZkAsyncCallbacks.SetDataCallbackHandler cb);
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
index 74ff0ad..738a070 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
@@ -182,6 +182,32 @@
}
@Override
+ public void createPersistentWithTTL(String path, long ttl) {
+ createPersistentWithTTL(path, false, ttl);
+ }
+
+ @Override
+ public void createPersistentWithTTL(String path, boolean createParents, long ttl) {
+ createPersistentWithTTL(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE, ttl);
+ }
+
+ @Override
+ public void createPersistentWithTTL(String path, boolean createParents, List<ACL> acl, long ttl) {
+ checkIfPathContainsShardingKey(path);
+ _rawZkClient.createPersistentWithTTL(path, createParents, acl, ttl);
+ }
+
+ @Override
+ public void createPersistentWithTTL(String path, Object data, long ttl) {
+ create(path, data, CreateMode.PERSISTENT_WITH_TTL, ttl);
+ }
+
+ @Override
+ public void createPersistentWithTTL(String path, Object data, List<ACL> acl, long ttl) {
+ create(path, data, acl, CreateMode.PERSISTENT_WITH_TTL, ttl);
+ }
+
+ @Override
public String createPersistentSequential(String path, Object data) {
return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL);
}
@@ -192,6 +218,42 @@
}
@Override
+ public String createPersistentSequentialWithTTL(String path, Object data, long ttl) {
+ return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL, ttl);
+ }
+
+ @Override
+ public String createPersistentSequentialWithTTL(String path, Object data, List<ACL> acl, long ttl) {
+ return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL, ttl);
+ }
+
+ @Override
+ public void createContainer(String path) {
+ createContainer(path, false);
+ }
+
+ @Override
+ public void createContainer(String path, boolean createParents) {
+ createContainer(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+
+ @Override
+ public void createContainer(String path, boolean createParents, List<ACL> acl) {
+ checkIfPathContainsShardingKey(path);
+ _rawZkClient.createContainer(path, createParents, acl);
+ }
+
+ @Override
+ public void createContainer(String path, Object data) {
+ create(path, data, CreateMode.CONTAINER);
+ }
+
+ @Override
+ public void createContainer(String path, Object data, List<ACL> acl) {
+ create(path, data, acl, CreateMode.CONTAINER);
+ }
+
+ @Override
public void createEphemeral(String path) {
create(path, null, CreateMode.EPHEMERAL);
}
@@ -218,12 +280,23 @@
}
@Override
+ public String create(String path, Object data, CreateMode mode, long ttl) {
+ return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode, ttl);
+ }
+
+ @Override
public String create(String path, Object datat, List<ACL> acl, CreateMode mode) {
checkIfPathContainsShardingKey(path);
return _rawZkClient.create(path, datat, acl, mode);
}
@Override
+ public String create(String path, Object datat, List<ACL> acl, CreateMode mode, long ttl) {
+ checkIfPathContainsShardingKey(path);
+ return _rawZkClient.create(path, datat, acl, mode, ttl);
+ }
+
+ @Override
public void createEphemeral(String path, Object data) {
create(path, data, CreateMode.EPHEMERAL);
}
@@ -367,6 +440,13 @@
}
@Override
+ public void asyncCreate(String path, Object datat, CreateMode mode, long ttl,
+ ZkAsyncCallbacks.CreateCallbackHandler cb) {
+ checkIfPathContainsShardingKey(path);
+ _rawZkClient.asyncCreate(path, datat, mode, ttl, cb);
+ }
+
+ @Override
public void asyncCreate(String path, Object datat, CreateMode mode,
ZkAsyncCallbacks.CreateCallbackHandler cb) {
checkIfPathContainsShardingKey(path);
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
index dc55d53..51a22c7 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
@@ -192,6 +192,31 @@
}
@Override
+ public void createPersistentWithTTL(String path, long ttl) {
+ createPersistentWithTTL(path, false, ttl);
+ }
+
+ @Override
+ public void createPersistentWithTTL(String path, boolean createParents, long ttl) {
+ createPersistentWithTTL(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE, ttl);
+ }
+
+ @Override
+ public void createPersistentWithTTL(String path, boolean createParents, List<ACL> acl, long ttl) {
+ getZkClient(path).createPersistentWithTTL(path, createParents, acl, ttl);
+ }
+
+ @Override
+ public void createPersistentWithTTL(String path, Object data, long ttl) {
+ create(path, data, CreateMode.PERSISTENT_WITH_TTL, ttl);
+ }
+
+ @Override
+ public void createPersistentWithTTL(String path, Object data, List<ACL> acl, long ttl) {
+ create(path, data, acl, CreateMode.PERSISTENT_WITH_TTL, ttl);
+ }
+
+ @Override
public String createPersistentSequential(String path, Object data) {
return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL);
}
@@ -202,6 +227,41 @@
}
@Override
+ public String createPersistentSequentialWithTTL(String path, Object data, long ttl) {
+ return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL, ttl);
+ }
+
+ @Override
+ public String createPersistentSequentialWithTTL(String path, Object data, List<ACL> acl, long ttl) {
+ return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL, ttl);
+ }
+
+ @Override
+ public void createContainer(String path) {
+ createContainer(path, false);
+ }
+
+ @Override
+ public void createContainer(String path, boolean createParents) {
+ createContainer(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+
+ @Override
+ public void createContainer(String path, boolean createParents, List<ACL> acl) {
+ getZkClient(path).createContainer(path, createParents, acl);
+ }
+
+ @Override
+ public void createContainer(String path, Object data) {
+ create(path, data, CreateMode.CONTAINER);
+ }
+
+ @Override
+ public void createContainer(String path, Object data, List<ACL> acl) {
+ create(path, data, acl, CreateMode.CONTAINER);
+ }
+
+ @Override
public void createEphemeral(String path) {
create(path, null, CreateMode.EPHEMERAL);
}
@@ -227,11 +287,21 @@
}
@Override
+ public String create(String path, Object data, CreateMode mode, long ttl) {
+ return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode, ttl);
+ }
+
+ @Override
public String create(String path, Object data, List<ACL> acl, CreateMode mode) {
return create(path, data, acl, mode, null);
}
@Override
+ public String create(String path, Object data, List<ACL> acl, CreateMode mode, long ttl) {
+ return create(path, data, acl, mode, ttl, null);
+ }
+
+ @Override
public void createEphemeral(String path, Object data) {
create(path, data, CreateMode.EPHEMERAL);
}
@@ -360,6 +430,12 @@
}
@Override
+ public void asyncCreate(String path, Object data, CreateMode mode, long ttl,
+ ZkAsyncCallbacks.CreateCallbackHandler cb) {
+ getZkClient(path).asyncCreate(path, data, mode, ttl, cb);
+ }
+
+ @Override
public void asyncCreate(String path, Object data, CreateMode mode,
ZkAsyncCallbacks.CreateCallbackHandler cb) {
getZkClient(path).asyncCreate(path, data, mode, cb);
@@ -501,13 +577,18 @@
private String create(final String path, final Object dataObject, final List<ACL> acl,
final CreateMode mode, final String expectedSessionId) {
+ return create(path, dataObject, acl, mode, ZkClient.TTL_NOT_SET, expectedSessionId);
+ }
+
+ private String create(final String path, final Object dataObject, final List<ACL> acl,
+ final CreateMode mode, final long ttl, final String expectedSessionId) {
if (mode.isEphemeral()) {
throwUnsupportedOperationException();
}
// Create mode is not session-aware, so the node does not have to be created
// by the expectedSessionId.
- return getZkClient(path).create(path, dataObject, acl, mode);
+ return getZkClient(path).create(path, dataObject, acl, mode, ttl);
}
private ZkClient getZkClient(String path) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
index 7f529d2..093538f 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
@@ -39,6 +39,7 @@
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -190,6 +191,33 @@
}
@Override
+ public void createPersistentWithTTL(String path, long ttl) {
+ createPersistentWithTTL(path, false, ttl);
+ }
+
+ @Override
+ public void createPersistentWithTTL(String path, boolean createParents, long ttl) {
+ createPersistentWithTTL(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE, ttl);
+ }
+
+ @Override
+ public void createPersistentWithTTL(String path, boolean createParents, List<ACL> acl, long ttl) {
+ checkIfPathContainsShardingKey(path);
+ _innerSharedZkClient.createPersistentWithTTL(path, createParents, acl, ttl);
+ }
+
+ @Override
+ public void createPersistentWithTTL(String path, Object data, long ttl) {
+ createPersistentWithTTL(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, ttl);
+ }
+
+ @Override
+ public void createPersistentWithTTL(String path, Object data, List<ACL> acl, long ttl) {
+ checkIfPathContainsShardingKey(path);
+ _innerSharedZkClient.createPersistentWithTTL(path, data, acl, ttl);
+ }
+
+ @Override
public String createPersistentSequential(String path, Object data) {
checkIfPathContainsShardingKey(path);
return _innerSharedZkClient.createPersistentSequential(path, data);
@@ -202,6 +230,44 @@
}
@Override
+ public String createPersistentSequentialWithTTL(String path, Object data, long ttl) {
+ return createPersistentSequentialWithTTL(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, ttl);
+ }
+
+ @Override
+ public String createPersistentSequentialWithTTL(String path, Object data, List<ACL> acl, long ttl) {
+ checkIfPathContainsShardingKey(path);
+ return _innerSharedZkClient.createPersistentSequentialWithTTL(path, data, acl, ttl);
+ }
+
+ @Override
+ public void createContainer(String path) {
+ createContainer(path, false);
+ }
+
+ @Override
+ public void createContainer(String path, boolean createParents) {
+ createContainer(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+
+ @Override
+ public void createContainer(String path, boolean createParents, List<ACL> acl) {
+ checkIfPathContainsShardingKey(path);
+ _innerSharedZkClient.createContainer(path, createParents, acl);
+ }
+
+ @Override
+ public void createContainer(String path, Object data) {
+ createContainer(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+
+ @Override
+ public void createContainer(String path, Object data, List<ACL> acl) {
+ checkIfPathContainsShardingKey(path);
+ _innerSharedZkClient.createContainer(path, data, acl);
+ }
+
+ @Override
public void createEphemeral(String path) {
throw new UnsupportedOperationException(
"Creating ephemeral nodes using " + SharedZkClient.class.getSimpleName()
@@ -231,16 +297,26 @@
@Override
public String create(String path, Object data, CreateMode mode) {
+ return create(path, data, mode, ZkClient.TTL_NOT_SET);
+ }
+
+ @Override
+ public String create(String path, Object data, CreateMode mode, long ttl) {
checkIfPathContainsShardingKey(path);
// delegate to _innerSharedZkClient is fine as _innerSharedZkClient would not allow creating ephemeral node.
// this still keeps the same behavior.
- return _innerSharedZkClient.create(path, data, mode);
+ return _innerSharedZkClient.create(path, data, mode, ttl);
}
@Override
public String create(String path, Object datat, List<ACL> acl, CreateMode mode) {
+ return create(path, datat, acl, mode, ZkClient.TTL_NOT_SET);
+ }
+
+ @Override
+ public String create(String path, Object datat, List<ACL> acl, CreateMode mode, long ttl) {
checkIfPathContainsShardingKey(path);
- return _innerSharedZkClient.create(path, datat, acl, mode);
+ return _innerSharedZkClient.create(path, datat, acl, mode, ttl);
}
@Override
@@ -403,10 +479,16 @@
}
@Override
- public void asyncCreate(String path, Object datat, CreateMode mode,
+ public void asyncCreate(String path, Object datat, CreateMode mode, long ttl,
ZkAsyncCallbacks.CreateCallbackHandler cb) {
checkIfPathContainsShardingKey(path);
- _innerSharedZkClient.asyncCreate(path, datat, mode, cb);
+ _innerSharedZkClient.asyncCreate(path, datat, mode, ttl, cb);
+ }
+
+ @Override
+ public void asyncCreate(String path, Object datat, CreateMode mode,
+ ZkAsyncCallbacks.CreateCallbackHandler cb) {
+ asyncCreate(path, datat, mode, ZkClient.TTL_NOT_SET, cb);
}
@Override
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java
index e3cbf5d..48d1bde 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java
@@ -194,12 +194,17 @@
@Override
public String create(final String path, Object datat, final List<ACL> acl,
final CreateMode mode) {
+ return create(path, datat, acl, mode, TTL_NOT_SET);
+ }
+ @Override
+ public String create(final String path, Object datat, final List<ACL> acl,
+ final CreateMode mode, long ttl) {
if (mode.isEphemeral()) {
throw new UnsupportedOperationException(
"Create ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+ " is not supported.");
}
- return super.create(path, datat, acl, mode);
+ return super.create(path, datat, acl, mode, ttl);
}
@Override
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
index 6fc040f..e766bf7 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
@@ -40,6 +40,8 @@
public String create(String path, byte[] data, List<ACL> acl, CreateMode mode) throws KeeperException, InterruptedException;
+ public String create(String path, byte[] data, List<ACL> acl, CreateMode mode, long ttl) throws KeeperException, InterruptedException;
+
public void delete(String path) throws InterruptedException, KeeperException;
boolean exists(final String path, final boolean watch) throws KeeperException, InterruptedException;
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index be72920..e483265 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -85,6 +85,7 @@
public class ZkClient implements Watcher {
private static final Logger LOG = LoggerFactory.getLogger(ZkClient.class);
+ public static final long TTL_NOT_SET = -1L;
private static final long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds
// If number of children exceeds this limit, getChildren() should not retry on connection loss.
@@ -438,6 +439,41 @@
}
/**
+ * Create a persistent node with TTL.
+ * @param path the path where you want the node to be created
+ * @param ttl TTL of the node in milliseconds
+ * @throws ZkInterruptedException
+ * if operation was interrupted, or a required reconnection got interrupted
+ * @throws IllegalArgumentException
+ * if called from anything except the ZooKeeper event thread
+ * @throws ZkException
+ * if any ZooKeeper exception occurred
+ * @throws RuntimeException
+ * if any other exception occurs
+ */
+ public void createPersistentWithTTL(String path, long ttl)
+ throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+ createPersistentWithTTL(path, false, ttl);
+ }
+
+ /**
+ * Create a container node.
+ * @param path the path where you want the node to be created
+ * @throws ZkInterruptedException
+ * if operation was interrupted, or a required reconnection got interrupted
+ * @throws IllegalArgumentException
+ * if called from anything except the ZooKeeper event thread
+ * @throws ZkException
+ * if any ZooKeeper exception occurred
+ * @throws RuntimeException
+ * if any other exception occurs
+ */
+ public void createContainer(String path)
+ throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+ createContainer(path, false);
+ }
+
+ /**
* Create a persistent node and set its ACLs.
* @param path
* @param createParents
@@ -459,6 +495,45 @@
}
/**
+ * Create a persistent node with TTL and set its ACLs.
+ * @param path the path where you want the node to be created
+ * @param createParents if true all parent dirs are created as well and no
+ * {@link ZkNodeExistsException} is thrown in case the path already exists
+ * @param ttl TTL of the node in milliseconds
+ * @throws ZkInterruptedException
+ * if operation was interrupted, or a required reconnection got interrupted
+ * @throws IllegalArgumentException
+ * if called from anything except the ZooKeeper event thread
+ * @throws ZkException
+ * if any ZooKeeper exception occurred
+ * @throws RuntimeException
+ * if any other exception occurs
+ */
+ public void createPersistentWithTTL(String path, boolean createParents, long ttl)
+ throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+ createPersistentWithTTL(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE, ttl);
+ }
+
+ /**
+ * Create a container node and set its ACLs.
+ * @param path the path where you want the node to be created
+ * @param createParents if true all parent dirs are created as well and no
+ * {@link ZkNodeExistsException} is thrown in case the path already exists
+ * @throws ZkInterruptedException
+ * if operation was interrupted, or a required reconnection got interrupted
+ * @throws IllegalArgumentException
+ * if called from anything except the ZooKeeper event thread
+ * @throws ZkException
+ * if any ZooKeeper exception occurred
+ * @throws RuntimeException
+ * if any other exception occurs
+ */
+ public void createContainer(String path, boolean createParents)
+ throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+ createContainer(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+
+ /**
* Create a persistent node and set its ACLs.
* @param path
* @param acl
@@ -495,6 +570,73 @@
}
/**
+ * Create a persistent node with TTL and set its ACLs.
+ * @param path the path where you want the node to be created
+ * @param createParents if true all parent dirs are created as well and no
+ * {@link ZkNodeExistsException} is thrown in case the path already exists
+ * @param acl List of ACL permissions to assign to the node
+ * @param ttl TTL of the node in milliseconds
+ * @throws ZkInterruptedException
+ * if operation was interrupted, or a required reconnection got interrupted
+ * @throws IllegalArgumentException
+ * if called from anything except the ZooKeeper event thread
+ * @throws ZkException
+ * if any ZooKeeper exception occurred
+ * @throws RuntimeException
+ * if any other exception occurs
+ */
+ public void createPersistentWithTTL(String path, boolean createParents, List<ACL> acl, long ttl)
+ throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+ try {
+ create(path, null, acl, CreateMode.PERSISTENT_WITH_TTL, ttl);
+ } catch (ZkNodeExistsException e) {
+ if (!createParents) {
+ throw e;
+ }
+ } catch (ZkNoNodeException e) {
+ if (!createParents) {
+ throw e;
+ }
+ String parentDir = path.substring(0, path.lastIndexOf('/'));
+ createPersistentWithTTL(parentDir, createParents, acl, ttl);
+ createPersistentWithTTL(path, createParents, acl, ttl);
+ }
+ }
+
+ /**
+ * Create a container node and set its ACLs.
+ * @param path the path where you want the node to be created
+ * @param createParents if true all parent dirs are created as well and no
+ * {@link ZkNodeExistsException} is thrown in case the path already exists
+ * @param acl List of ACL permissions to assign to the node
+ * @throws ZkInterruptedException
+ * if operation was interrupted, or a required reconnection got interrupted
+ * @throws IllegalArgumentException
+ * if called from anything except the ZooKeeper event thread
+ * @throws ZkException
+ * if any ZooKeeper exception occurred
+ * @throws RuntimeException
+ * if any other exception occurs
+ */
+ public void createContainer(String path, boolean createParents, List<ACL> acl)
+ throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+ try {
+ create(path, null, acl, CreateMode.CONTAINER);
+ } catch (ZkNodeExistsException e) {
+ if (!createParents) {
+ throw e;
+ }
+ } catch (ZkNoNodeException e) {
+ if (!createParents) {
+ throw e;
+ }
+ String parentDir = path.substring(0, path.lastIndexOf('/'));
+ createContainer(parentDir, createParents, acl);
+ createContainer(path, createParents, acl);
+ }
+ }
+
+ /**
* Create a persistent node.
* @param path
* @param data
@@ -513,6 +655,43 @@
}
/**
+ * Create a persistent node with TTL.
+ * @param path the path where you want the node to be created
+ * @param data data of the node
+ * @param ttl TTL of the node in milliseconds
+ * @throws ZkInterruptedException
+ * if operation was interrupted, or a required reconnection got interrupted
+ * @throws IllegalArgumentException
+ * if called from anything except the ZooKeeper event thread
+ * @throws ZkException
+ * if any ZooKeeper exception occurred
+ * @throws RuntimeException
+ * if any other exception occurs
+ */
+ public void createPersistentWithTTL(String path, Object data, long ttl)
+ throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+ create(path, data, CreateMode.PERSISTENT_WITH_TTL, ttl);
+ }
+
+ /**
+ * Create a container node.
+ * @param path the path where you want the node to be created
+ * @param data data of the node
+ * @throws ZkInterruptedException
+ * if operation was interrupted, or a required reconnection got interrupted
+ * @throws IllegalArgumentException
+ * if called from anything except the ZooKeeper event thread
+ * @throws ZkException
+ * if any ZooKeeper exception occurred
+ * @throws RuntimeException
+ * if any other exception occurs
+ */
+ public void createContainer(String path, Object data)
+ throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+ create(path, data, CreateMode.CONTAINER);
+ }
+
+ /**
* Create a persistent node.
* @param path
* @param data
@@ -531,6 +710,43 @@
}
/**
+ * Create a persistent node with TTL.
+ * @param path the path where you want the node to be created
+ * @param data data of the node
+ * @param acl list of ACL for the node
+ * @param ttl TTL of the node in milliseconds
+ * @throws ZkInterruptedException
+ * if operation was interrupted, or a required reconnection got interrupted
+ * @throws IllegalArgumentException
+ * if called from anything except the ZooKeeper event thread
+ * @throws ZkException
+ * if any ZooKeeper exception occurred
+ * @throws RuntimeException
+ * if any other exception occurs
+ */
+ public void createPersistentWithTTL(String path, Object data, List<ACL> acl, long ttl) {
+ create(path, data, acl, CreateMode.PERSISTENT_WITH_TTL, ttl);
+ }
+
+ /**
+ * Create a container node.
+ * @param path the path where you want the node to be created
+ * @param data data of the node
+ * @param acl list of ACL for the node
+ * @throws ZkInterruptedException
+ * if operation was interrupted, or a required reconnection got interrupted
+ * @throws IllegalArgumentException
+ * if called from anything except the ZooKeeper event thread
+ * @throws ZkException
+ * if any ZooKeeper exception occurred
+ * @throws RuntimeException
+ * if any other exception occurs
+ */
+ public void createContainer(String path, Object data, List<ACL> acl) {
+ create(path, data, acl, CreateMode.CONTAINER);
+ }
+
+ /**
* Create a persistent, sequental node.
* @param path
* @param data
@@ -550,6 +766,26 @@
}
/**
+ * Create a persistent, sequential node.
+ * @param path the path where you want the node to be created
+ * @param data data of the node
+ * @param ttl TTL of the node in milliseconds
+ * @return create node's path
+ * @throws ZkInterruptedException
+ * if operation was interrupted, or a required reconnection got interrupted
+ * @throws IllegalArgumentException
+ * if called from anything except the ZooKeeper event thread
+ * @throws ZkException
+ * if any ZooKeeper exception occurred
+ * @throws RuntimeException
+ * if any other exception occurs
+ */
+ public String createPersistentSequentialWithTTL(String path, Object data, long ttl)
+ throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+ return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL, ttl);
+ }
+
+ /**
* Create a persistent, sequential node and set its ACL.
* @param path
* @param acl
@@ -570,6 +806,27 @@
}
/**
+ * Create a persistent, sequential node and set its ACL.
+ * @param path the path where you want the node to be created
+ * @param acl list of ACL for the node
+ * @param data data of the node
+ * @param ttl TTL of the node in milliseconds
+ * @return create node's path
+ * @throws ZkInterruptedException
+ * if operation was interrupted, or a required reconnection got interrupted
+ * @throws IllegalArgumentException
+ * if called from anything except the ZooKeeper event thread
+ * @throws ZkException
+ * if any ZooKeeper exception occurred
+ * @throws RuntimeException
+ * if any other exception occurs
+ */
+ public String createPersistentSequentialWithTTL(String path, Object data, List<ACL> acl, long ttl)
+ throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+ return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL, ttl);
+ }
+
+ /**
* Create an ephemeral node.
* @param path
* @throws ZkInterruptedException
@@ -647,7 +904,7 @@
*/
public void createEphemeral(final String path, final List<ACL> acl, final String sessionId)
throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
- create(path, null, acl, CreateMode.EPHEMERAL, sessionId);
+ create(path, null, acl, CreateMode.EPHEMERAL, TTL_NOT_SET, sessionId);
}
/**
@@ -671,6 +928,28 @@
}
/**
+ * Create a node.
+ * @param path the path where you want the node to be created
+ * @param data data of the node
+ * @param mode {@link CreateMode} of the node
+ * @param ttl TTL of the node in milliseconds, if mode is {@link CreateMode#PERSISTENT_WITH_TTL}
+ * or {@link CreateMode#PERSISTENT_SEQUENTIAL_WITH_TTL}
+ * @return create node's path
+ * @throws ZkInterruptedException
+ * if operation was interrupted, or a required reconnection got interrupted
+ * @throws IllegalArgumentException
+ * if called from anything except the ZooKeeper event thread
+ * @throws ZkException
+ * if any ZooKeeper exception occurred
+ * @throws RuntimeException
+ * if any other exception occurs
+ */
+ public String create(final String path, Object data, final CreateMode mode, long ttl)
+ throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+ return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode, ttl);
+ }
+
+ /**
* Create a node with ACL.
* @param path
* @param datat
@@ -688,7 +967,30 @@
*/
public String create(final String path, Object datat, final List<ACL> acl, final CreateMode mode)
throws IllegalArgumentException, ZkException {
- return create(path, datat, acl, mode, null);
+ return create(path, datat, acl, mode, TTL_NOT_SET, null);
+ }
+
+ /**
+ * Create a node with ACL.
+ * @param path the path where you want the node to be created
+ * @param datat data of the node
+ * @param acl list of ACL for the node
+ * @param mode {@link CreateMode} of the node
+ * @param ttl TTL of the node in milliseconds, if mode is {@link CreateMode#PERSISTENT_WITH_TTL}
+ * or {@link CreateMode#PERSISTENT_SEQUENTIAL_WITH_TTL}
+ * @return create node's path
+ * @throws ZkInterruptedException
+ * if operation was interrupted, or a required reconnection got interrupted
+ * @throws IllegalArgumentException
+ * if called from anything except the ZooKeeper event thread
+ * @throws ZkException
+ * if any ZooKeeper exception occurred
+ * @throws RuntimeException
+ * if any other exception occurs
+ */
+ public String create(final String path, Object datat, final List<ACL> acl, final CreateMode mode,
+ long ttl) throws IllegalArgumentException, ZkException {
+ return create(path, datat, acl, mode, ttl, null);
}
/**
@@ -704,6 +1006,8 @@
* @param dataObject data of the node
* @param acl list of ACL for the node
* @param mode {@link CreateMode} of the node
+ * @param ttl TTL of the node in milliseconds, if mode is {@link CreateMode#PERSISTENT_WITH_TTL}
+ * or {@link CreateMode#PERSISTENT_SEQUENTIAL_WITH_TTL}
* @param expectedSessionId the expected session ID of the ZK connection. It is not necessarily the
* session ID of current ZK Connection. If the expected session ID is NOT null,
* the node is guaranteed to be created in the expected session, or creation is
@@ -714,7 +1018,7 @@
* @throws ZkException if any zookeeper exception occurs
*/
private String create(final String path, final Object dataObject, final List<ACL> acl,
- final CreateMode mode, final String expectedSessionId)
+ final CreateMode mode, long ttl, final String expectedSessionId)
throws IllegalArgumentException, ZkException {
if (path == null) {
throw new NullPointerException("Path must not be null.");
@@ -727,8 +1031,14 @@
final byte[] dataBytes = dataObject == null ? null : serialize(dataObject, path);
checkDataSizeLimit(path, dataBytes);
- final String actualPath = retryUntilConnected(
- () -> getExpectedZookeeper(expectedSessionId).create(path, dataBytes, acl, mode));
+ final String actualPath;
+ if (mode.isTTL()) {
+ actualPath = retryUntilConnected(() -> getExpectedZookeeper(expectedSessionId)
+ .create(path, dataBytes, acl, mode, null, ttl));
+ } else {
+ actualPath = retryUntilConnected(() -> getExpectedZookeeper(expectedSessionId)
+ .create(path, dataBytes, acl, mode));
+ }
record(path, dataBytes, startT, ZkClientMonitor.AccessType.WRITE);
return actualPath;
@@ -785,7 +1095,7 @@
*/
public void createEphemeral(final String path, final Object data, final String sessionId)
throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
- create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, sessionId);
+ create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, TTL_NOT_SET, sessionId);
}
/**
@@ -835,7 +1145,7 @@
public void createEphemeral(final String path, final Object data, final List<ACL> acl,
final String sessionId)
throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
- create(path, data, acl, CreateMode.EPHEMERAL, sessionId);
+ create(path, data, acl, CreateMode.EPHEMERAL, TTL_NOT_SET, sessionId);
}
/**
@@ -881,7 +1191,7 @@
public String createEphemeralSequential(final String path, final Object data, final List<ACL> acl,
final String sessionId)
throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
- return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL, sessionId);
+ return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL, TTL_NOT_SET, sessionId);
}
/**
@@ -913,7 +1223,7 @@
final String sessionId)
throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
- sessionId);
+ TTL_NOT_SET, sessionId);
}
/**
@@ -1937,10 +2247,10 @@
new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
return;
}
- doAsyncCreate(path, data, mode, startT, cb, parseExpectedSessionId(datat));
+ doAsyncCreate(path, data, mode, TTL_NOT_SET, startT, cb, parseExpectedSessionId(datat));
}
- private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode,
+ private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode, long ttl,
final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb, final String expectedSessionId) {
try {
retryUntilConnected(() -> {
@@ -1949,19 +2259,33 @@
GZipCompressionUtil.isCompressed(data)) {
@Override
protected void doRetry() {
- doAsyncCreate(path, data, mode, System.currentTimeMillis(), cb, expectedSessionId);
+ doAsyncCreate(path, data, mode, ttl, System.currentTimeMillis(), cb, expectedSessionId);
}
- });
+ }, ttl);
return null;
});
} catch (RuntimeException e) {
// Process callback to release caller from waiting
cb.processResult(KeeperException.Code.APIERROR.intValue(), path,
- new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
+ new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null, null);
throw e;
}
}
+ public void asyncCreate(final String path, Object datat, final CreateMode mode, long ttl,
+ final ZkAsyncCallbacks.CreateCallbackHandler cb) {
+ final long startT = System.currentTimeMillis();
+ final byte[] data;
+ try {
+ data = (datat == null ? null : serialize(datat, path));
+ } catch (ZkMarshallingError e) {
+ cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
+ new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null, null);
+ return;
+ }
+ doAsyncCreate(path, data, mode, ttl, startT, cb, parseExpectedSessionId(datat));
+ }
+
// Async Data Accessors
public void asyncSetData(final String path, Object datat, final int version,
final ZkAsyncCallbacks.SetDataCallbackHandler cb) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
index 08a2fb9..0193591 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
@@ -134,6 +134,12 @@
}
@Override
+ public String create(String path, byte[] data, List<ACL> acl, CreateMode mode, long ttl)
+ throws KeeperException, InterruptedException {
+ return _zk.create(path, data, acl, mode, null, ttl);
+ }
+
+ @Override
public void delete(String path) throws InterruptedException, KeeperException {
_zk.delete(path, -1);
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
index 506d234..72e2b95 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
@@ -23,6 +23,7 @@
import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.Create2Callback;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
@@ -111,13 +112,18 @@
}
}
- public static class CreateCallbackHandler extends DefaultCallback implements StringCallback {
+ public static class CreateCallbackHandler extends DefaultCallback implements StringCallback, Create2Callback {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
callback(rc, path, ctx);
}
@Override
+ public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
+ callback(rc, path, ctx);
+ }
+
+ @Override
public void handle() {
// TODO Auto-generated method stub
}
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientFactoryTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientFactoryTestBase.java
index 6c3555f..cb27e16 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientFactoryTestBase.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientFactoryTestBase.java
@@ -114,14 +114,79 @@
}
/**
+ * Test creating a container node.
+ */
+ @Test(dependsOnMethods = "testRealmAwareZkClientCreation")
+ public void testRealmAwareZkClientCreateContainer() {
+ System.setProperty("zookeeper.extendedTypesEnabled", "true");
+ _realmAwareZkClient.setZkSerializer(new ZNRecordSerializer());
+
+ // Test with createParents = true
+ _realmAwareZkClient.createContainer(TEST_VALID_PATH, true);
+ Assert.assertTrue(_realmAwareZkClient.exists(TEST_VALID_PATH));
+
+ // Test writing and reading data
+ String childPath = TEST_VALID_PATH + "/child";
+ _realmAwareZkClient.createContainer(childPath, DUMMY_RECORD);
+ ZNRecord retrievedRecord = _realmAwareZkClient.readData(childPath);
+ Assert.assertEquals(DUMMY_RECORD.getSimpleField("Dummy"),
+ retrievedRecord.getSimpleField("Dummy"));
+
+ // Clean up
+ _realmAwareZkClient.deleteRecursively(TEST_VALID_PATH);
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+ }
+
+ /**
+ * Test creating a sequential TTL node.
+ */
+ @Test(dependsOnMethods = "testRealmAwareZkClientCreateContainer")
+ public void testRealmAwareZkClientCreateSequentialWithTTL() {
+ System.setProperty("zookeeper.extendedTypesEnabled", "true");
+ // Test writing and reading data
+ _realmAwareZkClient.createPersistent(TEST_VALID_PATH, true);
+ long ttl = 1L;
+ String childPath = TEST_VALID_PATH + "/child";
+ _realmAwareZkClient.createPersistentSequentialWithTTL(childPath, DUMMY_RECORD, ttl);
+ ZNRecord retrievedRecord = _realmAwareZkClient.readData(childPath + "0000000000");
+ Assert.assertEquals(DUMMY_RECORD.getSimpleField("Dummy"),
+ retrievedRecord.getSimpleField("Dummy"));
+
+ // Clean up
+ _realmAwareZkClient.deleteRecursively(TEST_VALID_PATH);
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+ }
+
+ /**
+ * Test creating a TTL node.
+ */
+ @Test(dependsOnMethods = "testRealmAwareZkClientCreateSequentialWithTTL")
+ public void testRealmAwareZkClientCreateWithTTL() {
+ System.setProperty("zookeeper.extendedTypesEnabled", "true");
+ // Test with createParents = true
+ long ttl = 1L;
+ _realmAwareZkClient.createPersistentWithTTL(TEST_VALID_PATH, true, ttl);
+ Assert.assertTrue(_realmAwareZkClient.exists(TEST_VALID_PATH));
+
+ // Test writing and reading data
+ String childPath = TEST_VALID_PATH + "/child";
+ _realmAwareZkClient.createPersistentWithTTL(childPath, DUMMY_RECORD, ttl);
+ ZNRecord retrievedRecord = _realmAwareZkClient.readData(childPath);
+ Assert.assertEquals(DUMMY_RECORD.getSimpleField("Dummy"),
+ retrievedRecord.getSimpleField("Dummy"));
+
+ // Clean up
+ _realmAwareZkClient.deleteRecursively(TEST_VALID_PATH);
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+ }
+
+ /**
* Test the persistent create() call against a valid path and an invalid path.
* Valid path is one that belongs to the realm designated by the sharding key.
* Invalid path is one that does not belong to the realm designated by the sharding key.
*/
- @Test(dependsOnMethods = "testRealmAwareZkClientCreation")
+ @Test(dependsOnMethods = "testRealmAwareZkClientCreateWithTTL")
public void testRealmAwareZkClientCreatePersistent() {
- _realmAwareZkClient.setZkSerializer(new ZNRecordSerializer());
-
// Test writing and reading against the validPath
_realmAwareZkClient.createPersistent(TEST_VALID_PATH, true);
_realmAwareZkClient.writeData(TEST_VALID_PATH, DUMMY_RECORD);
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
index e201905..70581ca 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
@@ -167,15 +167,92 @@
}
}
+ /**
+ * Test creating a container node.
+ */
+ @Test(dependsOnMethods = "testUnsupportedOperations")
+ public void testRealmAwareZkClientCreateContainer() {
+ System.setProperty("zookeeper.extendedTypesEnabled", "true");
+ _realmAwareZkClient.setZkSerializer(new ZNRecordSerializer());
+
+ // Create a dummy ZNRecord
+ ZNRecord znRecord = new ZNRecord("DummyRecord");
+ znRecord.setSimpleField("Dummy", "Value");
+
+ // Test with createParents = true
+ _realmAwareZkClient.createContainer(TEST_VALID_PATH, true);
+ Assert.assertTrue(_realmAwareZkClient.exists(TEST_VALID_PATH));
+
+ // Test writing and reading data
+ String childPath = TEST_VALID_PATH + "/child";
+ _realmAwareZkClient.createContainer(childPath, znRecord);
+ ZNRecord retrievedRecord = _realmAwareZkClient.readData(childPath);
+ Assert.assertEquals(znRecord.getSimpleField("Dummy"),
+ retrievedRecord.getSimpleField("Dummy"));
+
+ // Clean up
+ _realmAwareZkClient.deleteRecursively(TEST_VALID_PATH);
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+ }
+
+ /**
+ * Test creating a sequential TTL node.
+ */
+ @Test(dependsOnMethods = "testRealmAwareZkClientCreateContainer")
+ public void testRealmAwareZkClientCreateSequentialWithTTL() {
+ System.setProperty("zookeeper.extendedTypesEnabled", "true");
+ // Create a dummy ZNRecord
+ ZNRecord znRecord = new ZNRecord("DummyRecord");
+ znRecord.setSimpleField("Dummy", "Value");
+
+ // Test writing and reading data
+ _realmAwareZkClient.createPersistent(TEST_VALID_PATH, true);
+ long ttl = 1L;
+ String childPath = TEST_VALID_PATH + "/child";
+ _realmAwareZkClient.createPersistentSequentialWithTTL(childPath, znRecord, ttl);
+ ZNRecord retrievedRecord = _realmAwareZkClient.readData(childPath + "0000000000");
+ Assert.assertEquals(znRecord.getSimpleField("Dummy"),
+ retrievedRecord.getSimpleField("Dummy"));
+
+ // Clean up
+ _realmAwareZkClient.deleteRecursively(TEST_VALID_PATH);
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+ }
+
+ /**
+ * Test creating a TTL node.
+ */
+ @Test(dependsOnMethods = "testRealmAwareZkClientCreateSequentialWithTTL")
+ public void testRealmAwareZkClientCreateWithTTL() {
+ System.setProperty("zookeeper.extendedTypesEnabled", "true");
+ // Create a dummy ZNRecord
+ ZNRecord znRecord = new ZNRecord("DummyRecord");
+ znRecord.setSimpleField("Dummy", "Value");
+
+ // Test with createParents = true
+ long ttl = 1L;
+ _realmAwareZkClient.createPersistentWithTTL(TEST_VALID_PATH, true, ttl);
+ Assert.assertTrue(_realmAwareZkClient.exists(TEST_VALID_PATH));
+
+ // Test writing and reading data
+ String childPath = TEST_VALID_PATH + "/child";
+ _realmAwareZkClient.createPersistentWithTTL(childPath, znRecord, ttl);
+ ZNRecord retrievedRecord = _realmAwareZkClient.readData(childPath);
+ Assert.assertEquals(znRecord.getSimpleField("Dummy"),
+ retrievedRecord.getSimpleField("Dummy"));
+
+ // Clean up
+ _realmAwareZkClient.deleteRecursively(TEST_VALID_PATH);
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+ }
+
/*
* Tests the persistent create() call against a valid path and an invalid path.
* Valid path is one that belongs to the realm designated by the sharding key.
* Invalid path is one that does not belong to the realm designated by the sharding key.
*/
- @Test(dependsOnMethods = "testUnsupportedOperations")
+ @Test(dependsOnMethods = "testRealmAwareZkClientCreateWithTTL")
public void testCreatePersistent() {
- _realmAwareZkClient.setZkSerializer(new ZNRecordSerializer());
-
// Create a dummy ZNRecord
ZNRecord znRecord = new ZNRecord("DummyRecord");
znRecord.setSimpleField("Dummy", "Value");
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
index 20d070b..9e8a75a 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
@@ -89,6 +89,135 @@
_zkClient.close();
}
+ @Test
+ void testUnimplementedTypes() {
+ // Make sure extended types are disabled
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+
+ // Make sure the test path is clear
+ String parentPath = "/tmp";
+ String path = "/tmp/unimplemented";
+ _zkClient.deleteRecursively(parentPath);
+
+ try {
+ long ttl = 1L;
+ _zkClient.createPersistentWithTTL(path, true, ttl);
+ } catch (ZkException e) {
+ AssertJUnit.assertTrue(e.getCause() instanceof KeeperException.UnimplementedException);
+ return;
+ }
+
+ // Clean up
+ _zkClient.deleteRecursively(parentPath);
+ AssertJUnit.fail();
+ }
+
+ @Test
+ void testCreatePersistentWithTTL() {
+ // Enable extended types and create a ZkClient
+ System.setProperty("zookeeper.extendedTypesEnabled", "true");
+ ZkClient zkClient = new ZkClient(ZkTestBase.ZK_ADDR);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+
+ // Make sure the test path is clear
+ String parentPath = "/tmp";
+ String path = "/tmp/createTTL";
+ zkClient.deleteRecursively(parentPath);
+ AssertJUnit.assertFalse(zkClient.exists(parentPath));
+ AssertJUnit.assertFalse(zkClient.exists(path));
+
+ long ttl = 1L;
+ ZNRecord record = new ZNRecord("record");
+ String key = "key";
+ String value = "value";
+ record.setSimpleField(key, value);
+
+ // Create a ZNode with the above ZNRecord and read back its data
+ zkClient.createPersistentWithTTL(parentPath, record, ttl);
+ AssertJUnit.assertTrue(zkClient.exists(parentPath));
+ ZNRecord retrievedRecord = zkClient.readData(parentPath);
+ AssertJUnit.assertEquals(value, retrievedRecord.getSimpleField(key));
+
+ // Clear the path and test with createParents = true
+ AssertJUnit.assertTrue(zkClient.delete(parentPath));
+ zkClient.createPersistentWithTTL(path, true, ttl);
+ AssertJUnit.assertTrue(zkClient.exists(path));
+
+ // Clean up
+ zkClient.deleteRecursively(parentPath);
+ zkClient.close();
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+ }
+
+ @Test
+ void testCreatePersistentSequentialWithTTL() {
+ // Enable extended types and create a ZkClient
+ System.setProperty("zookeeper.extendedTypesEnabled", "true");
+ ZkClient zkClient = new ZkClient(ZkTestBase.ZK_ADDR);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+
+ // Make sure the test path is clear
+ String parentPath = "/tmp";
+ String path = "/tmp/createSequentialTTL";
+ zkClient.deleteRecursively(parentPath);
+ AssertJUnit.assertFalse(zkClient.exists(parentPath));
+ AssertJUnit.assertFalse(zkClient.exists(path + "0000000000"));
+
+ long ttl = 1L;
+ ZNRecord record = new ZNRecord("record");
+ String key = "key";
+ String value = "value";
+ record.setSimpleField(key, value);
+
+ // Create a ZNode with the above ZNRecord and read back its data
+ zkClient.createPersistent(parentPath);
+ zkClient.createPersistentSequentialWithTTL(path, record, ttl);
+ AssertJUnit.assertTrue(zkClient.exists(path + "0000000000"));
+ ZNRecord retrievedRecord = zkClient.readData(path + "0000000000");
+ AssertJUnit.assertEquals(value, retrievedRecord.getSimpleField(key));
+
+ // Clean up
+ zkClient.deleteRecursively(parentPath);
+ zkClient.close();
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+ }
+
+ @Test
+ void testCreateContainer() {
+ // Enable extended types and create a ZkClient
+ System.setProperty("zookeeper.extendedTypesEnabled", "true");
+ ZkClient zkClient = new ZkClient(ZkTestBase.ZK_ADDR);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+
+ // Make sure the test path is clear
+ String parentPath = "/tmp";
+ String path = "/tmp/createContainer";
+ zkClient.deleteRecursively(parentPath);
+ AssertJUnit.assertFalse(zkClient.exists(parentPath));
+ AssertJUnit.assertFalse(zkClient.exists(path));
+
+ ZNRecord record = new ZNRecord("record");
+ String key = "key";
+ String value = "value";
+ record.setSimpleField(key, value);
+
+ // Create a ZNode with the above ZNRecord and read back its data
+ zkClient.createContainer(parentPath, record);
+ AssertJUnit.assertTrue(zkClient.exists(parentPath));
+ ZNRecord retrievedRecord = zkClient.readData(parentPath);
+ AssertJUnit.assertEquals(value, retrievedRecord.getSimpleField(key));
+
+ // Clear the path and test with createParents = true
+ AssertJUnit.assertTrue(zkClient.delete(parentPath));
+ zkClient.createContainer(path, true);
+ AssertJUnit.assertTrue(zkClient.exists(path));
+
+ // Clean up
+ zkClient.deleteRecursively(parentPath);
+ zkClient.close();
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+ }
+
@Test()
void testGetStat() {
String path = "/tmp/getStatTest";
diff --git a/zookeeper-api/zookeeper-api-1.0.5-SNAPSHOT.ivy b/zookeeper-api/zookeeper-api-1.0.5-SNAPSHOT.ivy
index 17cb116..065cdda 100644
--- a/zookeeper-api/zookeeper-api-1.0.5-SNAPSHOT.ivy
+++ b/zookeeper-api/zookeeper-api-1.0.5-SNAPSHOT.ivy
@@ -43,7 +43,7 @@
<dependency org="org.apache.logging.log4j" name="log4j-slf4j-impl" rev="2.17.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
<artifact name="log4j-slf4j-impl" ext="jar"/>
</dependency>
- <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.4.13" conf="compile->compile(default);runtime->runtime(default);default->default"/>
+ <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.5.9" conf="compile->compile(default);runtime->runtime(default);default->default"/>
<dependency org="com.fasterxml.jackson.core" name="jackson-databind" rev="2.12.6.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="com.fasterxml.jackson.core" name="jackson-core" rev="2.12.6" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="commons-cli" name="commons-cli" rev="1.2" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>