blob: 88b99db6a9dfbb80c676faa3f13dddeaf78a2308 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.zookeeper.curator5;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.zookeeper.ChildListener;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.WatchedEvent;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.mock;
public class Curator5ZookeeperClientTest {
private static Curator5ZookeeperClient curatorClient;
private static CuratorFramework client = null;
private static int zookeeperServerPort1;
private static String zookeeperConnectionAddress1;
@BeforeAll
public static void setUp() throws Exception {
zookeeperConnectionAddress1 = System.getProperty("zookeeper.connection.address.1");
zookeeperServerPort1 = Integer.parseInt(zookeeperConnectionAddress1.substring(zookeeperConnectionAddress1.lastIndexOf(":") + 1));
curatorClient = new Curator5ZookeeperClient(URL.valueOf(zookeeperConnectionAddress1 + "/org.apache.dubbo.registry.RegistryService"));
client = CuratorFrameworkFactory.newClient("127.0.0.1:"+zookeeperServerPort1, new ExponentialBackoffRetry(1000, 3));
client.start();
}
@Test
public void testCheckExists() {
String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
curatorClient.create(path, false);
assertThat(curatorClient.checkExists(path), is(true));
assertThat(curatorClient.checkExists(path + "/noneexits"), is(false));
}
@Test
public void testChildrenPath() {
String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
curatorClient.create(path, false);
curatorClient.create(path + "/provider1", false);
curatorClient.create(path + "/provider2", false);
List<String> children = curatorClient.getChildren(path);
assertThat(children.size(), is(2));
}
@Test
@Timeout(value = 2)
public void testChildrenListener() throws InterruptedException {
String path = "/dubbo/org.apache.dubbo.demo.DemoListenerService/providers";
curatorClient.create(path, false);
final CountDownLatch countDownLatch = new CountDownLatch(1);
curatorClient.addTargetChildListener(path, new Curator5ZookeeperClient.CuratorWatcherImpl() {
@Override
public void process(WatchedEvent watchedEvent) throws Exception {
countDownLatch.countDown();
}
});
curatorClient.createPersistent(path + "/provider1");
countDownLatch.await();
}
@Test
public void testWithInvalidServer() {
Assertions.assertThrows(IllegalStateException.class, () -> {
curatorClient = new Curator5ZookeeperClient(URL.valueOf("zookeeper://127.0.0.1:1/service?timeout=1000"));
curatorClient.create("/testPath", true);
});
}
@Test
public void testRemoveChildrenListener() {
ChildListener childListener = mock(ChildListener.class);
curatorClient.addChildListener("/children", childListener);
curatorClient.removeChildListener("/children", childListener);
}
@Test
public void testCreateExistingPath() {
curatorClient.create("/pathOne", false);
curatorClient.create("/pathOne", false);
}
@Test
public void testConnectedStatus() {
curatorClient.createEphemeral("/testPath");
boolean connected = curatorClient.isConnected();
assertThat(connected, is(true));
}
@Test
public void testCreateContent4Persistent() {
String path = "/curatorTest4CrContent/content.data";
String content = "createContentTest";
curatorClient.delete(path);
assertThat(curatorClient.checkExists(path), is(false));
assertNull(curatorClient.getContent(path));
curatorClient.create(path, content, false);
assertThat(curatorClient.checkExists(path), is(true));
assertEquals(curatorClient.getContent(path), content);
}
@Test
public void testCreateContent4Temp() {
String path = "/curatorTest4CrContent/content.data";
String content = "createContentTest";
curatorClient.delete(path);
assertThat(curatorClient.checkExists(path), is(false));
assertNull(curatorClient.getContent(path));
curatorClient.create(path, content, true);
assertThat(curatorClient.checkExists(path), is(true));
assertEquals(curatorClient.getContent(path), content);
}
@Test
public void testAddTargetDataListener() throws Exception {
String listenerPath = "/dubbo/service.name/configuration";
String path = listenerPath + "/dat/data";
String value = "vav";
curatorClient.create(path + "/d.json", value, true);
String valueFromCache = curatorClient.getContent(path + "/d.json");
Assertions.assertEquals(value, valueFromCache);
final AtomicInteger atomicInteger = new AtomicInteger(0);
curatorClient.addTargetDataListener(path + "/d.json", new Curator5ZookeeperClient.NodeCacheListenerImpl() {
@Override
public void nodeChanged() throws Exception {
atomicInteger.incrementAndGet();
}
});
valueFromCache = curatorClient.getContent(path + "/d.json");
Assertions.assertNotNull(valueFromCache);
Thread.sleep(100);
curatorClient.getClient().setData().forPath(path + "/d.json", "foo".getBytes());
Thread.sleep(100);
curatorClient.getClient().setData().forPath(path + "/d.json", "bar".getBytes());
curatorClient.delete(path + "/d.json");
valueFromCache = curatorClient.getContent(path + "/d.json");
Assertions.assertNull(valueFromCache);
Thread.sleep(200);
Assertions.assertTrue(3L <= atomicInteger.get());
}
@AfterAll
public static void testWithStoppedServer() {
curatorClient.close();
}
}