blob: f747a3071f92f1e303b9f7cf038a1315b50cbf6b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.support;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.URLStrParser;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.url.component.DubboServiceAddressURL;
import org.apache.dubbo.common.url.component.ServiceAddressURL;
import org.apache.dubbo.common.url.component.URLAddress;
import org.apache.dubbo.common.url.component.URLParam;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.ProviderFirstParams;
import org.apache.dubbo.rpc.model.ScopeModel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.URLStrParser.ENCODED_AND_MARK;
import static org.apache.dubbo.common.URLStrParser.ENCODED_PID_KEY;
import static org.apache.dubbo.common.URLStrParser.ENCODED_QUESTION_MARK;
import static org.apache.dubbo.common.URLStrParser.ENCODED_TIMESTAMP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.CACHE_CLEAR_TASK_INTERVAL;
import static org.apache.dubbo.common.constants.CommonConstants.CACHE_CLEAR_WAITING_THRESHOLD;
import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_SEPARATOR_ENCODED;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.ENABLE_EMPTY_PROTECTION_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
/**
* <p>
* Based on FailbackRegistry, it adds a URLAddress and URLParam cache to save RAM space.
*
* <p>
* It's useful for registries whose sdk returns raw string as provider instance. For example, Zookeeper and etcd.
*
* @see org.apache.dubbo.registry.support.FailbackRegistry
* @see org.apache.dubbo.registry.support.AbstractRegistry
*/
public abstract class CacheableFailbackRegistry extends FailbackRegistry {
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(CacheableFailbackRegistry.class);
private static String[] VARIABLE_KEYS = new String[]{ENCODED_TIMESTAMP_KEY, ENCODED_PID_KEY};
protected Map<String, URLAddress> stringAddress = new ConcurrentHashMap<>();
protected Map<String, URLParam> stringParam = new ConcurrentHashMap<>();
private ScheduledExecutorService cacheRemovalScheduler;
private int cacheRemovalTaskIntervalInMillis;
private int cacheClearWaitingThresholdInMillis;
private Map<ServiceAddressURL, Long> waitForRemove = new ConcurrentHashMap<>();
private Semaphore semaphore = new Semaphore(1);
private final Map<String, String> extraParameters;
protected final Map<URL, Map<String, ServiceAddressURL>> stringUrls = new ConcurrentHashMap<>();
protected CacheableFailbackRegistry(URL url) {
super(url);
extraParameters = new HashMap<>(8);
extraParameters.put(CHECK_KEY, String.valueOf(false));
cacheRemovalScheduler = url.getOrDefaultFrameworkModel().getBeanFactory().getBean(FrameworkExecutorRepository.class).nextScheduledExecutor();
cacheRemovalTaskIntervalInMillis = getIntConfig(url.getScopeModel(), CACHE_CLEAR_TASK_INTERVAL, 2 * 60 * 1000);
cacheClearWaitingThresholdInMillis = getIntConfig(url.getScopeModel(), CACHE_CLEAR_WAITING_THRESHOLD, 5 * 60 * 1000);
}
protected static int getIntConfig(ScopeModel scopeModel, String key, int def) {
String str = ConfigurationUtils.getProperty(scopeModel, key);
int result = def;
if (StringUtils.isNotEmpty(str)) {
try {
result = Integer.parseInt(str);
} catch (NumberFormatException e) {
// 0-2 Property type mismatch.
logger.warn("0-2", "typo in property value", "This property requires an integer value.",
"Invalid registry properties configuration key " + key + ", value " + str);
}
}
return result;
}
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
this.evictURLCache(url);
}
protected void evictURLCache(URL url) {
Map<String, ServiceAddressURL> oldURLs = stringUrls.remove(url);
try {
if (oldURLs != null && !oldURLs.isEmpty()) {
logger.info("Evicting urls for service " + url.getServiceKey() + ", size " + oldURLs.size());
Long currentTimestamp = System.currentTimeMillis();
for (Map.Entry<String, ServiceAddressURL> entry : oldURLs.entrySet()) {
waitForRemove.put(entry.getValue(), currentTimestamp);
}
if (CollectionUtils.isNotEmptyMap(waitForRemove)) {
if (semaphore.tryAcquire()) {
cacheRemovalScheduler.schedule(new RemovalTask(), cacheRemovalTaskIntervalInMillis, TimeUnit.MILLISECONDS);
}
}
}
} catch (Exception e) {
// It seems that the most possible statement that causes exception is the 'schedule()' method.
// The executor that FrameworkExecutorRepository.nextScheduledExecutor() method returns
// is made by Executors.newSingleThreadScheduledExecutor().
// After observing the code of ScheduledThreadPoolExecutor.delayedExecute,
// it seems that it only throws RejectedExecutionException when the thread pool is shutdown.
// When? FrameworkExecutorRepository gets destroyed.
// 1-3: URL evicting failed.
logger.warn("1-3", "thread pool getting destroyed", "",
"Failed to evict url for " + url.getServiceKey(), e);
}
}
protected List<URL> toUrlsWithoutEmpty(URL consumer, Collection<String> providers) {
// keep old urls
Map<String, ServiceAddressURL> oldURLs = stringUrls.get(consumer);
// create new urls
Map<String, ServiceAddressURL> newURLs = new HashMap<>((int) (providers.size() / 0.75f + 1));
// remove 'release', 'dubbo', 'methods', timestamp, 'dubbo.tag' parameter
// in consumer URL.
URL copyOfConsumer = removeParamsFromConsumer(consumer);
if (oldURLs == null) {
for (String rawProvider : providers) {
// remove timestamp in provider url.
rawProvider = stripOffVariableKeys(rawProvider);
// create DubboServiceAddress object using provider url, consumer url, and extra parameters.
ServiceAddressURL cachedURL = createURL(rawProvider, copyOfConsumer, getExtraParameters());
if (cachedURL == null) {
// 1-1: Address invalid.
logger.warn("1-1", "mismatch of service group and version settings", "",
"Invalid address, failed to parse into URL " + rawProvider);
continue;
}
newURLs.put(rawProvider, cachedURL);
}
} else {
// maybe only default, or "env" + default
for (String rawProvider : providers) {
rawProvider = stripOffVariableKeys(rawProvider);
ServiceAddressURL cachedURL = oldURLs.remove(rawProvider);
if (cachedURL == null) {
cachedURL = createURL(rawProvider, copyOfConsumer, getExtraParameters());
if (cachedURL == null) {
logger.warn("1-1", "mismatch of service group and version settings", "",
"Invalid address, failed to parse into URL " + rawProvider);
continue;
}
}
newURLs.put(rawProvider, cachedURL);
}
}
evictURLCache(consumer);
stringUrls.put(consumer, newURLs);
return new ArrayList<>(newURLs.values());
}
protected List<URL> toUrlsWithEmpty(URL consumer, String path, Collection<String> providers) {
List<URL> urls = new ArrayList<>(1);
boolean isProviderPath = path.endsWith(PROVIDERS_CATEGORY);
if (isProviderPath) {
if (CollectionUtils.isNotEmpty(providers)) {
urls = toUrlsWithoutEmpty(consumer, providers);
} else {
// clear cache on empty notification: unsubscribe or provider offline
evictURLCache(consumer);
}
} else {
if (CollectionUtils.isNotEmpty(providers)) {
urls = toConfiguratorsWithoutEmpty(consumer, providers);
}
}
if (urls.isEmpty()) {
int i = path.lastIndexOf(PATH_SEPARATOR);
String category = i < 0 ? path : path.substring(i + 1);
if (!PROVIDERS_CATEGORY.equals(category) || !getUrl().getParameter(ENABLE_EMPTY_PROTECTION_KEY, true)) {
if (PROVIDERS_CATEGORY.equals(category)) {
logger.warn("1-4", "", "",
"Service " + consumer.getServiceKey() + " received empty address list and empty protection is disabled, will clear current available addresses");
}
URL empty = URLBuilder.from(consumer)
.setProtocol(EMPTY_PROTOCOL)
.addParameter(CATEGORY_KEY, category)
.build();
urls.add(empty);
}
}
return urls;
}
/**
* Create DubboServiceAddress object using provider url, consumer url, and extra parameters.
*
* @param rawProvider provider url string
* @param consumerURL URL object of consumer
* @param extraParameters extra parameters
* @return created DubboServiceAddressURL object
*/
protected ServiceAddressURL createURL(String rawProvider, URL consumerURL, Map<String, String> extraParameters) {
boolean encoded = true;
// use encoded value directly to avoid URLDecoder.decode allocation.
int paramStartIdx = rawProvider.indexOf(ENCODED_QUESTION_MARK);
if (paramStartIdx == -1) {
// if ENCODED_QUESTION_MARK does not show, mark as not encoded.
encoded = false;
}
// split by (encoded) question mark.
// part[0] => protocol + ip address + interface.
// part[1] => parameters (metadata).
String[] parts = URLStrParser.parseRawURLToArrays(rawProvider, paramStartIdx);
if (parts.length <= 1) {
// 1-5 Received URL without any parameters.
logger.warn("1-5", "", "",
"Received url without any parameters " + rawProvider);
return DubboServiceAddressURL.valueOf(rawProvider, consumerURL);
}
String rawAddress = parts[0];
String rawParams = parts[1];
// Workaround for 'effectively final': duplicate the encoded variable.
boolean isEncoded = encoded;
// PathURLAddress if it's using dubbo protocol.
URLAddress address = stringAddress.computeIfAbsent(rawAddress, k -> URLAddress.parse(k, getDefaultURLProtocol(), isEncoded));
address.setTimestamp(System.currentTimeMillis());
URLParam param = stringParam.computeIfAbsent(rawParams, k -> URLParam.parse(k, isEncoded, extraParameters));
param.setTimestamp(System.currentTimeMillis());
// create service URL using cached address, param, and consumer URL.
ServiceAddressURL cachedServiceAddressURL = createServiceURL(address, param, consumerURL);
if (isMatch(consumerURL, cachedServiceAddressURL)) {
return cachedServiceAddressURL;
}
return null;
}
protected ServiceAddressURL createServiceURL(URLAddress address, URLParam param, URL consumerURL) {
return new DubboServiceAddressURL(address, param, consumerURL, null);
}
protected URL removeParamsFromConsumer(URL consumer) {
Set<ProviderFirstParams> providerFirstParams = consumer.getOrDefaultApplicationModel().getExtensionLoader(ProviderFirstParams.class).getSupportedExtensionInstances();
if (CollectionUtils.isEmpty(providerFirstParams)) {
return consumer;
}
for (ProviderFirstParams paramsFilter : providerFirstParams) {
consumer = consumer.removeParameters(paramsFilter.params());
}
return consumer;
}
private String stripOffVariableKeys(String rawProvider) {
String[] keys = getVariableKeys();
if (keys == null || keys.length == 0) {
return rawProvider;
}
for (String key : keys) {
int idxStart = rawProvider.indexOf(key);
if (idxStart == -1) {
continue;
}
int idxEnd = rawProvider.indexOf(ENCODED_AND_MARK, idxStart);
String part1 = rawProvider.substring(0, idxStart);
if (idxEnd == -1) {
rawProvider = part1;
} else {
String part2 = rawProvider.substring(idxEnd + ENCODED_AND_MARK.length());
rawProvider = part1 + part2;
}
}
if (rawProvider.endsWith(ENCODED_AND_MARK)) {
rawProvider = rawProvider.substring(0, rawProvider.length() - ENCODED_AND_MARK.length());
}
if (rawProvider.endsWith(ENCODED_QUESTION_MARK)) {
rawProvider = rawProvider.substring(0, rawProvider.length() - ENCODED_QUESTION_MARK.length());
}
return rawProvider;
}
private List<URL> toConfiguratorsWithoutEmpty(URL consumer, Collection<String> configurators) {
List<URL> urls = new ArrayList<>();
if (CollectionUtils.isNotEmpty(configurators)) {
for (String provider : configurators) {
if (provider.contains(PROTOCOL_SEPARATOR_ENCODED)) {
URL url = URLStrParser.parseEncodedStr(provider);
if (UrlUtils.isMatch(consumer, url)) {
urls.add(url);
}
}
}
}
return urls;
}
protected Map<String, String> getExtraParameters() {
return extraParameters;
}
protected String[] getVariableKeys() {
return VARIABLE_KEYS;
}
protected String getDefaultURLProtocol() {
return DUBBO;
}
/**
* This method is for unit test to see if the RemovalTask has completed or not.<br />
* <strong>Please do not call this method in other places.</strong>
*/
@Deprecated
protected Semaphore getSemaphore() {
return semaphore;
}
protected abstract boolean isMatch(URL subscribeUrl, URL providerUrl);
/**
* The cached URL removal task, which will be run on a scheduled thread pool. (It will be run after a delay.)
*/
private class RemovalTask implements Runnable {
@Override
public void run() {
logger.info("Clearing cached URLs, waiting to clear size " + waitForRemove.size());
int clearCount = 0;
try {
Iterator<Map.Entry<ServiceAddressURL, Long>> it = waitForRemove.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<ServiceAddressURL, Long> entry = it.next();
ServiceAddressURL removeURL = entry.getKey();
long removeTime = entry.getValue();
long current = System.currentTimeMillis();
if (current - removeTime >= cacheClearWaitingThresholdInMillis) {
URLAddress urlAddress = removeURL.getUrlAddress();
URLParam urlParam = removeURL.getUrlParam();
if (current - urlAddress.getTimestamp() >= cacheClearWaitingThresholdInMillis) {
stringAddress.remove(urlAddress.getRawAddress());
}
if (current - urlParam.getTimestamp() >= cacheClearWaitingThresholdInMillis) {
stringParam.remove(urlParam.getRawParam());
}
it.remove();
clearCount++;
}
}
} catch (Throwable t) {
// 1-6 Error when clearing cached URLs.
logger.error("1-6", "", "",
"Error occurred when clearing cached URLs", t);
} finally {
semaphore.release();
}
logger.info("Clear cached URLs, size " + clearCount);
if (CollectionUtils.isNotEmptyMap(waitForRemove)) {
// move to next schedule
if (semaphore.tryAcquire()) {
cacheRemovalScheduler.schedule(new RemovalTask(), cacheRemovalTaskIntervalInMillis, TimeUnit.MILLISECONDS);
}
}
}
}
}