| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.curator.framework.imps; |
| |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertNotNull; |
| import static org.junit.jupiter.api.Assertions.assertNull; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| import static org.junit.jupiter.api.Assertions.fail; |
| import com.google.common.collect.Queues; |
| import org.apache.curator.RetryPolicy; |
| import org.apache.curator.RetrySleeper; |
| import org.apache.curator.framework.CuratorFramework; |
| import org.apache.curator.framework.CuratorFrameworkFactory; |
| import org.apache.curator.framework.api.BackgroundCallback; |
| import org.apache.curator.framework.api.CreateBuilder; |
| import org.apache.curator.framework.api.CuratorEvent; |
| import org.apache.curator.framework.api.CuratorEventType; |
| import org.apache.curator.framework.api.CuratorListener; |
| import org.apache.curator.framework.api.ErrorListenerPathAndBytesable; |
| import org.apache.curator.framework.state.ConnectionState; |
| import org.apache.curator.framework.state.ConnectionStateListener; |
| import org.apache.curator.retry.RetryForever; |
| import org.apache.curator.retry.RetryNTimes; |
| import org.apache.curator.retry.RetryOneTime; |
| import org.apache.curator.test.BaseClassForTests; |
| import org.apache.curator.test.InstanceSpec; |
| import org.apache.curator.test.TestingCluster; |
| import org.apache.curator.test.TestingServer; |
| import org.apache.curator.test.compatibility.CuratorTestBase; |
| import org.apache.curator.test.compatibility.Timing2; |
| import org.apache.curator.utils.CloseableUtils; |
| import org.apache.curator.utils.ZKPaths; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.data.Stat; |
| import org.junit.jupiter.api.BeforeAll; |
| import org.junit.jupiter.api.DisplayName; |
| import org.junit.jupiter.api.Tag; |
| import org.junit.jupiter.api.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Random; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| @Tag(CuratorTestBase.zk35TestCompatibilityGroup) |
| public class TestFrameworkEdges extends BaseClassForTests |
| { |
| private final Logger log = LoggerFactory.getLogger(getClass()); |
| private final Timing2 timing = new Timing2(); |
| |
| @BeforeAll |
| public static void setUpClass() |
| { |
| System.setProperty("zookeeper.extendedTypesEnabled", "true"); |
| } |
| |
| @Test |
| @DisplayName("test case for CURATOR-525") |
| public void testValidateConnectionEventRaces() throws Exception |
| { |
| // test for CURATOR-525 - there is a race whereby Curator can go to LOST |
| // after the connection has been repaired. Prior to the fix, the Curator |
| // instance would become a zombie, never leaving the LOST state |
| try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 2000, 1000, new RetryOneTime(1))) |
| { |
| CuratorFrameworkImpl clientImpl = (CuratorFrameworkImpl)client; |
| |
| client.start(); |
| client.getChildren().forPath("/"); |
| client.create().forPath("/foo"); |
| |
| BlockingQueue<ConnectionState> stateQueue = new LinkedBlockingQueue<>(); |
| client.getConnectionStateListenable().addListener((__, newState) -> stateQueue.add(newState)); |
| |
| server.stop(); |
| assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.SUSPENDED); |
| assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.LOST); |
| |
| clientImpl.debugCheckBackgroundRetryReadyLatch = new CountDownLatch(1); |
| clientImpl.debugCheckBackgroundRetryLatch = new CountDownLatch(1); |
| |
| client.delete().guaranteed().inBackground().forPath("/foo"); |
| timing.awaitLatch(clientImpl.debugCheckBackgroundRetryReadyLatch); |
| server.restart(); |
| assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.RECONNECTED); |
| clientImpl.injectedCode = KeeperException.Code.SESSIONEXPIRED; // simulate an expiration being handled after the connection is repaired |
| clientImpl.debugCheckBackgroundRetryLatch.countDown(); |
| assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.LOST); |
| |
| assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.RECONNECTED); |
| } |
| } |
| |
| @Test |
| public void testInjectSessionExpiration() throws Exception |
| { |
| try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) |
| { |
| client.start(); |
| |
| CountDownLatch expiredLatch = new CountDownLatch(1); |
| Watcher watcher = event -> { |
| if ( event.getState() == Watcher.Event.KeeperState.Expired ) |
| { |
| expiredLatch.countDown(); |
| } |
| }; |
| client.checkExists().usingWatcher(watcher).forPath("/foobar"); |
| client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration(); |
| assertTrue(timing.awaitLatch(expiredLatch)); |
| } |
| } |
| |
| @Test |
| public void testProtectionWithKilledSession() throws Exception |
| { |
| server.stop(); // not needed |
| |
| // see CURATOR-498 |
| // attempt to re-create the state described in the bug report: create a 3 Instance ensemble; |
| // have Curator connect to only 1 one of those instances; set failNextCreateForTesting to |
| // simulate protection mode searching; kill the connected server when this happens; |
| // wait for session timeout to elapse and then restart the instance. In most cases |
| // this will cause the scenario as Curator will send the session cancel and do protection mode |
| // search around the same time. The protection mode search should return first as it can be resolved |
| // by the Instance Curator is connected to but the session kill needs a quorum vote (it's a |
| // transaction) |
| |
| try (TestingCluster cluster = createAndStartCluster(3)) |
| { |
| InstanceSpec instanceSpec0 = cluster.getServers().get(0).getInstanceSpec(); |
| |
| CountDownLatch serverStoppedLatch = new CountDownLatch(1); |
| RetryPolicy retryPolicy = new RetryForever(100) |
| { |
| @Override |
| public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) |
| { |
| if ( serverStoppedLatch.getCount() > 0 ) |
| { |
| try |
| { |
| cluster.killServer(instanceSpec0); |
| } |
| catch ( Exception e ) |
| { |
| // ignore |
| } |
| serverStoppedLatch.countDown(); |
| } |
| return super.allowRetry(retryCount, elapsedTimeMs, sleeper); |
| } |
| }; |
| |
| try (CuratorFramework client = CuratorFrameworkFactory.newClient(instanceSpec0.getConnectString(), timing.session(), timing.connection(), retryPolicy)) |
| { |
| BlockingQueue<String> createdNode = new LinkedBlockingQueue<>(); |
| BackgroundCallback callback = (__, event) -> { |
| if ( event.getType() == CuratorEventType.CREATE ) |
| { |
| createdNode.offer(event.getPath()); |
| } |
| }; |
| |
| client.start(); |
| client.create().forPath("/test"); |
| |
| ErrorListenerPathAndBytesable<String> builder = client.create().withProtection().withMode(CreateMode.EPHEMERAL).inBackground(callback); |
| ((CreateBuilderImpl)builder).failNextCreateForTesting = true; |
| |
| builder.forPath("/test/hey"); |
| |
| assertTrue(timing.awaitLatch(serverStoppedLatch)); |
| timing.forSessionSleep().sleep(); // wait for session to expire |
| cluster.restartServer(instanceSpec0); |
| |
| String path = timing.takeFromQueue(createdNode); |
| List<String> children = client.getChildren().forPath("/test"); |
| assertEquals(Collections.singletonList(ZKPaths.getNodeFromPath(path)), children); |
| } |
| } |
| } |
| |
| @Test |
| public void testBackgroundLatencyUnSleep() throws Exception |
| { |
| server.stop(); |
| |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| try |
| { |
| client.start(); |
| ((CuratorFrameworkImpl)client).sleepAndQueueOperationSeconds = Integer.MAX_VALUE; |
| |
| final CountDownLatch latch = new CountDownLatch(3); |
| BackgroundCallback callback = new BackgroundCallback() |
| { |
| @Override |
| public void processResult(CuratorFramework client, CuratorEvent event) throws Exception |
| { |
| if ( (event.getType() == CuratorEventType.CREATE) && (event.getResultCode() == KeeperException.Code.OK.intValue()) ) |
| { |
| latch.countDown(); |
| } |
| } |
| }; |
| // queue multiple operations for a more complete test |
| client.create().inBackground(callback).forPath("/test"); |
| client.create().inBackground(callback).forPath("/test/one"); |
| client.create().inBackground(callback).forPath("/test/two"); |
| server.restart(); |
| |
| assertTrue(timing.awaitLatch(latch)); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testCreateContainersForBadConnect() throws Exception |
| { |
| final int serverPort = server.getPort(); |
| server.close(); |
| |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 1000, 1000, new RetryNTimes(10, timing.forSleepingABit().milliseconds())); |
| try |
| { |
| new Thread() |
| { |
| @Override |
| public void run() |
| { |
| try |
| { |
| Thread.sleep(3000); |
| server = new TestingServer(serverPort, true); |
| } |
| catch ( Exception e ) |
| { |
| e.printStackTrace(); |
| } |
| } |
| }.start(); |
| |
| client.start(); |
| client.createContainers("/this/does/not/exist"); |
| assertNotNull(client.checkExists().forPath("/this/does/not/exist")); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testQuickClose() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 1, new RetryNTimes(0, 0)); |
| try |
| { |
| client.start(); |
| client.close(); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testProtectedCreateNodeDeletion() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 1, new RetryNTimes(0, 0)); |
| try |
| { |
| client.start(); |
| |
| for ( int i = 0; i < 2; ++i ) |
| { |
| CuratorFramework localClient = (i == 0) ? client : client.usingNamespace("nm"); |
| localClient.create().forPath("/parent"); |
| assertEquals(localClient.getChildren().forPath("/parent").size(), 0); |
| |
| CreateBuilderImpl createBuilder = (CreateBuilderImpl)localClient.create(); |
| createBuilder.failNextCreateForTesting = true; |
| FindAndDeleteProtectedNodeInBackground.debugInsertError.set(true); |
| try |
| { |
| createBuilder.withProtection().forPath("/parent/test"); |
| fail("failNextCreateForTesting should have caused a ConnectionLossException"); |
| } |
| catch ( KeeperException.ConnectionLossException e ) |
| { |
| // ignore, correct |
| } |
| |
| timing.sleepABit(); |
| List<String> children = localClient.getChildren().forPath("/parent"); |
| assertEquals(children.size(), 0, children.toString()); // protected mode should have deleted the node |
| |
| localClient.delete().forPath("/parent"); |
| } |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testPathsFromProtectingInBackground() throws Exception |
| { |
| for ( CreateMode mode : CreateMode.values() ) |
| { |
| internalTestPathsFromProtectingInBackground(mode); |
| } |
| } |
| |
| private void internalTestPathsFromProtectingInBackground(CreateMode mode) throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 1, new RetryOneTime(1)); |
| try |
| { |
| client.start(); |
| |
| client.create().creatingParentsIfNeeded().forPath("/a/b/c"); |
| |
| final BlockingQueue<String> paths = new ArrayBlockingQueue<String>(2); |
| BackgroundCallback callback = new BackgroundCallback() |
| { |
| @Override |
| public void processResult(CuratorFramework client, CuratorEvent event) throws Exception |
| { |
| paths.put(event.getName()); |
| paths.put(event.getPath()); |
| } |
| }; |
| final String TEST_PATH = "/a/b/c/test-"; |
| long ttl = timing.forWaiting().milliseconds() * 1000; |
| CreateBuilder firstCreateBuilder = client.create(); |
| if ( mode.isTTL() ) |
| { |
| firstCreateBuilder.withTtl(ttl); |
| } |
| firstCreateBuilder.withMode(mode).inBackground(callback).forPath(TEST_PATH); |
| |
| String name1 = timing.takeFromQueue(paths); |
| String path1 = timing.takeFromQueue(paths); |
| |
| client.close(); |
| |
| client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 1, new RetryOneTime(1)); |
| client.start(); |
| |
| CreateBuilderImpl createBuilder = (CreateBuilderImpl)client.create(); |
| createBuilder.withProtection(); |
| if ( mode.isTTL() ) |
| { |
| createBuilder.withTtl(ttl); |
| } |
| |
| client.create().forPath(createBuilder.adjustPath(TEST_PATH)); |
| |
| createBuilder.debugForceFindProtectedNode = true; |
| createBuilder.withMode(mode).inBackground(callback).forPath(TEST_PATH); |
| |
| String name2 = timing.takeFromQueue(paths); |
| String path2 = timing.takeFromQueue(paths); |
| |
| assertEquals(ZKPaths.getPathAndNode(name1).getPath(), ZKPaths.getPathAndNode(TEST_PATH).getPath()); |
| assertEquals(ZKPaths.getPathAndNode(name2).getPath(), ZKPaths.getPathAndNode(TEST_PATH).getPath()); |
| assertEquals(ZKPaths.getPathAndNode(path1).getPath(), ZKPaths.getPathAndNode(TEST_PATH).getPath()); |
| assertEquals(ZKPaths.getPathAndNode(path2).getPath(), ZKPaths.getPathAndNode(TEST_PATH).getPath()); |
| |
| client.delete().deletingChildrenIfNeeded().forPath("/a/b/c"); |
| client.delete().forPath("/a/b"); |
| client.delete().forPath("/a"); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void connectionLossWithBackgroundTest() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 1, new RetryOneTime(1)); |
| try |
| { |
| final CountDownLatch latch = new CountDownLatch(1); |
| client.start(); |
| client.getZookeeperClient().blockUntilConnectedOrTimedOut(); |
| server.close(); |
| client.getChildren().inBackground(new BackgroundCallback() |
| { |
| public void processResult(CuratorFramework client, CuratorEvent event) throws Exception |
| { |
| latch.countDown(); |
| } |
| }).forPath("/"); |
| assertTrue(timing.awaitLatch(latch)); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testReconnectAfterLoss() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); |
| try |
| { |
| client.start(); |
| |
| final CountDownLatch lostLatch = new CountDownLatch(1); |
| ConnectionStateListener listener = new ConnectionStateListener() |
| { |
| @Override |
| public void stateChanged(CuratorFramework client, ConnectionState newState) |
| { |
| if ( newState == ConnectionState.LOST ) |
| { |
| lostLatch.countDown(); |
| } |
| } |
| }; |
| client.getConnectionStateListenable().addListener(listener); |
| |
| client.checkExists().forPath("/"); |
| |
| server.stop(); |
| |
| assertTrue(timing.awaitLatch(lostLatch)); |
| |
| try |
| { |
| client.checkExists().forPath("/"); |
| fail(); |
| } |
| catch ( KeeperException.ConnectionLossException e ) |
| { |
| // correct |
| } |
| |
| server.restart(); |
| client.checkExists().forPath("/"); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testGetAclNoStat() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| try |
| { |
| client.getACL().forPath("/"); |
| } |
| catch ( NullPointerException e ) |
| { |
| fail(); |
| } |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testMissedResponseOnBackgroundESCreate() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| CreateBuilderImpl createBuilder = (CreateBuilderImpl)client.create(); |
| createBuilder.failNextCreateForTesting = true; |
| |
| final BlockingQueue<String> queue = Queues.newArrayBlockingQueue(1); |
| BackgroundCallback callback = new BackgroundCallback() |
| { |
| @Override |
| public void processResult(CuratorFramework client, CuratorEvent event) throws Exception |
| { |
| queue.put(event.getPath()); |
| } |
| }; |
| createBuilder.withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath("/"); |
| String ourPath = queue.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS); |
| assertTrue(ourPath.startsWith(ZKPaths.makePath("/", ProtectedUtils.PROTECTED_PREFIX))); |
| assertFalse(createBuilder.failNextCreateForTesting); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testMissedResponseOnESCreate() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| CreateBuilderImpl createBuilder = (CreateBuilderImpl)client.create(); |
| createBuilder.failNextCreateForTesting = true; |
| String ourPath = createBuilder.withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/"); |
| assertTrue(ourPath.startsWith(ZKPaths.makePath("/", ProtectedUtils.PROTECTED_PREFIX))); |
| assertFalse(createBuilder.failNextCreateForTesting); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testSessionKilled() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| client.create().forPath("/sessionTest"); |
| |
| CountDownLatch sessionDiedLatch = new CountDownLatch(1); |
| Watcher watcher = event -> { |
| if ( event.getState() == Watcher.Event.KeeperState.Expired ) |
| { |
| sessionDiedLatch.countDown(); |
| } |
| }; |
| |
| client.checkExists().usingWatcher(watcher).forPath("/sessionTest"); |
| client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration(); |
| assertTrue(timing.awaitLatch(sessionDiedLatch)); |
| assertNotNull(client.checkExists().forPath("/sessionTest")); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testNestedCalls() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| client.getCuratorListenable().addListener(new CuratorListener() |
| { |
| @Override |
| public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception |
| { |
| if ( event.getType() == CuratorEventType.EXISTS ) |
| { |
| Stat stat = client.checkExists().forPath("/yo/yo/yo"); |
| assertNull(stat); |
| |
| client.create().inBackground(event.getContext()).forPath("/what"); |
| } |
| else if ( event.getType() == CuratorEventType.CREATE ) |
| { |
| ((CountDownLatch)event.getContext()).countDown(); |
| } |
| } |
| }); |
| |
| CountDownLatch latch = new CountDownLatch(1); |
| client.checkExists().inBackground(latch).forPath("/hey"); |
| assertTrue(latch.await(10, TimeUnit.SECONDS)); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testBackgroundFailure() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| final CountDownLatch latch = new CountDownLatch(1); |
| client.getConnectionStateListenable().addListener(new ConnectionStateListener() |
| { |
| @Override |
| public void stateChanged(CuratorFramework client, ConnectionState newState) |
| { |
| if ( newState == ConnectionState.LOST ) |
| { |
| latch.countDown(); |
| } |
| } |
| }); |
| |
| client.checkExists().forPath("/hey"); |
| client.checkExists().inBackground().forPath("/hey"); |
| |
| server.stop(); |
| |
| client.checkExists().inBackground().forPath("/hey"); |
| assertTrue(timing.awaitLatch(latch)); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testFailure() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 100, 100, new RetryOneTime(1)); |
| client.start(); |
| try |
| { |
| client.checkExists().forPath("/hey"); |
| client.checkExists().inBackground().forPath("/hey"); |
| |
| server.stop(); |
| |
| client.checkExists().forPath("/hey"); |
| fail(); |
| } |
| catch ( KeeperException.ConnectionLossException e ) |
| { |
| // correct |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testRetry() throws Exception |
| { |
| final int MAX_RETRIES = 3; |
| |
| final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(10)); |
| client.start(); |
| try |
| { |
| final AtomicInteger retries = new AtomicInteger(0); |
| final Semaphore semaphore = new Semaphore(0); |
| RetryPolicy policy = new RetryPolicy() |
| { |
| @Override |
| public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) |
| { |
| semaphore.release(); |
| if ( retries.incrementAndGet() == MAX_RETRIES ) |
| { |
| try |
| { |
| server.restart(); |
| } |
| catch ( Exception e ) |
| { |
| throw new Error(e); |
| } |
| } |
| try |
| { |
| sleeper.sleepFor(100, TimeUnit.MILLISECONDS); |
| } |
| catch ( InterruptedException e ) |
| { |
| Thread.currentThread().interrupt(); |
| } |
| return true; |
| } |
| }; |
| client.getZookeeperClient().setRetryPolicy(policy); |
| |
| server.stop(); |
| |
| // test foreground retry |
| client.checkExists().forPath("/hey"); |
| assertTrue(semaphore.tryAcquire(MAX_RETRIES, timing.forWaiting().seconds(), TimeUnit.SECONDS), "Remaining leases: " + semaphore.availablePermits()); |
| |
| // make sure we're reconnected |
| client.getZookeeperClient().setRetryPolicy(new RetryOneTime(100)); |
| client.checkExists().forPath("/hey"); |
| |
| client.getZookeeperClient().setRetryPolicy(policy); |
| semaphore.drainPermits(); |
| retries.set(0); |
| |
| server.stop(); |
| |
| // test background retry |
| client.checkExists().inBackground().forPath("/hey"); |
| assertTrue(semaphore.tryAcquire(MAX_RETRIES, timing.forWaiting().seconds(), TimeUnit.SECONDS), "Remaining leases: " + semaphore.availablePermits()); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| } |
| |
| @Test |
| public void testNotStarted() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| try |
| { |
| client.getData(); |
| fail(); |
| } |
| catch ( Exception e ) |
| { |
| // correct |
| } |
| catch ( Throwable e ) |
| { |
| fail("", e); |
| } |
| } |
| |
| @Test |
| public void testStopped() throws Exception |
| { |
| CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| try |
| { |
| client.start(); |
| client.getData(); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| } |
| |
| try |
| { |
| client.getData(); |
| fail(); |
| } |
| catch ( Exception e ) |
| { |
| // correct |
| } |
| } |
| |
| @Test |
| public void testDeleteChildrenConcurrently() throws Exception |
| { |
| final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| CuratorFramework client2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); |
| ExecutorService executorService = Executors.newSingleThreadExecutor(); |
| try |
| { |
| client.start(); |
| client2.start(); |
| |
| int childCount = 500; |
| for ( int i = 0; i < childCount; i++ ) |
| { |
| client.create().creatingParentsIfNeeded().forPath("/parent/child" + i); |
| } |
| |
| final CountDownLatch latch = new CountDownLatch(1); |
| executorService.submit(() -> { |
| try |
| { |
| client.delete().deletingChildrenIfNeeded().forPath("/parent"); |
| } |
| catch ( InterruptedException e ) |
| { |
| Thread.currentThread().interrupt(); |
| } |
| catch ( Exception e ) |
| { |
| if ( e instanceof KeeperException.NoNodeException ) |
| { |
| fail("client delete failed, shouldn't throw NoNodeException", e); |
| } |
| else |
| { |
| fail("unexpected exception", e); |
| } |
| } |
| finally |
| { |
| latch.countDown(); |
| } |
| }); |
| |
| boolean threadDeleted = false; |
| Random random = new Random(); |
| for ( int i = 0; i < childCount; i++ ) |
| { |
| String child = "/parent/child" + random.nextInt(childCount); |
| try |
| { |
| if ( !threadDeleted ) |
| { |
| Stat stat = client2.checkExists().forPath(child); |
| if ( stat == null ) |
| { |
| // the thread client has begin deleted the children |
| threadDeleted = true; |
| log.info("client has deleted the child {}", child); |
| } |
| } |
| else |
| { |
| try |
| { |
| client2.delete().forPath(child); |
| log.info("client2 deleted the child {} successfully", child); |
| break; |
| } |
| catch ( KeeperException.NoNodeException ignore ) |
| { |
| // ignore, because it's deleted by the thread client |
| } |
| catch ( Exception e ) |
| { |
| fail("unexpected exception", e); |
| } |
| } |
| } |
| catch ( Exception e ) |
| { |
| fail("unexpected exception", e); |
| } |
| } |
| |
| assertTrue(timing.awaitLatch(latch)); |
| assertNull(client2.checkExists().forPath("/parent")); |
| } |
| finally |
| { |
| try |
| { |
| executorService.shutdownNow(); |
| executorService.awaitTermination(timing.milliseconds(), TimeUnit.MILLISECONDS); |
| } |
| finally |
| { |
| CloseableUtils.closeQuietly(client); |
| CloseableUtils.closeQuietly(client2); |
| } |
| } |
| } |
| } |