blob: ed7718c3f021186365aee1d779e33fada08212bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.etcd.jetcd;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.remoting.etcd.ChildListener;
import org.apache.dubbo.remoting.etcd.StateListener;
import org.apache.dubbo.remoting.etcd.option.OptionUtil;
import org.apache.dubbo.remoting.etcd.support.AbstractEtcdClient;
import com.google.protobuf.ByteString;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.api.Event;
import io.etcd.jetcd.api.KeyValue;
import io.etcd.jetcd.api.WatchCancelRequest;
import io.etcd.jetcd.api.WatchCreateRequest;
import io.etcd.jetcd.api.WatchGrpc;
import io.etcd.jetcd.api.WatchRequest;
import io.etcd.jetcd.api.WatchResponse;
import io.etcd.jetcd.common.exception.ClosedClientException;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.netty.util.internal.ConcurrentSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import static java.util.stream.Collectors.toList;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_ETCD3_NOTIFY_QUEUES_KEY;
import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_ETCD3_NOTIFY_THREADS;
import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_GRPC_QUEUES;
import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_RETRY_PERIOD;
import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_SESSION_TIMEOUT;
import static org.apache.dubbo.remoting.etcd.Constants.ETCD3_NOTIFY_MAXTHREADS_KEYS;
import static org.apache.dubbo.remoting.etcd.Constants.RETRY_PERIOD_KEY;
import static org.apache.dubbo.remoting.etcd.jetcd.JEtcdClientWrapper.UTF_8;
/**
* etcd3 client.
*/
public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
private JEtcdClientWrapper clientWrapper;
private ScheduledExecutorService reconnectSchedule;
private ExecutorService notifyExecutor;
private int delayPeriod;
private Logger logger = LoggerFactory.getLogger(JEtcdClient.class);
public JEtcdClient(URL url) {
super(url);
try {
clientWrapper = new JEtcdClientWrapper(url);
clientWrapper.setConnectionStateListener((client, state) -> {
if (state == StateListener.CONNECTED) {
JEtcdClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == StateListener.DISCONNECTED) {
JEtcdClient.this.stateChanged(StateListener.DISCONNECTED);
}
});
delayPeriod = getUrl().getParameter(RETRY_PERIOD_KEY, DEFAULT_RETRY_PERIOD);
reconnectSchedule = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("etcd3-watch-auto-reconnect"));
notifyExecutor = new ThreadPoolExecutor(
1
, url.getParameter(ETCD3_NOTIFY_MAXTHREADS_KEYS, DEFAULT_ETCD3_NOTIFY_THREADS)
, DEFAULT_SESSION_TIMEOUT
, TimeUnit.MILLISECONDS
, new LinkedBlockingQueue<Runnable>(url.getParameter(DEFAULT_ETCD3_NOTIFY_QUEUES_KEY, DEFAULT_GRPC_QUEUES * 3))
, new NamedThreadFactory("etcd3-notify", true));
clientWrapper.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
@Override
public void doCreatePersistent(String path) {
clientWrapper.createPersistent(path);
}
@Override
public long doCreateEphemeral(String path) {
return clientWrapper.createEphemeral(path);
}
@Override
public boolean checkExists(String path) {
return clientWrapper.checkExists(path);
}
@Override
public EtcdWatcher createChildWatcherListener(String path, ChildListener listener) {
return new EtcdWatcher(listener);
}
@Override
public List<String> addChildWatcherListener(String path, EtcdWatcher etcdWatcher) {
return etcdWatcher.forPath(path);
}
@Override
public void removeChildWatcherListener(String path, EtcdWatcher etcdWatcher) {
etcdWatcher.unwatch();
}
@Override
public List<String> getChildren(String path) {
return clientWrapper.getChildren(path);
}
@Override
public boolean isConnected() {
return clientWrapper.isConnected();
}
@Override
public long createLease(long second) {
return clientWrapper.createLease(second);
}
@Override
public long createLease(long ttl, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return clientWrapper.createLease(ttl, timeout, unit);
}
@Override
public void delete(String path) {
clientWrapper.delete(path);
}
@Override
public void revokeLease(long lease) {
clientWrapper.revokeLease(lease);
}
@Override
public void doClose() {
try {
if (notifyExecutor != null) {
ExecutorUtil.shutdownNow(notifyExecutor, 100);
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
try {
if (reconnectSchedule != null) {
ExecutorUtil.shutdownNow(reconnectSchedule, 100);
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} finally {
clientWrapper.doClose();
}
}
@Override
public String getKVValue(String key) {
return clientWrapper.getKVValue(key);
}
@Override
public boolean put(String key, String value) {
return clientWrapper.put(key, value);
}
@Override
public boolean putEphemeral(String key, String value) {
return clientWrapper.putEphemeral(key, value);
}
public ManagedChannel getChannel() {
return clientWrapper.getChannel();
}
public class EtcdWatcher implements StreamObserver<WatchResponse> {
protected WatchGrpc.WatchStub watchStub;
protected StreamObserver<WatchRequest> watchRequest;
protected long watchId;
protected String path;
protected Throwable throwable;
protected volatile Set<String> urls = new ConcurrentSet<>();
private ChildListener listener;
protected ReentrantLock lock = new ReentrantLock(true);
public EtcdWatcher(ChildListener listener) {
this.listener = listener;
}
@Override
public void onNext(WatchResponse response) {
// prevents grpc on sending watchResponse to a closed watch client.
if (!isConnected()) {
return;
}
watchId = response.getWatchId();
if (listener != null) {
int modified = 0;
String service = null;
Iterator<Event> iterator = response.getEventsList().iterator();
while (iterator.hasNext()) {
Event event = iterator.next();
switch (event.getType()) {
case PUT: {
if (((service = find(event)) != null)
&& safeUpdate(service, true)) {
modified++;
}
break;
}
case DELETE: {
if (((service = find(event)) != null)
&& safeUpdate(service, false)) {
modified++;
}
break;
}
default:
break;
}
}
if (modified > 0) {
listener.childChanged(path, new ArrayList<>(urls));
}
}
}
@Override
public void onError(Throwable e) {
tryReconnect(e);
}
public void unwatch() {
// prevents grpc on sending watchResponse to a closed watch client.
if (!isConnected()) {
return;
}
try {
/**
* issue : https://github.com/apache/dubbo/issues/4115
*
* When the network is reconnected, the listener is empty
* and the data cannot be received.
*/
// this.listener = null;
if (watchRequest != null) {
WatchCancelRequest watchCancelRequest =
WatchCancelRequest.newBuilder().setWatchId(watchId).build();
WatchRequest cancelRequest = WatchRequest.newBuilder()
.setCancelRequest(watchCancelRequest).build();
this.watchRequest.onNext(cancelRequest);
}
} catch (Exception ignored) {
logger.warn("Failed to cancel watch for path '" + path + "'", ignored);
}
}
public List<String> forPath(String path) {
if (!isConnected()) {
throw new ClosedClientException("watch client has been closed, path '" + path + "'");
}
if (this.path != null) {
unwatch();
}
this.path = path;
lock.lock();
try {
this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel());
this.watchRequest = watchStub.watch(this);
this.watchRequest.onNext(nextRequest());
List<String> children = clientWrapper.getChildren(path);
/**
* caching the current service
*/
if (!children.isEmpty()) {
this.urls.addAll(filterChildren(children));
}
return new ArrayList<>(urls);
} finally {
lock.unlock();
}
}
private boolean safeUpdate(String service, boolean add) {
lock.lock();
try {
/**
* If the collection already contains the specified service, do nothing
*/
return add ? this.urls.add(service) : this.urls.remove(service);
} finally {
lock.unlock();
}
}
private String find(Event event) {
KeyValue keyValue = event.getKv();
String key = keyValue.getKey().toStringUtf8();
int len = path.length(), index = len, count = 0;
if (key.length() >= index) {
for (; (index = key.indexOf(PATH_SEPARATOR, index)) != -1; ++index) {
if (count++ > 1) {
break;
}
}
}
/**
* if children changed , we should refresh invokers
*/
if (count == 1) {
/**
* remove prefix
*/
return key.substring(len + 1);
}
return null;
}
private List<String> filterChildren(List<String> children) {
if (children == null) {
return Collections.emptyList();
}
if (children.size() <= 0) {
return children;
}
final int len = path.length();
return children.stream().parallel()
.filter(child -> {
int index = len, count = 0;
if (child.length() > len) {
for (; (index = child.indexOf(PATH_SEPARATOR, index)) != -1; ++index) {
if (count++ > 1) {
break;
}
}
}
return count == 1;
})
.map(child -> child.substring(len + 1))
.collect(toList());
}
/**
* create new watching request for current path.
*/
protected WatchRequest nextRequest() {
WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
.setKey(ByteString.copyFromUtf8(path))
.setRangeEnd(ByteString.copyFrom(
OptionUtil.prefixEndOf(ByteSequence.from(path, UTF_8)).getBytes()))
.setProgressNotify(true);
return WatchRequest.newBuilder().setCreateRequest(builder).build();
}
public void tryReconnect(Throwable e) {
this.throwable = e;
logger.error("watcher client has error occurred, current path '" + path + "'", e);
// prevents grpc on sending error to a closed watch client.
if (!isConnected()) {
return;
}
Status status = Status.fromThrowable(e);
// system may be recover later, current connect won't be lost
if (OptionUtil.isHaltError(status) || OptionUtil.isNoLeaderError(status)) {
reconnectSchedule.schedule(this::reconnect, new Random().nextInt(delayPeriod), TimeUnit.MILLISECONDS);
return;
}
// reconnect with a delay; avoiding immediate retry on a long connection downtime.
reconnectSchedule.schedule(this::reconnect, new Random().nextInt(delayPeriod), TimeUnit.MILLISECONDS);
}
protected synchronized void reconnect() {
this.closeWatchRequest();
this.recreateWatchRequest();
}
protected void recreateWatchRequest() {
if (watchRequest == null) {
this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel());
this.watchRequest = watchStub.watch(this);
}
this.watchRequest.onNext(nextRequest());
this.throwable = null;
logger.warn("watch client retried connect for path '" + path + "', connection status : " + isConnected());
}
protected void closeWatchRequest() {
if (this.watchRequest == null) {
return;
}
try {
WatchCancelRequest watchCancelRequest =
WatchCancelRequest.newBuilder().setWatchId(watchId).build();
WatchRequest cancelRequest = WatchRequest.newBuilder()
.setCancelRequest(watchCancelRequest).build();
watchRequest.onNext(cancelRequest);
} finally {
this.watchRequest.onCompleted();
this.watchRequest = null;
}
}
@Override
public void onCompleted() {
// do not touch this method, if you want terminate this stream.
}
}
}