blob: 548305284718efec37488f4cf95a174c0a38a3d0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.nacos;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.support.FailbackRegistry;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.CONFIGURATORS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
/**
* Nacos {@link Registry}
*
* @see #SERVICE_NAME_SEPARATOR
* @see #PAGINATION_SIZE
* @see #LOOKUP_INTERVAL
* @since 2.6.5
*/
public class NacosRegistry extends FailbackRegistry {

    /**
     * All supported categories
     */
    private static final String[] ALL_SUPPORTED_CATEGORIES = of(
            PROVIDERS_CATEGORY,
            CONSUMERS_CATEGORY,
            ROUTERS_CATEGORY,
            CONFIGURATORS_CATEGORY
    );

    // Positions of the segments inside a Nacos service name:
    // ${category}${separator}${interface}${separator}${version}${separator}${group}
    private static final int CATEGORY_INDEX = 0;

    private static final int SERVICE_INTERFACE_INDEX = 1;

    private static final int SERVICE_VERSION_INDEX = 2;

    private static final int SERVICE_GROUP_INDEX = 3;

    /**
     * The number of segments a Dubbo service name is made of (category, interface, version, group)
     */
    private static final int SERVICE_NAME_SEGMENTS = 4;

    private static final String WILDCARD = "*";

    /**
     * The separator for service name.
     * It is configurable (system property "nacos.service.name.separator") rather than a constant, it's designed
     * for Windows file name that is compatible with old Nacos binary release(< 0.6.1)
     */
    private static final String SERVICE_NAME_SEPARATOR = System.getProperty("nacos.service.name.separator", ":");

    /**
     * The pagination size of query for Nacos service names(only for Dubbo-OPS)
     */
    private static final int PAGINATION_SIZE = Integer.getInteger("nacos.service.names.pagination.size", 100);

    /**
     * The interval in second of lookup Nacos service names(only for Dubbo-OPS)
     */
    private static final long LOOKUP_INTERVAL = Long.getLong("nacos.service.names.lookup.interval", 30);

    /**
     * {@link ScheduledExecutorService} lookup Nacos service names(only for Dubbo-OPS).
     * Created lazily by {@link #scheduleServiceNamesLookup(URL, NotifyListener)} and reset to <code>null</code>
     * by {@link #shutdownServiceNamesLookup()}.
     */
    private volatile ScheduledExecutorService scheduledExecutorService;

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private final NamingService namingService;

    /**
     * The Nacos {@link EventListener listeners} that have been subscribed, keyed by Nacos service name.
     * NOTE(review): only one listener is kept per service name, so a second subscriber of the same service
     * name does not get its own Nacos event listener.
     */
    private final ConcurrentMap<String, EventListener> nacosListeners;

    public NacosRegistry(URL url, NamingService namingService) {
        super(url);
        this.namingService = namingService;
        this.nacosListeners = new ConcurrentHashMap<>();
    }

    /**
     * @return <code>true</code> if and only if the Nacos server reports the status "UP"
     */
    @Override
    public boolean isAvailable() {
        return "UP".equals(namingService.getServerStatus());
    }

    /**
     * Look up all the instances of the services that match the specified {@link URL url}.
     * <p>
     * Unlike a subscription, a lookup never schedules the periodic service names lookup, because there is
     * no {@link NotifyListener} to notify.
     *
     * @param url {@link URL}
     * @return non-null, the matched {@link URL urls}; if Nacos fails, the error is logged and the URLs
     * collected so far are returned
     */
    @Override
    public List<URL> lookup(final URL url) {
        final List<URL> urls = new LinkedList<>();
        execute(naming -> {
            List<String> serviceNames = isAdminProtocol(url) ? getServiceNamesForOps(url) : doGetServiceNames(url);
            for (String serviceName : serviceNames) {
                List<Instance> instances = naming.getAllInstances(serviceName);
                urls.addAll(buildURLs(url, instances));
            }
        });
        return urls;
    }

    /**
     * Register the specified {@link URL url} as a Nacos {@link Instance}.
     *
     * @param url {@link URL}
     */
    @Override
    public void doRegister(URL url) {
        final String serviceName = getServiceName(url);
        final Instance instance = createInstance(url);
        execute(naming -> naming.registerInstance(serviceName, instance));
    }

    /**
     * Deregister the Nacos {@link Instance} (identified by service name, IP and port) of the specified
     * {@link URL url}.
     *
     * @param url {@link URL}
     */
    @Override
    public void doUnregister(final URL url) {
        execute(naming -> {
            String serviceName = getServiceName(url);
            Instance instance = createInstance(url);
            naming.deregisterInstance(serviceName, instance.getIp(), instance.getPort());
        });
    }

    /**
     * Subscribe the services that match the specified {@link URL url}: the listener is notified with the
     * current instances of every matched service and a Nacos {@link EventListener} is registered per service.
     *
     * @param url      {@link URL}
     * @param listener {@link NotifyListener}
     */
    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        List<String> serviceNames = getServiceNames(url, listener);
        doSubscribe(url, listener, serviceNames);
    }

    private void doSubscribe(final URL url, final NotifyListener listener, final List<String> serviceNames) {
        execute(naming -> {
            for (String serviceName : serviceNames) {
                List<Instance> instances = naming.getAllInstances(serviceName);
                notifySubscriber(url, listener, instances);
                subscribeEventListener(serviceName, url, listener);
            }
        });
    }

    /**
     * Only the periodic service names lookup (admin protocol) is stopped here.
     * NOTE(review): the Nacos {@link EventListener listeners} registered by the subscription are not
     * unsubscribed.
     *
     * @param url      {@link URL}
     * @param listener {@link NotifyListener}
     */
    @Override
    public void doUnsubscribe(URL url, NotifyListener listener) {
        if (isAdminProtocol(url)) {
            shutdownServiceNamesLookup();
        }
    }

    /**
     * Shut down the periodic service names lookup, if any, and forget the executor so that a later
     * subscription with the admin protocol is able to schedule the lookup again.
     */
    private synchronized void shutdownServiceNamesLookup() {
        ScheduledExecutorService executor = scheduledExecutorService;
        if (executor != null) {
            // Reset first : a shut down executor must never be mistaken for a running lookup
            scheduledExecutorService = null;
            executor.shutdown();
        }
    }

    /**
     * Get the service names from the specified {@link URL url}.
     * For the admin protocol, the periodic service names lookup is scheduled as a side effect.
     *
     * @param url      {@link URL}
     * @param listener {@link NotifyListener} to be notified by the periodic lookup (admin protocol only)
     * @return non-null
     */
    private List<String> getServiceNames(URL url, NotifyListener listener) {
        if (isAdminProtocol(url)) {
            scheduleServiceNamesLookup(url, listener);
            return getServiceNamesForOps(url);
        }
        return doGetServiceNames(url);
    }

    private boolean isAdminProtocol(URL url) {
        return ADMIN_PROTOCOL.equals(url.getProtocol());
    }

    /**
     * Schedule, at most once, the task that looks up all the Nacos service names every
     * {@link #LOOKUP_INTERVAL} seconds and subscribes the ones that belong to a supported category.
     *
     * @param url      {@link URL}
     * @param listener {@link NotifyListener}
     */
    private synchronized void scheduleServiceNamesLookup(final URL url, final NotifyListener listener) {
        if (scheduledExecutorService != null) {
            return;
        }
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            try {
                List<String> serviceNames = getAllServiceNames();
                filterData(serviceNames, NacosRegistry::hasSupportedCategoryPrefix);
                doSubscribe(url, listener, serviceNames);
            } catch (RuntimeException e) {
                // An exception escaping from a periodic task suppresses all its subsequent executions,
                // so a failed round is logged and the next one is given a chance.
                logger.error("Failed to lookup the Nacos service names : " + e.getMessage(), e);
            }
        }, LOOKUP_INTERVAL, LOOKUP_INTERVAL, TimeUnit.SECONDS);
        scheduledExecutorService = executor;
    }

    /**
     * @param serviceName the Nacos service name
     * @return <code>true</code> if the service name starts with one of the
     * {@link #ALL_SUPPORTED_CATEGORIES supported categories} followed by the separator
     */
    private static boolean hasSupportedCategoryPrefix(String serviceName) {
        for (String category : ALL_SUPPORTED_CATEGORIES) {
            if (StringUtils.startsWith(serviceName, category + SERVICE_NAME_SEPARATOR)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Get the service names for Dubbo OPS
     *
     * @param url {@link URL}
     * @return non-null
     */
    private List<String> getServiceNamesForOps(URL url) {
        List<String> serviceNames = getAllServiceNames();
        filterServiceNames(serviceNames, url);
        return serviceNames;
    }

    /**
     * Get all the service names of the Nacos server, page by page.
     *
     * @return non-null, mutable; if Nacos fails, the error is logged and the names collected so far are returned
     */
    private List<String> getAllServiceNames() {
        final List<String> serviceNames = new LinkedList<>();
        execute(naming -> {
            // The first page also tells the total count
            ListView<String> page = naming.getServicesOfServer(1, PAGINATION_SIZE);
            serviceNames.addAll(page.getData());
            int count = page.getCount();
            // the number of pages, rounded up
            int pageNumbers = count / PAGINATION_SIZE + (count % PAGINATION_SIZE > 0 ? 1 : 0);
            // If more than 1 page
            for (int pageIndex = 2; pageIndex <= pageNumbers; pageIndex++) {
                serviceNames.addAll(naming.getServicesOfServer(pageIndex, PAGINATION_SIZE).getData());
            }
        });
        return serviceNames;
    }

    /**
     * Remove the service names that do not match the categories, the service interface, the version and the
     * group of the specified {@link URL url}; "*" as target interface, version or group matches any value.
     *
     * @param serviceNames the mutable service names to be filtered in place
     * @param url          {@link URL}
     */
    private void filterServiceNames(List<String> serviceNames, URL url) {
        final String[] categories = getCategories(url);
        final String targetServiceInterface = url.getServiceInterface();
        final String targetVersion = url.getParameter(VERSION_KEY, "");
        final String targetGroup = url.getParameter(GROUP_KEY, "");

        filterData(serviceNames, serviceName -> {
            // split service name to segments
            // (required) segments[0] = category
            // (required) segments[1] = serviceInterface
            // (optional) segments[2] = version
            // (optional) segments[3] = group
            // The empty segments must be preserved : a service without version or group is named like
            // "providers:com.acme.DemoService::" and still has 4 segments.
            String[] segments =
                    StringUtils.splitByWholeSeparatorPreserveAllTokens(serviceName, SERVICE_NAME_SEPARATOR);
            if (segments == null || segments.length != SERVICE_NAME_SEGMENTS) { // must present 4 segments
                return false;
            }
            if (!ArrayUtils.contains(categories, segments[CATEGORY_INDEX])) { // no match category
                return false;
            }
            if (!matches(targetServiceInterface, segments[SERVICE_INTERFACE_INDEX])) { // no match service interface
                return false;
            }
            if (!matches(targetVersion, segments[SERVICE_VERSION_INDEX])) { // no match service version
                return false;
            }
            return matches(targetGroup, segments[SERVICE_GROUP_INDEX]);
        });
    }

    /**
     * @param target the target value, "*" matches any actual value
     * @param actual the actual value
     * @return <code>true</code> if the target is the wildcard or equals to the actual value
     */
    private static boolean matches(String target, String actual) {
        return WILDCARD.equals(target) || StringUtils.equals(target, actual);
    }

    /**
     * Remove, in place, the elements that are not accepted by the filter.
     *
     * @param collection the mutable collection
     * @param filter     {@link NacosDataFilter}
     */
    private <T> void filterData(Collection<T> collection, NacosDataFilter<T> filter) {
        // remove if not accept
        collection.removeIf(data -> !filter.accept(data));
    }

    /**
     * Build one service name per category of the specified {@link URL url}.
     *
     * @param url {@link URL}
     * @return non-null
     */
    private List<String> doGetServiceNames(URL url) {
        String[] categories = getCategories(url);
        List<String> serviceNames = new ArrayList<>(categories.length);
        for (String category : categories) {
            serviceNames.add(getServiceName(url, category));
        }
        return serviceNames;
    }

    /**
     * Convert the {@link Instance instances} to {@link URL urls} and keep the ones that match the consumer URL.
     *
     * @param consumerURL the {@link URL} to be matched
     * @param instances   the {@link Instance instances}
     * @return non-null
     */
    private List<URL> buildURLs(URL consumerURL, Collection<Instance> instances) {
        if (instances.isEmpty()) {
            return Collections.emptyList();
        }
        List<URL> urls = new LinkedList<>();
        for (Instance instance : instances) {
            URL url = buildURL(instance);
            if (UrlUtils.isMatch(consumerURL, url)) {
                urls.add(url);
            }
        }
        return urls;
    }

    /**
     * Subscribe the Nacos service, unless a listener is already registered for that service name.
     *
     * @param serviceName the Nacos service name
     * @param url         {@link URL}
     * @param listener    {@link NotifyListener}
     * @throws NacosException if Nacos fails to subscribe; the service name is then not kept as subscribed
     */
    private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
            throws NacosException {
        EventListener eventListener = event -> {
            if (event instanceof NamingEvent) {
                NamingEvent e = (NamingEvent) event;
                notifySubscriber(url, listener, e.getInstances());
            }
        };
        // Claim the service name atomically : the scheduled lookup and the callers may subscribe concurrently
        if (nacosListeners.putIfAbsent(serviceName, eventListener) != null) {
            return;
        }
        try {
            namingService.subscribe(serviceName, eventListener);
        } catch (NacosException | RuntimeException e) {
            // Roll back so that a later attempt is able to subscribe again
            nacosListeners.remove(serviceName, eventListener);
            throw e;
        }
    }

    /**
     * Notify the enabled {@link Instance instances} to subscriber.
     *
     * @param url       {@link URL}
     * @param listener  {@link NotifyListener}
     * @param instances all {@link Instance instances}
     */
    private void notifySubscriber(URL url, NotifyListener listener, Collection<Instance> instances) {
        // Work on a copy : the given collection is left untouched
        List<Instance> enabledInstances = new LinkedList<>(instances);
        filterHealthyInstances(enabledInstances);
        List<URL> urls = buildURLs(url, enabledInstances);
        this.notify(url, listener, urls);
    }

    /**
     * Get the categories from {@link URL}
     *
     * @param url {@link URL}
     * @return non-null array
     */
    private String[] getCategories(URL url) {
        if (ANY_VALUE.equals(url.getServiceInterface())) {
            return ALL_SUPPORTED_CATEGORIES;
        }
        return url.getParameter(CATEGORY_KEY, new String[]{DEFAULT_CATEGORY});
    }

    /**
     * Build the {@link URL} from the IP, the port and the metadata (including the protocol and the path)
     * of the specified {@link Instance}.
     *
     * @param instance {@link Instance}
     * @return non-null
     */
    private URL buildURL(Instance instance) {
        Map<String, String> metadata = instance.getMetadata();
        String protocol = metadata.get(PROTOCOL_KEY);
        String path = metadata.get(PATH_KEY);
        return new URL(protocol, instance.getIp(), instance.getPort(), path, metadata);
    }

    /**
     * Create the {@link Instance} from the host, the port and the parameters of the specified {@link URL};
     * the category, the protocol and the path are added to the metadata.
     *
     * @param url {@link URL}
     * @return non-null
     */
    private Instance createInstance(URL url) {
        // Append default category if absent
        String category = url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
        URL newURL = url.addParameter(CATEGORY_KEY, category)
                .addParameter(PROTOCOL_KEY, url.getProtocol())
                .addParameter(PATH_KEY, url.getPath());

        Instance instance = new Instance();
        instance.setIp(url.getHost());
        instance.setPort(url.getPort());
        instance.setMetadata(new HashMap<>(newURL.getParameters()));
        return instance;
    }

    private String getServiceName(URL url) {
        String category = url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
        return getServiceName(url, category);
    }

    /**
     * Build the service name : the category followed by the interface, the version and the group, each one
     * preceded by the separator; a blank value is left empty.
     *
     * @param url      {@link URL}
     * @param category the category
     * @return non-null
     */
    private String getServiceName(URL url, String category) {
        StringBuilder serviceNameBuilder = new StringBuilder(category);
        append(serviceNameBuilder, url, INTERFACE_KEY);
        append(serviceNameBuilder, url, VERSION_KEY);
        append(serviceNameBuilder, url, GROUP_KEY);
        return serviceNameBuilder.toString();
    }

    private void append(StringBuilder target, URL url, String parameterName) {
        // The separator is always appended, even if the value is blank
        target.append(SERVICE_NAME_SEPARATOR);
        String parameterValue = url.getParameter(parameterName);
        if (!StringUtils.isBlank(parameterValue)) {
            target.append(parameterValue);
        }
    }

    /**
     * Execute the callback with the {@link NamingService}; a {@link NacosException} is logged, not rethrown.
     *
     * @param callback {@link NamingServiceCallback}
     */
    private void execute(NamingServiceCallback callback) {
        try {
            callback.callback(namingService);
        } catch (NacosException e) {
            if (logger.isErrorEnabled()) {
                logger.error(e.getErrMsg(), e);
            }
        }
    }

    /**
     * Remove, in place, the {@link Instance instances} that are not {@link Instance#isEnabled() enabled}.
     * NOTE(review): despite its name, this method checks the "enabled" flag, not the "healthy" one.
     *
     * @param instances the mutable {@link Instance instances}
     */
    private void filterHealthyInstances(Collection<Instance> instances) {
        filterData(instances, Instance::isEnabled);
    }

    @SafeVarargs
    private static <T> T[] of(T... values) {
        return values;
    }

    /**
     * A filter for Nacos data
     *
     * @since 2.6.5
     */
    @FunctionalInterface
    private interface NacosDataFilter<T> {

        /**
         * Tests whether or not the specified data should be accepted.
         *
         * @param data The data to be tested
         * @return <code>true</code> if and only if <code>data</code>
         * should be accepted
         */
        boolean accept(T data);
    }

    /**
     * {@link NamingService} Callback
     *
     * @since 2.6.5
     */
    @FunctionalInterface
    interface NamingServiceCallback {

        /**
         * Callback
         *
         * @param namingService {@link NamingService}
         * @throws NacosException if Nacos fails
         */
        void callback(NamingService namingService) throws NacosException;
    }
}