| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.curator.framework.imps; |
| |
| import static org.junit.jupiter.api.Assertions.assertArrayEquals; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertNotNull; |
| import static org.junit.jupiter.api.Assertions.assertNotSame; |
| import static org.junit.jupiter.api.Assertions.assertNull; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| import static org.junit.jupiter.api.Assertions.fail; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.curator.framework.AuthInfo; |
| import org.apache.curator.framework.CuratorFramework; |
| import org.apache.curator.framework.CuratorFrameworkFactory; |
| import org.apache.curator.framework.api.BackgroundCallback; |
| import org.apache.curator.framework.api.CuratorEvent; |
| import org.apache.curator.framework.api.CuratorEventType; |
| import org.apache.curator.framework.api.CuratorListener; |
| import org.apache.curator.framework.state.ConnectionState; |
| import org.apache.curator.framework.state.ConnectionStateListener; |
| import org.apache.curator.retry.RetryOneTime; |
| import org.apache.curator.test.BaseClassForTests; |
| import org.apache.curator.test.Timing; |
| import org.apache.curator.test.compatibility.CuratorTestBase; |
| import org.apache.curator.test.compatibility.Timing2; |
| import org.apache.curator.utils.CloseableUtils; |
| import org.apache.curator.utils.EnsurePath; |
| import org.apache.curator.utils.ZKPaths; |
| import org.apache.curator.utils.ZookeeperFactory; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.ZooDefs; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.apache.zookeeper.client.ZKClientConfig; |
| import org.apache.zookeeper.data.ACL; |
| import org.apache.zookeeper.data.Stat; |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Tag; |
| import org.junit.jupiter.api.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.collect.Lists; |
| |
| @SuppressWarnings("deprecation") |
| @Tag(CuratorTestBase.zk35TestCompatibilityGroup) |
| public class TestFramework extends BaseClassForTests |
| { |
| private final Logger log = LoggerFactory.getLogger(getClass()); |
| |
| @BeforeEach |
| @Override |
| public void setup() throws Exception |
| { |
| System.setProperty("znode.container.checkIntervalMs", "1000"); |
| super.setup(); |
| } |
| |
| @AfterEach |
| @Override |
| public void teardown() throws Exception |
| { |
| System.clearProperty("znode.container.checkIntervalMs"); |
| super.teardown(); |
| } |
| |
| public void testWaitForShutdownTimeoutMs() throws Exception |
| { |
| final BlockingQueue<Integer> timeoutQueue = new ArrayBlockingQueue<>(1); |
| ZookeeperFactory zookeeperFactory = new ZookeeperFactory() |
| { |
| @Override |
| public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException |
| { |
| return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly) |
| { |
| @Override |
| public boolean close(int waitForShutdownTimeoutMs) throws InterruptedException |
| { |
| timeoutQueue.add(waitForShutdownTimeoutMs); |
| return super.close(waitForShutdownTimeoutMs); |
| } |
| }; |
| } |
| }; |
| |
| CuratorFramework client = CuratorFrameworkFactory.builder() |
| .connectString(server.getConnectString()) |
| .retryPolicy(new RetryOneTime(1)) |
| .zookeeperFactory(zookeeperFactory) |
| .waitForShutdownTimeoutMs(10064) |
| .build(); |
| try |
| { |
| client.start(); |
| client.checkExists().forPath("/foo"); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| |
| Integer polledValue = timeoutQueue.poll(new Timing().milliseconds(), TimeUnit.MILLISECONDS); |
| assertNotNull(polledValue); |
| assertEquals(10064, polledValue.intValue()); |
| } |
| |
| @Test |
| public void testSessionLossWithLongTimeout() throws Exception |
| { |
| final Timing timing = new Timing(); |
| |
| try(final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.forWaiting().milliseconds(), |
| timing.connection(), new RetryOneTime(1))) |
| { |
| final CountDownLatch connectedLatch = new CountDownLatch(1); |
| final CountDownLatch lostLatch = new CountDownLatch(1); |
| final CountDownLatch restartedLatch = new CountDownLatch(1); |
| client.getConnectionStateListenable().addListener(new ConnectionStateListener() |
| { |
| @Override |
| public void stateChanged(CuratorFramework client, ConnectionState newState) |
| { |
| if ( newState == ConnectionState.CONNECTED ) |
| { |
| connectedLatch.countDown(); |
| } |
| else if ( newState == ConnectionState.LOST ) |
| { |
| lostLatch.countDown(); |
| } |
| else if ( newState == ConnectionState.RECONNECTED ) |
| { |
| restartedLatch.countDown(); |
| } |
| } |
| }); |
| |
| client.start(); |
| |
| assertTrue(timing.awaitLatch(connectedLatch)); |
| |
| server.stop(); |
| |
| timing.sleepABit(); |
| assertTrue(timing.awaitLatch(lostLatch)); |
| |
| server.restart(); |
| assertTrue(timing.awaitLatch(restartedLatch)); |
| } |
| } |
| |
| @Test |
| public void testConnectionState() throws Exception |
| { |
| Timing timing = new Timing(); |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); |
| try |
| { |
| final BlockingQueue<ConnectionState> queue = new LinkedBlockingQueue<ConnectionState>(); |
| ConnectionStateListener listener = new ConnectionStateListener() |
| { |
| @Override |
| public void stateChanged(CuratorFramework client, ConnectionState newState) |
| { |
| queue.add(newState); |
| } |
| }; |
| client.getConnectionStateListenable().addListener(listener); |
| |
| client.start(); |
| assertEquals(queue.poll(timing.multiple(4).seconds(), TimeUnit.SECONDS), ConnectionState.CONNECTED); |
| server.stop(); |
| assertEquals(queue.poll(timing.multiple(4).seconds(), TimeUnit.SECONDS), ConnectionState.SUSPENDED); |
| assertEquals(queue.poll(timing.multiple(4).seconds(), TimeUnit.SECONDS), ConnectionState.LOST); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testCreateOrSetData() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| try |
| { |
| client.start(); |
| |
| String name = client.create().forPath("/hey", "there".getBytes()); |
| assertEquals(name, "/hey"); |
| name = client.create().orSetData().forPath("/hey", "other".getBytes()); |
| assertEquals(name, "/hey"); |
| assertArrayEquals(client.getData().forPath("/hey"), "other".getBytes()); |
| |
| name = client.create().orSetData().creatingParentsIfNeeded().forPath("/a/b/c", "there".getBytes()); |
| assertEquals(name, "/a/b/c"); |
| name = client.create().orSetData().creatingParentsIfNeeded().forPath("/a/b/c", "what".getBytes()); |
| assertEquals(name, "/a/b/c"); |
| assertArrayEquals(client.getData().forPath("/a/b/c"), "what".getBytes()); |
| |
| final BlockingQueue<CuratorEvent> queue = new LinkedBlockingQueue<>(); |
| BackgroundCallback backgroundCallback = new BackgroundCallback() |
| { |
| @Override |
| public void processResult(CuratorFramework client, CuratorEvent event) throws Exception |
| { |
| queue.add(event); |
| } |
| }; |
| client.create().orSetData().inBackground(backgroundCallback).forPath("/a/b/c", "another".getBytes()); |
| |
| CuratorEvent event = queue.poll(new Timing().milliseconds(), TimeUnit.MILLISECONDS); |
| assertNotNull(event); |
| assertEquals(event.getResultCode(), KeeperException.Code.OK.intValue()); |
| assertEquals(event.getType(), CuratorEventType.CREATE); |
| assertEquals(event.getPath(), "/a/b/c"); |
| assertEquals(event.getName(), "/a/b/c"); |
| |
| // callback should only be called once |
| CuratorEvent unexpectedEvent = queue.poll(new Timing().milliseconds(), TimeUnit.MILLISECONDS); |
| assertNull(unexpectedEvent); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testNamespaceWithWatcher() throws Exception |
| { |
| CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); |
| CuratorFramework client = builder.connectString(server.getConnectString()).namespace("aisa").retryPolicy(new RetryOneTime(1)).build(); |
| client.start(); |
| try |
| { |
| final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); |
| Watcher watcher = new Watcher() |
| { |
| @Override |
| public void process(WatchedEvent event) |
| { |
| try |
| { |
| queue.put(event.getPath()); |
| } |
| catch ( InterruptedException e ) |
| { |
| throw new Error(e); |
| } |
| } |
| }; |
| client.create().forPath("/base"); |
| client.getChildren().usingWatcher(watcher).forPath("/base"); |
| client.create().forPath("/base/child"); |
| |
| String path = new Timing2().takeFromQueue(queue); |
| assertEquals(path, "/base"); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testNamespaceInBackground() throws Exception |
| { |
| CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); |
| CuratorFramework client = builder.connectString(server.getConnectString()).namespace("aisa").retryPolicy(new RetryOneTime(1)).build(); |
| client.start(); |
| try |
| { |
| final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); |
| CuratorListener listener = new CuratorListener() |
| { |
| @Override |
| public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception |
| { |
| if ( event.getType() == CuratorEventType.EXISTS ) |
| { |
| queue.put(event.getPath()); |
| } |
| } |
| }; |
| |
| client.getCuratorListenable().addListener(listener); |
| client.create().forPath("/base"); |
| client.checkExists().inBackground().forPath("/base"); |
| |
| String path = queue.poll(10, TimeUnit.SECONDS); |
| assertEquals(path, "/base"); |
| |
| client.getCuratorListenable().removeListener(listener); |
| |
| BackgroundCallback callback = new BackgroundCallback() |
| { |
| @Override |
| public void processResult(CuratorFramework client, CuratorEvent event) throws Exception |
| { |
| queue.put(event.getPath()); |
| } |
| }; |
| client.getChildren().inBackground(callback).forPath("/base"); |
| path = queue.poll(10, TimeUnit.SECONDS); |
| assertEquals(path, "/base"); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testCreateACLSingleAuth() throws Exception |
| { |
| CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); |
| CuratorFramework client = builder |
| .connectString(server.getConnectString()) |
| .authorization("digest", "me1:pass1".getBytes()) |
| .retryPolicy(new RetryOneTime(1)) |
| .build(); |
| client.start(); |
| try |
| { |
| ACL acl = new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.AUTH_IDS); |
| List<ACL> aclList = Lists.newArrayList(acl); |
| client.create().withACL(aclList).forPath("/test", "test".getBytes()); |
| client.close(); |
| |
| // Try setting data with me1:pass1 |
| client = builder |
| .connectString(server.getConnectString()) |
| .authorization("digest", "me1:pass1".getBytes()) |
| .retryPolicy(new RetryOneTime(1)) |
| .build(); |
| client.start(); |
| try |
| { |
| client.setData().forPath("/test", "test".getBytes()); |
| } |
| catch ( KeeperException.NoAuthException e ) |
| { |
| fail("Auth failed"); |
| } |
| client.close(); |
| |
| // Try setting data with something:else |
| client = builder |
| .connectString(server.getConnectString()) |
| .authorization("digest", "something:else".getBytes()) |
| .retryPolicy(new RetryOneTime(1)) |
| .build(); |
| client.start(); |
| try |
| { |
| client.setData().forPath("/test", "test".getBytes()); |
| fail("Should have failed with auth exception"); |
| } |
| catch ( KeeperException.NoAuthException e ) |
| { |
| // expected |
| } |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testACLDeprecatedApis() throws Exception |
| { |
| CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() |
| .connectString(server.getConnectString()) |
| .retryPolicy(new RetryOneTime(1)); |
| assertNull(builder.getAuthScheme()); |
| assertNull(builder.getAuthValue()); |
| |
| builder = builder.authorization("digest", "me1:pass1".getBytes()); |
| assertEquals(builder.getAuthScheme(), "digest"); |
| assertArrayEquals(builder.getAuthValue(), "me1:pass1".getBytes()); |
| } |
| |
| @Test |
| public void testCreateACLMultipleAuths() throws Exception |
| { |
| // Add a few authInfos |
| List<AuthInfo> authInfos = new ArrayList<AuthInfo>(); |
| authInfos.add(new AuthInfo("digest", "me1:pass1".getBytes())); |
| authInfos.add(new AuthInfo("digest", "me2:pass2".getBytes())); |
| |
| CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); |
| CuratorFramework client = builder |
| .connectString(server.getConnectString()) |
| .authorization(authInfos) |
| .retryPolicy(new RetryOneTime(1)) |
| .build(); |
| client.start(); |
| try |
| { |
| ACL acl = new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.AUTH_IDS); |
| List<ACL> aclList = Lists.newArrayList(acl); |
| client.create().withACL(aclList).forPath("/test", "test".getBytes()); |
| client.close(); |
| |
| // Try setting data with me1:pass1 |
| client = builder |
| .connectString(server.getConnectString()) |
| .authorization("digest", "me1:pass1".getBytes()) |
| .retryPolicy(new RetryOneTime(1)) |
| .build(); |
| client.start(); |
| try |
| { |
| client.setData().forPath("/test", "test".getBytes()); |
| } |
| catch ( KeeperException.NoAuthException e ) |
| { |
| fail("Auth failed"); |
| } |
| client.close(); |
| |
| // Try setting data with me1:pass1 |
| client = builder |
| .connectString(server.getConnectString()) |
| .authorization("digest", "me2:pass2".getBytes()) |
| .retryPolicy(new RetryOneTime(1)) |
| .build(); |
| client.start(); |
| try |
| { |
| client.setData().forPath("/test", "test".getBytes()); |
| } |
| catch ( KeeperException.NoAuthException e ) |
| { |
| fail("Auth failed"); |
| } |
| client.close(); |
| |
| // Try setting data with something:else |
| client = builder |
| .connectString(server.getConnectString()) |
| .authorization("digest", "something:else".getBytes()) |
| .retryPolicy(new RetryOneTime(1)) |
| .build(); |
| client.start(); |
| try |
| { |
| client.setData().forPath("/test", "test".getBytes()); |
| fail("Should have failed with auth exception"); |
| } |
| catch ( KeeperException.NoAuthException e ) |
| { |
| // expected |
| } |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testCreateACLWithReset() throws Exception |
| { |
| Timing timing = new Timing(); |
| CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); |
| CuratorFramework client = builder |
| .connectString(server.getConnectString()) |
| .sessionTimeoutMs(timing.session()) |
| .connectionTimeoutMs(timing.connection()) |
| .authorization("digest", "me:pass".getBytes()) |
| .retryPolicy(new RetryOneTime(1)) |
| .build(); |
| client.start(); |
| try |
| { |
| final CountDownLatch lostLatch = new CountDownLatch(1); |
| ConnectionStateListener listener = new ConnectionStateListener() |
| { |
| @Override |
| public void stateChanged(CuratorFramework client, ConnectionState newState) |
| { |
| if ( newState == ConnectionState.LOST ) |
| { |
| lostLatch.countDown(); |
| } |
| } |
| }; |
| client.getConnectionStateListenable().addListener(listener); |
| |
| ACL acl = new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.AUTH_IDS); |
| List<ACL> aclList = Lists.newArrayList(acl); |
| client.create().withACL(aclList).forPath("/test", "test".getBytes()); |
| |
| server.stop(); |
| assertTrue(timing.awaitLatch(lostLatch)); |
| try |
| { |
| client.checkExists().forPath("/"); |
| fail("Connection should be down"); |
| } |
| catch ( KeeperException.ConnectionLossException e ) |
| { |
| // expected |
| } |
| |
| server.restart(); |
| try |
| { |
| client.setData().forPath("/test", "test".getBytes()); |
| } |
| catch ( KeeperException.NoAuthException e ) |
| { |
| fail("Auth failed"); |
| } |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testCreateParents() throws Exception |
| { |
| CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); |
| CuratorFramework client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build(); |
| client.start(); |
| try |
| { |
| client.create().creatingParentsIfNeeded().forPath("/one/two/three", "foo".getBytes()); |
| byte[] data = client.getData().forPath("/one/two/three"); |
| assertArrayEquals(data, "foo".getBytes()); |
| |
| client.create().creatingParentsIfNeeded().forPath("/one/two/another", "bar".getBytes()); |
| data = client.getData().forPath("/one/two/another"); |
| assertArrayEquals(data, "bar".getBytes()); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testOverrideCreateParentContainers() throws Exception |
| { |
| if ( !checkForContainers() ) |
| { |
| return; |
| } |
| |
| CuratorFramework client = CuratorFrameworkFactory.builder() |
| .connectString(server.getConnectString()) |
| .retryPolicy(new RetryOneTime(1)) |
| .dontUseContainerParents() |
| .build(); |
| try |
| { |
| client.start(); |
| client.create().creatingParentContainersIfNeeded().forPath("/one/two/three", "foo".getBytes()); |
| byte[] data = client.getData().forPath("/one/two/three"); |
| assertArrayEquals(data, "foo".getBytes()); |
| |
| client.delete().forPath("/one/two/three"); |
| new Timing().sleepABit(); |
| |
| assertNotNull(client.checkExists().forPath("/one/two")); |
| new Timing().sleepABit(); |
| assertNotNull(client.checkExists().forPath("/one")); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testCreateParentContainers() throws Exception |
| { |
| if ( !checkForContainers() ) |
| { |
| return; |
| } |
| |
| CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); |
| CuratorFramework client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build(); |
| try |
| { |
| client.start(); |
| client.create().creatingParentContainersIfNeeded().forPath("/one/two/three", "foo".getBytes()); |
| byte[] data = client.getData().forPath("/one/two/three"); |
| assertArrayEquals(data, "foo".getBytes()); |
| |
| client.delete().forPath("/one/two/three"); |
| new Timing().sleepABit(); |
| |
| assertNull(client.checkExists().forPath("/one/two")); |
| new Timing().sleepABit(); |
| assertNull(client.checkExists().forPath("/one")); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| private boolean checkForContainers() |
| { |
| if ( ZKPaths.getContainerCreateMode() == CreateMode.PERSISTENT ) |
| { |
| System.out.println("Not using CreateMode.CONTAINER enabled version of ZooKeeper"); |
| return false; |
| } |
| return true; |
| } |
| |
| @Test |
| public void testCreatingParentsTheSame() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| try |
| { |
| client.start(); |
| |
| assertNull(client.checkExists().forPath("/one/two")); |
| client.create().creatingParentContainersIfNeeded().forPath("/one/two/three"); |
| assertNotNull(client.checkExists().forPath("/one/two")); |
| |
| client.delete().deletingChildrenIfNeeded().forPath("/one"); |
| assertNull(client.checkExists().forPath("/one")); |
| |
| assertNull(client.checkExists().forPath("/one/two")); |
| client.checkExists().creatingParentContainersIfNeeded().forPath("/one/two/three"); |
| assertNotNull(client.checkExists().forPath("/one/two")); |
| assertNull(client.checkExists().forPath("/one/two/three")); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testExistsCreatingParents() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| try |
| { |
| client.start(); |
| |
| assertNull(client.checkExists().forPath("/one/two")); |
| client.checkExists().creatingParentContainersIfNeeded().forPath("/one/two/three"); |
| assertNotNull(client.checkExists().forPath("/one/two")); |
| assertNull(client.checkExists().forPath("/one/two/three")); |
| assertNull(client.checkExists().creatingParentContainersIfNeeded().forPath("/one/two/three")); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testExistsCreatingParentsInBackground() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| try |
| { |
| client.start(); |
| |
| assertNull(client.checkExists().forPath("/one/two")); |
| |
| final CountDownLatch latch = new CountDownLatch(1); |
| BackgroundCallback callback = new BackgroundCallback() |
| { |
| @Override |
| public void processResult(CuratorFramework client, CuratorEvent event) throws Exception |
| { |
| latch.countDown(); |
| } |
| }; |
| client.checkExists().creatingParentContainersIfNeeded().inBackground(callback).forPath("/one/two/three"); |
| assertTrue(new Timing().awaitLatch(latch)); |
| assertNotNull(client.checkExists().forPath("/one/two")); |
| assertNull(client.checkExists().forPath("/one/two/three")); |
| assertNull(client.checkExists().creatingParentContainersIfNeeded().forPath("/one/two/three")); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testEnsurePathWithNamespace() throws Exception |
| { |
| final String namespace = "jz"; |
| |
| CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); |
| CuratorFramework client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).namespace(namespace).build(); |
| client.start(); |
| try |
| { |
| EnsurePath ensurePath = new EnsurePath("/pity/the/fool"); |
| ensurePath.ensure(client.getZookeeperClient()); |
| assertNull(client.getZookeeperClient().getZooKeeper().exists("/jz/pity/the/fool", false)); |
| |
| ensurePath = client.newNamespaceAwareEnsurePath("/pity/the/fool"); |
| ensurePath.ensure(client.getZookeeperClient()); |
| assertNotNull(client.getZookeeperClient().getZooKeeper().exists("/jz/pity/the/fool", false)); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testCreateContainersWithNamespace() throws Exception |
| { |
| final String namespace = "container1"; |
| CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); |
| CuratorFramework client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).namespace(namespace).build(); |
| try |
| { |
| client.start(); |
| String path = "/path1/path2"; |
| client.createContainers(path); |
| assertNotNull(client.checkExists().forPath(path)); |
| assertNotNull(client.getZookeeperClient().getZooKeeper().exists("/" + namespace + path, false)); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| |
| @Test |
| public void testCreateContainersUsingNamespace() throws Exception |
| { |
| final String namespace = "container2"; |
| CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); |
| CuratorFramework client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build(); |
| try |
| { |
| client.start(); |
| CuratorFramework nsClient = client.usingNamespace(namespace); |
| String path = "/path1/path2"; |
| nsClient.createContainers(path); |
| assertNotNull(nsClient.checkExists().forPath(path)); |
| assertNotNull(nsClient.getZookeeperClient().getZooKeeper().exists("/" + namespace + path, false)); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testNamespace() throws Exception |
| { |
| final String namespace = "TestNamespace"; |
| |
| CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); |
| CuratorFramework client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).namespace(namespace).build(); |
| client.start(); |
| try |
| { |
| String actualPath = client.create().forPath("/test"); |
| assertEquals(actualPath, "/test"); |
| assertNotNull(client.getZookeeperClient().getZooKeeper().exists("/" + namespace + "/test", false)); |
| assertNull(client.getZookeeperClient().getZooKeeper().exists("/test", false)); |
| |
| actualPath = client.usingNamespace(null).create().forPath("/non"); |
| assertEquals(actualPath, "/non"); |
| assertNotNull(client.getZookeeperClient().getZooKeeper().exists("/non", false)); |
| |
| client.create().forPath("/test/child", "hey".getBytes()); |
| byte[] bytes = client.getData().forPath("/test/child"); |
| assertArrayEquals(bytes, "hey".getBytes()); |
| |
| bytes = client.usingNamespace(null).getData().forPath("/" + namespace + "/test/child"); |
| assertArrayEquals(bytes, "hey".getBytes()); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testCustomCallback() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| final CountDownLatch latch = new CountDownLatch(1); |
| BackgroundCallback callback = new BackgroundCallback() |
| { |
| @Override |
| public void processResult(CuratorFramework client, CuratorEvent event) throws Exception |
| { |
| if ( event.getType() == CuratorEventType.CREATE ) |
| { |
| if ( event.getPath().equals("/head") ) |
| { |
| latch.countDown(); |
| } |
| } |
| } |
| }; |
| client.create().inBackground(callback).forPath("/head"); |
| assertTrue(latch.await(10, TimeUnit.SECONDS)); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testSync() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| client.getCuratorListenable().addListener |
| ( |
| new CuratorListener() |
| { |
| @Override |
| public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception |
| { |
| if ( event.getType() == CuratorEventType.SYNC ) |
| { |
| assertEquals(event.getPath(), "/head"); |
| ((CountDownLatch)event.getContext()).countDown(); |
| } |
| } |
| } |
| ); |
| |
| client.create().forPath("/head"); |
| assertNotNull(client.checkExists().forPath("/head")); |
| |
| CountDownLatch latch = new CountDownLatch(1); |
| client.sync("/head", latch); |
| assertTrue(latch.await(10, TimeUnit.SECONDS)); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testSyncNew() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| client.create().forPath("/head"); |
| assertNotNull(client.checkExists().forPath("/head")); |
| |
| final CountDownLatch latch = new CountDownLatch(1); |
| BackgroundCallback callback = new BackgroundCallback() |
| { |
| @Override |
| public void processResult(CuratorFramework client, CuratorEvent event) throws Exception |
| { |
| if ( event.getType() == CuratorEventType.SYNC ) |
| { |
| latch.countDown(); |
| } |
| } |
| }; |
| client.sync().inBackground(callback).forPath("/head"); |
| assertTrue(latch.await(10, TimeUnit.SECONDS)); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testGetSequentialChildren() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| client.create().forPath("/head"); |
| |
| for ( int i = 0; i < 10; ++i ) |
| { |
| client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child"); |
| } |
| |
| List<String> children = client.getChildren().forPath("/head"); |
| assertEquals(children.size(), 10); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testBackgroundGetDataWithWatch() throws Exception |
| { |
| final byte[] data1 = {1, 2, 3}; |
| final byte[] data2 = {4, 5, 6, 7}; |
| |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| final CountDownLatch watchedLatch = new CountDownLatch(1); |
| client.getCuratorListenable().addListener |
| ( |
| new CuratorListener() |
| { |
| @Override |
| public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception |
| { |
| if ( event.getType() == CuratorEventType.GET_DATA ) |
| { |
| assertEquals(event.getPath(), "/test"); |
| assertArrayEquals(event.getData(), data1); |
| ((CountDownLatch)event.getContext()).countDown(); |
| } |
| else if ( event.getType() == CuratorEventType.WATCHED ) |
| { |
| if ( event.getWatchedEvent().getType() == Watcher.Event.EventType.NodeDataChanged ) |
| { |
| assertEquals(event.getPath(), "/test"); |
| watchedLatch.countDown(); |
| } |
| } |
| } |
| } |
| ); |
| |
| client.create().forPath("/test", data1); |
| |
| CountDownLatch backgroundLatch = new CountDownLatch(1); |
| client.getData().watched().inBackground(backgroundLatch).forPath("/test"); |
| assertTrue(backgroundLatch.await(10, TimeUnit.SECONDS)); |
| |
| client.setData().forPath("/test", data2); |
| assertTrue(watchedLatch.await(10, TimeUnit.SECONDS)); |
| byte[] checkData = client.getData().forPath("/test"); |
| assertArrayEquals(checkData, data2); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testBackgroundCreate() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| client.getCuratorListenable().addListener |
| ( |
| new CuratorListener() |
| { |
| @Override |
| public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception |
| { |
| if ( event.getType() == CuratorEventType.CREATE ) |
| { |
| assertEquals(event.getPath(), "/test"); |
| ((CountDownLatch)event.getContext()).countDown(); |
| } |
| } |
| } |
| ); |
| |
| CountDownLatch latch = new CountDownLatch(1); |
| client.create().inBackground(latch).forPath("/test", new byte[]{1, 2, 3}); |
| assertTrue(latch.await(10, TimeUnit.SECONDS)); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testCreateModes() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| byte[] writtenBytes = {1, 2, 3}; |
| client.create().forPath("/test", writtenBytes); // should be persistent |
| |
| client.close(); |
| client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| client.start(); |
| |
| byte[] readBytes = client.getData().forPath("/test"); |
| assertArrayEquals(writtenBytes, readBytes); |
| |
| client.create().withMode(CreateMode.EPHEMERAL).forPath("/ghost", writtenBytes); |
| |
| client.close(); |
| client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| client.start(); |
| |
| readBytes = client.getData().forPath("/test"); |
| assertArrayEquals(writtenBytes, readBytes); |
| Stat stat = client.checkExists().forPath("/ghost"); |
| assertNull(stat); |
| |
| String realPath = client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/pseq", writtenBytes); |
| assertNotSame(realPath, "/pseq"); |
| |
| client.close(); |
| client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| client.start(); |
| |
| readBytes = client.getData().forPath(realPath); |
| assertArrayEquals(writtenBytes, readBytes); |
| |
| realPath = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/eseq", writtenBytes); |
| assertNotSame(realPath, "/eseq"); |
| |
| client.close(); |
| client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| client.start(); |
| |
| stat = client.checkExists().forPath(realPath); |
| assertNull(stat); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testConfigurableZookeeper() throws Exception |
| { |
| CuratorFramework client = null; |
| try |
| { |
| ZKClientConfig zkClientConfig = new ZKClientConfig(); |
| String zookeeperRequestTimeout = "30000"; |
| zkClientConfig.setProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, zookeeperRequestTimeout); |
| client = CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new RetryOneTime(1), zkClientConfig); |
| client.start(); |
| |
| byte[] writtenBytes = {1, 2, 3}; |
| client.create().forPath("/test", writtenBytes); |
| |
| byte[] readBytes = client.getData().forPath("/test"); |
| assertArrayEquals(writtenBytes, readBytes); |
| assertEquals(zookeeperRequestTimeout, client.getZookeeperClient().getZooKeeper().getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT)); |
| |
| } catch (NoSuchMethodError e) { |
| log.debug("NoSuchMethodError: ", e); |
| log.info("Got NoSuchMethodError, meaning probably this cannot be used with ZooKeeper version < 3.6.1"); |
| } |
| finally |
| { |
| try { |
| CloseableUtils.closeQuietly(client); |
| } catch (NoSuchMethodError e) { |
| log.debug("close: NoSuchMethodError: ", e); |
| log.info("close: Got NoSuchMethodError, meaning probably this cannot be used with ZooKeeper version < 3.6.1"); |
| } |
| } |
| } |
| |
| @Test |
| public void testSimple() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| String path = client.create().withMode(CreateMode.PERSISTENT).forPath("/test", new byte[]{1, 2, 3}); |
| assertEquals(path, "/test"); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testSequentialWithTrailingSeparator() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| client.create().forPath("/test"); |
| //This should create a node in the form of "/test/00000001" |
| String path = client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/"); |
| assertTrue(path.startsWith("/test/")); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| } |