blob: a4cdd6c8a1641575a0076dae31efa7c2ff968f48 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.zookeeper;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.function.ThrowableConsumer;
import org.apache.dubbo.common.function.ThrowableFunction;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.DefaultPage;
import org.apache.dubbo.common.utils.Page;
import org.apache.dubbo.event.EventDispatcher;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.KeeperException;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.dubbo.common.function.ThrowableFunction.execute;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.isInstanceUpdated;
import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.ROOT_PATH;
import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.build;
import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.buildCuratorFramework;
import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.buildServiceDiscovery;
/**
* Zookeeper {@link ServiceDiscovery} implementation based on
* <a href="https://curator.apache.org/curator-x-discovery/index.html">Apache Curator X Discovery</a>
*/
public class ZookeeperServiceDiscovery implements ServiceDiscovery {
private final Logger logger = LoggerFactory.getLogger(getClass());
private URL registryURL;
private EventDispatcher dispatcher;
private CuratorFramework curatorFramework;
private String rootPath;
private org.apache.curator.x.discovery.ServiceDiscovery<ZookeeperInstance> serviceDiscovery;
private ServiceInstance serviceInstance;
/**
* The Key is watched Zookeeper path, the value is an instance of {@link CuratorWatcher}
*/
private final Map<String, CuratorWatcher> watcherCaches = new ConcurrentHashMap<>();
@Override
public void initialize(URL registryURL) throws Exception {
this.registryURL = registryURL;
this.curatorFramework = buildCuratorFramework(registryURL);
this.rootPath = ROOT_PATH.getParameterValue(registryURL);
this.serviceDiscovery = buildServiceDiscovery(curatorFramework, rootPath);
this.serviceDiscovery.start();
}
@Override
public URL getUrl() {
return registryURL;
}
public void destroy() throws Exception {
serviceDiscovery.close();
}
@Override
public ServiceInstance getLocalInstance() {
return serviceInstance;
}
public void register(ServiceInstance serviceInstance) throws RuntimeException {
this.serviceInstance = serviceInstance;
doInServiceRegistry(serviceDiscovery -> {
serviceDiscovery.registerService(build(serviceInstance));
});
}
public void update(ServiceInstance serviceInstance) throws RuntimeException {
this.serviceInstance = serviceInstance;
if (isInstanceUpdated(serviceInstance)) {
doInServiceRegistry(serviceDiscovery -> {
serviceDiscovery.updateService(build(serviceInstance));
});
}
}
public void unregister(ServiceInstance serviceInstance) throws RuntimeException {
doInServiceRegistry(serviceDiscovery -> {
serviceDiscovery.unregisterService(build(serviceInstance));
});
}
@Override
public Set<String> getServices() {
return doInServiceDiscovery(s -> new LinkedHashSet<>(s.queryForNames()));
}
@Override
public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
return doInServiceDiscovery(s -> build(s.queryForInstances(serviceName)));
}
@Override
public Page<ServiceInstance> getInstances(String serviceName, int offset, int pageSize, boolean healthyOnly) {
String path = buildServicePath(serviceName);
return execute(path, p -> {
List<ServiceInstance> serviceInstances = new LinkedList<>();
List<String> serviceIds = new LinkedList<>(curatorFramework.getChildren().forPath(p));
int totalSize = serviceIds.size();
Iterator<String> iterator = serviceIds.iterator();
for (int i = 0; i < offset; i++) {
if (iterator.hasNext()) { // remove the elements from 0 to offset
iterator.next();
iterator.remove();
}
}
for (int i = 0; i < pageSize; i++) {
if (iterator.hasNext()) {
String serviceId = iterator.next();
ServiceInstance serviceInstance = build(serviceDiscovery.queryForInstance(serviceName, serviceId));
serviceInstances.add(serviceInstance);
}
}
return new DefaultPage<>(offset, pageSize, serviceInstances, totalSize);
});
}
@Override
public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
throws NullPointerException, IllegalArgumentException {
listener.getServiceNames().forEach(serviceName -> registerServiceWatcher(serviceName, listener));
}
private void doInServiceRegistry(ThrowableConsumer<org.apache.curator.x.discovery.ServiceDiscovery> consumer) {
ThrowableConsumer.execute(serviceDiscovery, s -> {
consumer.accept(s);
});
}
private <R> R doInServiceDiscovery(ThrowableFunction<org.apache.curator.x.discovery.ServiceDiscovery, R> function) {
return execute(serviceDiscovery, function);
}
protected void registerServiceWatcher(String serviceName, ServiceInstancesChangedListener listener) {
String path = buildServicePath(serviceName);
CuratorWatcher watcher = watcherCaches.computeIfAbsent(path, key ->
new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName, listener));
try {
curatorFramework.getChildren().usingWatcher(watcher).forPath(path);
} catch (KeeperException.NoNodeException e) {
// ignored
if (logger.isErrorEnabled()) {
logger.error(e.getMessage());
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
private String buildServicePath(String serviceName) {
return rootPath + "/" + serviceName;
}
}