blob: 0344f9003522c9f58047ef5605304c358b5199be [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.etcd.jetcd;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.ReflectUtils;
import org.apache.dubbo.remoting.etcd.ChildListener;
import com.google.protobuf.ByteString;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.api.Event;
import io.etcd.jetcd.api.WatchCancelRequest;
import io.etcd.jetcd.api.WatchCreateRequest;
import io.etcd.jetcd.api.WatchGrpc;
import io.etcd.jetcd.api.WatchRequest;
import io.etcd.jetcd.api.WatchResponse;
import io.etcd.jetcd.common.exception.ClosedClientException;
import io.etcd.jetcd.watch.WatchEvent;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.dubbo.remoting.etcd.Constants.SESSION_TIMEOUT_KEY;
@Disabled
public class JEtcdClientTest {
JEtcdClient client;
@Test
public void test_watch_when_create_path() throws InterruptedException {
String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService1";
final CountDownLatch notNotified = new CountDownLatch(1);
ChildListener childListener = (parent, children) -> {
Assertions.assertEquals(1, children.size());
Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0));
notNotified.countDown();
};
client.addChildListener(path, childListener);
client.createEphemeral(child);
Assertions.assertTrue(notNotified.await(10, TimeUnit.SECONDS));
client.removeChildListener(path, childListener);
client.delete(child);
}
@Test
public void test_watch_when_modify() {
String path = "/dubbo/config/jetcd-client-unit-test/configurators";
String endpoint = "http://127.0.0.1:2379";
CountDownLatch latch = new CountDownLatch(1);
ByteSequence key = ByteSequence.from(path, UTF_8);
Watch.Listener listener = Watch.listener(response -> {
for (WatchEvent event : response.getEvents()) {
Assertions.assertEquals("PUT", event.getEventType().toString());
Assertions.assertEquals(path, event.getKeyValue().getKey().toString(UTF_8));
Assertions.assertEquals("Hello", event.getKeyValue().getValue().toString(UTF_8));
latch.countDown();
}
});
try (Client client = Client.builder().endpoints(endpoint).build();
Watch watch = client.getWatchClient();
Watch.Watcher watcher = watch.watch(key, listener)) {
// try to modify the key
client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
latch.await();
} catch (Exception e) {
Assertions.fail(e.getMessage());
}
}
@Test
public void testWatchWithGrpc() {
String path = "/dubbo/config/test_watch_with_grpc/configurators";
String endpoint = "http://127.0.0.1:2379";
CountDownLatch latch = new CountDownLatch(1);
try (Client client = Client.builder().endpoints(endpoint).build()) {
ManagedChannel channel = getChannel(client);
StreamObserver<WatchRequest> observer = WatchGrpc.newStub(channel).watch(new StreamObserver<WatchResponse>() {
@Override
public void onNext(WatchResponse response) {
for (Event event : response.getEventsList()) {
Assertions.assertEquals("PUT", event.getType().toString());
Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8));
Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8));
latch.countDown();
}
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
}
});
WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder().setKey(ByteString.copyFrom(path, UTF_8));
observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build());
// try to modify the key
client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
latch.await(5, TimeUnit.SECONDS);
} catch (Exception e) {
Assertions.fail(e.getMessage());
}
}
@Test
public void testCancelWatchWithGrpc() {
String path = "/dubbo/config/testCancelWatchWithGrpc/configurators";
String endpoint = "http://127.0.0.1:2379";
CountDownLatch updateLatch = new CountDownLatch(1);
CountDownLatch cancelLatch = new CountDownLatch(1);
final AtomicLong watchID = new AtomicLong(-1L);
try (Client client = Client.builder().endpoints(endpoint).build()) {
ManagedChannel channel = getChannel(client);
StreamObserver<WatchRequest> observer = WatchGrpc.newStub(channel).watch(new StreamObserver<WatchResponse>() {
@Override
public void onNext(WatchResponse response) {
watchID.set(response.getWatchId());
for (Event event : response.getEventsList()) {
Assertions.assertEquals("PUT", event.getType().toString());
Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8));
Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8));
updateLatch.countDown();
}
if (response.getCanceled()) {
// received the cancel response
cancelLatch.countDown();
}
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
}
});
// create
WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
.setKey(ByteString.copyFrom(path, UTF_8));
// make the grpc call to watch the key
observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build());
// try to put the value
client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
// response received, latch counts down to zero
updateLatch.await();
WatchCancelRequest watchCancelRequest =
WatchCancelRequest.newBuilder().setWatchId(watchID.get()).build();
WatchRequest cancelRequest = WatchRequest.newBuilder()
.setCancelRequest(watchCancelRequest).build();
observer.onNext(cancelRequest);
// try to put the value
client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello world", UTF_8));
cancelLatch.await();
} catch (Exception e) {
Assertions.fail(e.getMessage());
}
}
@Test
public void test_watch_when_create_wrong_path() throws InterruptedException {
String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/routers/demoService1";
final CountDownLatch notNotified = new CountDownLatch(1);
ChildListener childListener = (parent, children) -> {
Assertions.assertEquals(1, children.size());
Assertions.assertEquals(child, children.get(0));
notNotified.countDown();
};
client.addChildListener(path, childListener);
client.createEphemeral(child);
Assertions.assertFalse(notNotified.await(1, TimeUnit.SECONDS));
client.removeChildListener(path, childListener);
client.delete(child);
}
@Test
public void test_watch_when_delete_path() throws InterruptedException {
String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService1";
final CountDownLatch notNotified = new CountDownLatch(1);
ChildListener childListener = (parent, children) -> {
Assertions.assertEquals(0, children.size());
notNotified.countDown();
};
client.createEphemeral(child);
client.addChildListener(path, childListener);
client.delete(child);
Assertions.assertTrue(notNotified.await(10, TimeUnit.SECONDS));
client.removeChildListener(path, childListener);
}
@Test
public void test_watch_then_unwatch() throws InterruptedException {
String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService2";
final CountDownLatch notNotified = new CountDownLatch(1);
final CountDownLatch notTwiceNotified = new CountDownLatch(2);
final Holder notified = new Holder();
ChildListener childListener = (parent, children) -> {
Assertions.assertEquals(1, children.size());
Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0));
notNotified.countDown();
notTwiceNotified.countDown();
notified.getAndIncrease();
};
client.addChildListener(path, childListener);
client.createEphemeral(child);
Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
client.removeChildListener(path, childListener);
client.delete(child);
Assertions.assertFalse(notTwiceNotified.await(5, TimeUnit.SECONDS));
Assertions.assertEquals(1, notified.value);
client.delete(child);
}
@Test
public void test_watch_on_unrecoverable_connection() throws InterruptedException {
String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
JEtcdClient.EtcdWatcher watcher = null;
try {
ChildListener childListener = (parent, children) -> {
Assertions.assertEquals(path, parent);
};
client.addChildListener(path, childListener);
watcher = client.getChildListener(path, childListener);
watcher.watchRequest.onError(Status.ABORTED.withDescription("connection error").asRuntimeException());
watcher.watchRequest.onNext(watcher.nextRequest());
} catch (Exception e) {
Assertions.assertTrue(e.getMessage().contains("calls are allowed"));
}
}
@Test
public void test_watch_on_recoverable_connection() throws InterruptedException {
String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/connection";
String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/connection/demoService1";
final CountDownLatch notNotified = new CountDownLatch(1);
final CountDownLatch notTwiceNotified = new CountDownLatch(2);
final Holder notified = new Holder();
ChildListener childListener = (parent, children) -> {
notTwiceNotified.countDown();
switch (notified.increaseAndGet()) {
case 1: {
notNotified.countDown();
Assertions.assertEquals(1, children.size());
Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0));
break;
}
case 2: {
Assertions.assertEquals(0, children.size());
Assertions.assertEquals(path, parent);
break;
}
default:
Assertions.fail("two many callback invoked.");
}
};
client.addChildListener(path, childListener);
client.createEphemeral(child);
// make sure first time callback successfully
Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
// connection error causes client to release all resources including current watcher
JEtcdClient.EtcdWatcher watcher = client.getChildListener(path, childListener);
watcher.onError(Status.UNAVAILABLE.withDescription("temporary connection issue").asRuntimeException());
// trigger delete after unavailable
client.delete(child);
Assertions.assertTrue(notTwiceNotified.await(15, TimeUnit.SECONDS));
client.removeChildListener(path, childListener);
}
@Test
public void test_watch_after_client_closed() throws InterruptedException {
String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
client.close();
try {
client.addChildListener(path, (parent, children) -> {
Assertions.assertEquals(path, parent);
});
} catch (ClosedClientException e) {
Assertions.assertEquals("watch client has been closed, path '" + path + "'", e.getMessage());
}
}
@BeforeEach
public void setUp() {
// timeout in 15 seconds.
URL url = URL.valueOf("etcd3://127.0.0.1:2379/com.alibaba.dubbo.registry.RegistryService").addParameter(SESSION_TIMEOUT_KEY, 15000);
client = new JEtcdClient(url);
}
@AfterEach
public void tearDown() {
client.close();
}
static class Holder {
volatile int value;
synchronized int getAndIncrease() {
return value++;
}
synchronized int increaseAndGet() {
return ++value;
}
}
private ManagedChannel getChannel(Client client) {
try {
// hack, use reflection to get the shared channel.
Field connectionField = client.getClass().getDeclaredField("connectionManager");
connectionField.setAccessible(true);
Object connection = connectionField.get(client);
Method channelMethod = connection.getClass().getDeclaredMethod("getChannel");
ReflectUtils.makeAccessible(channelMethod);
ManagedChannel channel = (ManagedChannel) channelMethod.invoke(connection);
return channel;
} catch (Exception e) {
return null;
}
}
}