| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.dubbo.registry.support; |
| |
| import org.apache.dubbo.common.URL; |
| import org.apache.dubbo.common.URLBuilder; |
| import org.apache.dubbo.common.URLStrParser; |
| import org.apache.dubbo.common.config.ConfigurationUtils; |
| import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; |
| import org.apache.dubbo.common.logger.LoggerFactory; |
| import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository; |
| import org.apache.dubbo.common.url.component.DubboServiceAddressURL; |
| import org.apache.dubbo.common.url.component.ServiceAddressURL; |
| import org.apache.dubbo.common.url.component.URLAddress; |
| import org.apache.dubbo.common.url.component.URLParam; |
| import org.apache.dubbo.common.utils.CollectionUtils; |
| import org.apache.dubbo.common.utils.StringUtils; |
| import org.apache.dubbo.common.utils.UrlUtils; |
| import org.apache.dubbo.registry.NotifyListener; |
| import org.apache.dubbo.registry.ProviderFirstParams; |
| import org.apache.dubbo.rpc.model.ScopeModel; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.apache.dubbo.common.URLStrParser.ENCODED_AND_MARK; |
| import static org.apache.dubbo.common.URLStrParser.ENCODED_PID_KEY; |
| import static org.apache.dubbo.common.URLStrParser.ENCODED_QUESTION_MARK; |
| import static org.apache.dubbo.common.URLStrParser.ENCODED_TIMESTAMP_KEY; |
| import static org.apache.dubbo.common.constants.CommonConstants.CACHE_CLEAR_TASK_INTERVAL; |
| import static org.apache.dubbo.common.constants.CommonConstants.CACHE_CLEAR_WAITING_THRESHOLD; |
| import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY; |
| import static org.apache.dubbo.common.constants.CommonConstants.DUBBO; |
| import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR; |
| import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_SEPARATOR_ENCODED; |
| import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY; |
| import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL; |
| import static org.apache.dubbo.common.constants.RegistryConstants.ENABLE_EMPTY_PROTECTION_KEY; |
| import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY; |
| |
| /** |
| * <p> |
| * Based on FailbackRegistry, it adds a URLAddress and URLParam cache to save RAM space. |
| * |
| * <p> |
| * It's useful for registries whose sdk returns raw string as provider instance. For example, Zookeeper and etcd. |
| * |
| * @see org.apache.dubbo.registry.support.FailbackRegistry |
| * @see org.apache.dubbo.registry.support.AbstractRegistry |
| */ |
| public abstract class CacheableFailbackRegistry extends FailbackRegistry { |
| private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(CacheableFailbackRegistry.class); |
| |
| private static String[] VARIABLE_KEYS = new String[]{ENCODED_TIMESTAMP_KEY, ENCODED_PID_KEY}; |
| |
| protected Map<String, URLAddress> stringAddress = new ConcurrentHashMap<>(); |
| protected Map<String, URLParam> stringParam = new ConcurrentHashMap<>(); |
| |
| private ScheduledExecutorService cacheRemovalScheduler; |
| private int cacheRemovalTaskIntervalInMillis; |
| private int cacheClearWaitingThresholdInMillis; |
| |
| private Map<ServiceAddressURL, Long> waitForRemove = new ConcurrentHashMap<>(); |
| private Semaphore semaphore = new Semaphore(1); |
| |
| private final Map<String, String> extraParameters; |
| protected final Map<URL, Map<String, ServiceAddressURL>> stringUrls = new ConcurrentHashMap<>(); |
| |
| protected CacheableFailbackRegistry(URL url) { |
| super(url); |
| extraParameters = new HashMap<>(8); |
| extraParameters.put(CHECK_KEY, String.valueOf(false)); |
| |
| cacheRemovalScheduler = url.getOrDefaultFrameworkModel().getBeanFactory().getBean(FrameworkExecutorRepository.class).nextScheduledExecutor(); |
| cacheRemovalTaskIntervalInMillis = getIntConfig(url.getScopeModel(), CACHE_CLEAR_TASK_INTERVAL, 2 * 60 * 1000); |
| cacheClearWaitingThresholdInMillis = getIntConfig(url.getScopeModel(), CACHE_CLEAR_WAITING_THRESHOLD, 5 * 60 * 1000); |
| } |
| |
| protected static int getIntConfig(ScopeModel scopeModel, String key, int def) { |
| String str = ConfigurationUtils.getProperty(scopeModel, key); |
| int result = def; |
| if (StringUtils.isNotEmpty(str)) { |
| try { |
| result = Integer.parseInt(str); |
| } catch (NumberFormatException e) { |
| // 0-2 Property type mismatch. |
| |
| logger.warn("0-2", "typo in property value", "This property requires an integer value.", |
| "Invalid registry properties configuration key " + key + ", value " + str); |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public void doUnsubscribe(URL url, NotifyListener listener) { |
| this.evictURLCache(url); |
| } |
| |
| protected void evictURLCache(URL url) { |
| Map<String, ServiceAddressURL> oldURLs = stringUrls.remove(url); |
| try { |
| if (oldURLs != null && !oldURLs.isEmpty()) { |
| logger.info("Evicting urls for service " + url.getServiceKey() + ", size " + oldURLs.size()); |
| Long currentTimestamp = System.currentTimeMillis(); |
| for (Map.Entry<String, ServiceAddressURL> entry : oldURLs.entrySet()) { |
| waitForRemove.put(entry.getValue(), currentTimestamp); |
| } |
| if (CollectionUtils.isNotEmptyMap(waitForRemove)) { |
| if (semaphore.tryAcquire()) { |
| cacheRemovalScheduler.schedule(new RemovalTask(), cacheRemovalTaskIntervalInMillis, TimeUnit.MILLISECONDS); |
| } |
| } |
| } |
| } catch (Exception e) { |
| // It seems that the most possible statement that causes exception is the 'schedule()' method. |
| |
| // The executor that FrameworkExecutorRepository.nextScheduledExecutor() method returns |
| // is made by Executors.newSingleThreadScheduledExecutor(). |
| |
| // After observing the code of ScheduledThreadPoolExecutor.delayedExecute, |
| // it seems that it only throws RejectedExecutionException when the thread pool is shutdown. |
| |
| // When? FrameworkExecutorRepository gets destroyed. |
| |
| // 1-3: URL evicting failed. |
| logger.warn("1-3", "thread pool getting destroyed", "", |
| "Failed to evict url for " + url.getServiceKey(), e); |
| } |
| } |
| |
| protected List<URL> toUrlsWithoutEmpty(URL consumer, Collection<String> providers) { |
| // keep old urls |
| Map<String, ServiceAddressURL> oldURLs = stringUrls.get(consumer); |
| |
| // create new urls |
| Map<String, ServiceAddressURL> newURLs = new HashMap<>((int) (providers.size() / 0.75f + 1)); |
| |
| // remove 'release', 'dubbo', 'methods', timestamp, 'dubbo.tag' parameter |
| // in consumer URL. |
| URL copyOfConsumer = removeParamsFromConsumer(consumer); |
| |
| if (oldURLs == null) { |
| for (String rawProvider : providers) { |
| // remove timestamp in provider url. |
| rawProvider = stripOffVariableKeys(rawProvider); |
| |
| // create DubboServiceAddress object using provider url, consumer url, and extra parameters. |
| ServiceAddressURL cachedURL = createURL(rawProvider, copyOfConsumer, getExtraParameters()); |
| if (cachedURL == null) { |
| // 1-1: Address invalid. |
| logger.warn("1-1", "mismatch of service group and version settings", "", |
| "Invalid address, failed to parse into URL " + rawProvider); |
| |
| continue; |
| } |
| newURLs.put(rawProvider, cachedURL); |
| } |
| } else { |
| // maybe only default, or "env" + default |
| for (String rawProvider : providers) { |
| rawProvider = stripOffVariableKeys(rawProvider); |
| ServiceAddressURL cachedURL = oldURLs.remove(rawProvider); |
| if (cachedURL == null) { |
| cachedURL = createURL(rawProvider, copyOfConsumer, getExtraParameters()); |
| if (cachedURL == null) { |
| logger.warn("1-1", "mismatch of service group and version settings", "", |
| "Invalid address, failed to parse into URL " + rawProvider); |
| |
| continue; |
| } |
| } |
| newURLs.put(rawProvider, cachedURL); |
| } |
| } |
| |
| evictURLCache(consumer); |
| stringUrls.put(consumer, newURLs); |
| |
| return new ArrayList<>(newURLs.values()); |
| } |
| |
| protected List<URL> toUrlsWithEmpty(URL consumer, String path, Collection<String> providers) { |
| List<URL> urls = new ArrayList<>(1); |
| boolean isProviderPath = path.endsWith(PROVIDERS_CATEGORY); |
| |
| if (isProviderPath) { |
| if (CollectionUtils.isNotEmpty(providers)) { |
| urls = toUrlsWithoutEmpty(consumer, providers); |
| } else { |
| // clear cache on empty notification: unsubscribe or provider offline |
| evictURLCache(consumer); |
| } |
| } else { |
| if (CollectionUtils.isNotEmpty(providers)) { |
| urls = toConfiguratorsWithoutEmpty(consumer, providers); |
| } |
| } |
| |
| if (urls.isEmpty()) { |
| int i = path.lastIndexOf(PATH_SEPARATOR); |
| String category = i < 0 ? path : path.substring(i + 1); |
| if (!PROVIDERS_CATEGORY.equals(category) || !getUrl().getParameter(ENABLE_EMPTY_PROTECTION_KEY, true)) { |
| if (PROVIDERS_CATEGORY.equals(category)) { |
| logger.warn("1-4", "", "", |
| "Service " + consumer.getServiceKey() + " received empty address list and empty protection is disabled, will clear current available addresses"); |
| } |
| URL empty = URLBuilder.from(consumer) |
| .setProtocol(EMPTY_PROTOCOL) |
| .addParameter(CATEGORY_KEY, category) |
| .build(); |
| urls.add(empty); |
| } |
| } |
| |
| return urls; |
| } |
| |
| /** |
| * Create DubboServiceAddress object using provider url, consumer url, and extra parameters. |
| * |
| * @param rawProvider provider url string |
| * @param consumerURL URL object of consumer |
| * @param extraParameters extra parameters |
| * @return created DubboServiceAddressURL object |
| */ |
| protected ServiceAddressURL createURL(String rawProvider, URL consumerURL, Map<String, String> extraParameters) { |
| |
| boolean encoded = true; |
| |
| // use encoded value directly to avoid URLDecoder.decode allocation. |
| int paramStartIdx = rawProvider.indexOf(ENCODED_QUESTION_MARK); |
| |
| if (paramStartIdx == -1) { |
| // if ENCODED_QUESTION_MARK does not show, mark as not encoded. |
| encoded = false; |
| } |
| |
| // split by (encoded) question mark. |
| // part[0] => protocol + ip address + interface. |
| // part[1] => parameters (metadata). |
| String[] parts = URLStrParser.parseRawURLToArrays(rawProvider, paramStartIdx); |
| |
| if (parts.length <= 1) { |
| // 1-5 Received URL without any parameters. |
| logger.warn("1-5", "", "", |
| "Received url without any parameters " + rawProvider); |
| |
| return DubboServiceAddressURL.valueOf(rawProvider, consumerURL); |
| } |
| |
| String rawAddress = parts[0]; |
| String rawParams = parts[1]; |
| |
| // Workaround for 'effectively final': duplicate the encoded variable. |
| boolean isEncoded = encoded; |
| |
| // PathURLAddress if it's using dubbo protocol. |
| URLAddress address = stringAddress.computeIfAbsent(rawAddress, k -> URLAddress.parse(k, getDefaultURLProtocol(), isEncoded)); |
| address.setTimestamp(System.currentTimeMillis()); |
| |
| URLParam param = stringParam.computeIfAbsent(rawParams, k -> URLParam.parse(k, isEncoded, extraParameters)); |
| param.setTimestamp(System.currentTimeMillis()); |
| |
| // create service URL using cached address, param, and consumer URL. |
| ServiceAddressURL cachedServiceAddressURL = createServiceURL(address, param, consumerURL); |
| |
| if (isMatch(consumerURL, cachedServiceAddressURL)) { |
| return cachedServiceAddressURL; |
| } |
| |
| return null; |
| } |
| |
| |
| protected ServiceAddressURL createServiceURL(URLAddress address, URLParam param, URL consumerURL) { |
| return new DubboServiceAddressURL(address, param, consumerURL, null); |
| } |
| |
| protected URL removeParamsFromConsumer(URL consumer) { |
| Set<ProviderFirstParams> providerFirstParams = consumer.getOrDefaultApplicationModel().getExtensionLoader(ProviderFirstParams.class).getSupportedExtensionInstances(); |
| if (CollectionUtils.isEmpty(providerFirstParams)) { |
| return consumer; |
| } |
| |
| for (ProviderFirstParams paramsFilter : providerFirstParams) { |
| consumer = consumer.removeParameters(paramsFilter.params()); |
| } |
| return consumer; |
| } |
| |
| private String stripOffVariableKeys(String rawProvider) { |
| String[] keys = getVariableKeys(); |
| if (keys == null || keys.length == 0) { |
| return rawProvider; |
| } |
| |
| for (String key : keys) { |
| int idxStart = rawProvider.indexOf(key); |
| if (idxStart == -1) { |
| continue; |
| } |
| int idxEnd = rawProvider.indexOf(ENCODED_AND_MARK, idxStart); |
| String part1 = rawProvider.substring(0, idxStart); |
| if (idxEnd == -1) { |
| rawProvider = part1; |
| } else { |
| String part2 = rawProvider.substring(idxEnd + ENCODED_AND_MARK.length()); |
| rawProvider = part1 + part2; |
| } |
| } |
| |
| if (rawProvider.endsWith(ENCODED_AND_MARK)) { |
| rawProvider = rawProvider.substring(0, rawProvider.length() - ENCODED_AND_MARK.length()); |
| } |
| if (rawProvider.endsWith(ENCODED_QUESTION_MARK)) { |
| rawProvider = rawProvider.substring(0, rawProvider.length() - ENCODED_QUESTION_MARK.length()); |
| } |
| |
| return rawProvider; |
| } |
| |
| private List<URL> toConfiguratorsWithoutEmpty(URL consumer, Collection<String> configurators) { |
| List<URL> urls = new ArrayList<>(); |
| if (CollectionUtils.isNotEmpty(configurators)) { |
| for (String provider : configurators) { |
| if (provider.contains(PROTOCOL_SEPARATOR_ENCODED)) { |
| URL url = URLStrParser.parseEncodedStr(provider); |
| if (UrlUtils.isMatch(consumer, url)) { |
| urls.add(url); |
| } |
| } |
| } |
| } |
| return urls; |
| } |
| |
| protected Map<String, String> getExtraParameters() { |
| return extraParameters; |
| } |
| |
| protected String[] getVariableKeys() { |
| return VARIABLE_KEYS; |
| } |
| |
| protected String getDefaultURLProtocol() { |
| return DUBBO; |
| } |
| |
| /** |
| * This method is for unit test to see if the RemovalTask has completed or not.<br /> |
| * <strong>Please do not call this method in other places.</strong> |
| */ |
| @Deprecated |
| protected Semaphore getSemaphore() { |
| return semaphore; |
| } |
| |
| protected abstract boolean isMatch(URL subscribeUrl, URL providerUrl); |
| |
| /** |
| * The cached URL removal task, which will be run on a scheduled thread pool. (It will be run after a delay.) |
| */ |
| private class RemovalTask implements Runnable { |
| @Override |
| public void run() { |
| logger.info("Clearing cached URLs, waiting to clear size " + waitForRemove.size()); |
| int clearCount = 0; |
| try { |
| Iterator<Map.Entry<ServiceAddressURL, Long>> it = waitForRemove.entrySet().iterator(); |
| while (it.hasNext()) { |
| Map.Entry<ServiceAddressURL, Long> entry = it.next(); |
| ServiceAddressURL removeURL = entry.getKey(); |
| long removeTime = entry.getValue(); |
| long current = System.currentTimeMillis(); |
| if (current - removeTime >= cacheClearWaitingThresholdInMillis) { |
| URLAddress urlAddress = removeURL.getUrlAddress(); |
| URLParam urlParam = removeURL.getUrlParam(); |
| if (current - urlAddress.getTimestamp() >= cacheClearWaitingThresholdInMillis) { |
| stringAddress.remove(urlAddress.getRawAddress()); |
| } |
| if (current - urlParam.getTimestamp() >= cacheClearWaitingThresholdInMillis) { |
| stringParam.remove(urlParam.getRawParam()); |
| } |
| it.remove(); |
| clearCount++; |
| } |
| } |
| } catch (Throwable t) { |
| // 1-6 Error when clearing cached URLs. |
| |
| logger.error("1-6", "", "", |
| "Error occurred when clearing cached URLs", t); |
| |
| } finally { |
| semaphore.release(); |
| } |
| logger.info("Clear cached URLs, size " + clearCount); |
| |
| if (CollectionUtils.isNotEmptyMap(waitForRemove)) { |
| // move to next schedule |
| if (semaphore.tryAcquire()) { |
| cacheRemovalScheduler.schedule(new RemovalTask(), cacheRemovalTaskIntervalInMillis, TimeUnit.MILLISECONDS); |
| } |
| } |
| } |
| } |
| } |