blob: 993a4c94bb50dabde706b700b81032ea35887b63 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.cache;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
@Test(groups = CuratorTestBase.zk36Group)
public class TestCuratorCache extends CuratorTestBase
{
@Test
public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache
{
CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
{
client.start();
final CountDownLatch updatedLatch = new CountDownLatch(1);
final CountDownLatch addedLatch = new CountDownLatch(1);
client.create().creatingParentsIfNeeded().forPath("/test");
try (CuratorCache cache = CuratorCache.builder(client, "/test").withStorage(storage).build())
{
cache.listenable().addListener(builder().forChanges((__, ___) -> updatedLatch.countDown()).build());
cache.listenable().addListener(builder().forCreates(__ -> addedLatch.countDown()).build());
cache.start();
client.create().forPath("/test/foo", "first".getBytes());
Assert.assertTrue(timing.awaitLatch(addedLatch));
client.setData().forPath("/test/foo", "something new".getBytes());
Assert.assertTrue(timing.awaitLatch(updatedLatch));
}
}
}
@Test
public void testAfterInitialized() throws Exception
{
try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
{
client.start();
client.create().creatingParentsIfNeeded().forPath("/test");
client.create().creatingParentsIfNeeded().forPath("/test/one");
client.create().creatingParentsIfNeeded().forPath("/test/one/two");
client.create().creatingParentsIfNeeded().forPath("/test/one/two/three");
try (CuratorCache cache = CuratorCache.build(client, "/test"))
{
CountDownLatch initializedLatch = new CountDownLatch(1);
AtomicInteger eventCount = new AtomicInteger(0);
CuratorCacheListener listener = new CuratorCacheListener()
{
@Override
public void event(Type type, ChildData oldData, ChildData data)
{
eventCount.incrementAndGet();
}
@Override
public void initialized()
{
initializedLatch.countDown();
}
};
cache.listenable().addListener(builder().forAll(listener).afterInitialized().build());
cache.start();
Assert.assertTrue(timing.awaitLatch(initializedLatch));
Assert.assertEquals(initializedLatch.getCount(), 0);
Assert.assertEquals(cache.size(), 4);
Assert.assertTrue(cache.get("/test").isPresent());
Assert.assertTrue(cache.get("/test/one").isPresent());
Assert.assertTrue(cache.get("/test/one/two").isPresent());
Assert.assertTrue(cache.get("/test/one/two/three").isPresent());
}
}
}
@Test
public void testListenerBuilder() throws Exception
{
try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
{
client.start();
try (CuratorCache cache = CuratorCache.build(client, "/test"))
{
Semaphore all = new Semaphore(0);
Semaphore deletes = new Semaphore(0);
Semaphore changes = new Semaphore(0);
Semaphore creates = new Semaphore(0);
Semaphore createsAndChanges = new Semaphore(0);
CuratorCacheListener listener = builder().forAll((__, ___, ____) -> all.release()).forDeletes(__ -> deletes.release()).forChanges((__, ___) -> changes.release()).forCreates(__ -> creates.release()).forCreatesAndChanges((__, ___) -> createsAndChanges.release()).build();
cache.listenable().addListener(listener);
cache.start();
client.create().forPath("/test");
Assert.assertTrue(timing.acquireSemaphore(all, 1));
Assert.assertTrue(timing.acquireSemaphore(creates, 1));
Assert.assertTrue(timing.acquireSemaphore(createsAndChanges, 1));
Assert.assertEquals(changes.availablePermits(), 0);
Assert.assertEquals(deletes.availablePermits(), 0);
client.setData().forPath("/test", "new".getBytes());
Assert.assertTrue(timing.acquireSemaphore(all, 1));
Assert.assertTrue(timing.acquireSemaphore(changes, 1));
Assert.assertTrue(timing.acquireSemaphore(createsAndChanges, 1));
Assert.assertEquals(creates.availablePermits(), 0);
Assert.assertEquals(deletes.availablePermits(), 0);
client.delete().forPath("/test");
Assert.assertTrue(timing.acquireSemaphore(all, 1));
Assert.assertTrue(timing.acquireSemaphore(deletes, 1));
Assert.assertEquals(creates.availablePermits(), 0);
Assert.assertEquals(changes.availablePermits(), 0);
Assert.assertEquals(createsAndChanges.availablePermits(), 0);
}
}
}
@Test
public void testClearOnClose() throws Exception
{
try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
{
CuratorCacheStorage storage;
client.start();
try ( CuratorCache cache = CuratorCache.builder(client, "/test").withOptions(DO_NOT_CLEAR_ON_CLOSE).build() )
{
cache.start();
storage = ((CuratorCacheImpl)cache).storage();
client.create().forPath("/test", "foo".getBytes());
client.create().forPath("/test/bar", "bar".getBytes());
timing.sleepABit();
}
Assert.assertEquals(storage.size(), 2);
try ( CuratorCache cache = CuratorCache.build(client, "/test") )
{
cache.start();
storage = ((CuratorCacheImpl)cache).storage();
timing.sleepABit();
}
Assert.assertEquals(storage.size(), 0);
}
}
}