blob: dc4d1c7b6f7f18e9ee38babfe3d4a5901420732d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.metadata;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.rpc.model.ApplicationModel;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableSet;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Stream.of;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDED_BY;
import static org.apache.dubbo.common.constants.RegistryConstants.SUBSCRIBED_SERVICE_NAMES_KEY;
import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
import static org.apache.dubbo.common.utils.CollectionUtils.toTreeSet;
import static org.apache.dubbo.common.utils.StringUtils.isBlank;
public abstract class AbstractServiceNameMapping implements ServiceNameMapping {
protected final Logger logger = LoggerFactory.getLogger(getClass());
protected ApplicationModel applicationModel;
private final MappingCacheManager mappingCacheManager;
private final Map<String, Set<MappingListener>> mappingListeners = new ConcurrentHashMap<>();
// mapping lock is shared among registries of the same application.
private final ConcurrentMap<String, ReentrantLock> mappingLocks = new ConcurrentHashMap<>();
// TODO, check how should this be cleared once a reference or interface is destroyed to avoid key accumulation
private final Map<String, Boolean> mappingInitStatus = new HashMap<>();
public AbstractServiceNameMapping(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
boolean enableFileCache = true;
Optional<ApplicationConfig> application = applicationModel.getApplicationConfigManager().getApplication();
if(application.isPresent()) {
enableFileCache = Boolean.TRUE.equals(application.get().getEnableFileCache()) ? true : false;
}
this.mappingCacheManager = new MappingCacheManager(enableFileCache,
applicationModel.tryGetApplicationName(),
applicationModel.getFrameworkModel().getBeanFactory()
.getBean(FrameworkExecutorRepository.class).getCacheRefreshingScheduledExecutor());
}
// just for test
public void setApplicationModel(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
}
/**
* Get the service names from the specified Dubbo service interface, group, version and protocol
*
* @return
*/
abstract public Set<String> get(URL url);
/**
* Get the service names from the specified Dubbo service interface, group, version and protocol
*
* @return
*/
abstract public Set<String> getAndListen(URL url, MappingListener mappingListener);
abstract protected void removeListener(URL url, MappingListener mappingListener);
@Override
public synchronized void initInterfaceAppMapping(URL subscribedURL) {
String key = ServiceNameMapping.buildMappingKey(subscribedURL);
if (hasInitiated(key)) {
return;
}
mappingInitStatus.put(key, Boolean.TRUE);
Set<String> subscribedServices = new TreeSet<>();
String serviceNames = subscribedURL.getParameter(PROVIDED_BY);
if (StringUtils.isNotEmpty(serviceNames)) {
logger.info(key + " mapping to " + serviceNames + " instructed by provided-by set by user.");
subscribedServices.addAll(parseServices(serviceNames));
}
if (isEmpty(subscribedServices)) {
Set<String> cachedServices = this.getCachedMapping(key);
if (!isEmpty(cachedServices)) {
logger.info(key + " mapping to " + serviceNames + " instructed by local cache.");
subscribedServices.addAll(cachedServices);
}
} else {
this.putCachedMappingIfAbsent(key, subscribedServices);
}
}
@Override
public Set<String> getAndListen(URL registryURL, URL subscribedURL, MappingListener listener) {
String key = ServiceNameMapping.buildMappingKey(subscribedURL);
// use previously cached services.
Set<String> mappingServices = this.getCachedMapping(key);
// Asynchronously register listener in case previous cache does not exist or cache expired.
if (CollectionUtils.isEmpty(mappingServices)) {
try {
logger.info("Local cache mapping is empty");
mappingServices = (new AsyncMappingTask(listener, subscribedURL, false)).call();
} catch (Exception e) {
// ignore
}
if (CollectionUtils.isEmpty(mappingServices)) {
String registryServices = registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY);
if (StringUtils.isNotEmpty(registryServices)) {
logger.info(subscribedURL.getServiceInterface() + " mapping to " + registryServices + " instructed by registry subscribed-services.");
mappingServices = parseServices(registryServices);
}
}
if (CollectionUtils.isNotEmpty(mappingServices)) {
this.putCachedMapping(key, mappingServices);
}
} else {
ExecutorService executorService = applicationModel.getFrameworkModel().getBeanFactory()
.getBean(FrameworkExecutorRepository.class).getMappingRefreshingExecutor();
executorService.submit(new AsyncMappingTask(listener, subscribedURL, true));
}
return mappingServices;
}
@Override
public MappingListener stopListen(URL subscribeURL, MappingListener listener) {
synchronized (mappingListeners) {
String mappingKey = ServiceNameMapping.buildMappingKey(subscribeURL);
Set<MappingListener> listeners = mappingListeners.get(mappingKey);
//todo, remove listener from remote metadata center
if (CollectionUtils.isNotEmpty(listeners)) {
listeners.remove(listener);
listener.stop();
removeListener(subscribeURL, listener);
}
if (CollectionUtils.isEmpty(listeners)) {
mappingListeners.remove(mappingKey);
removeCachedMapping(mappingKey);
removeMappingLock(mappingKey);
}
return listener;
}
}
static Set<String> parseServices(String literalServices) {
return isBlank(literalServices) ? emptySet() :
unmodifiableSet(new TreeSet<>(of(literalServices.split(","))
.map(String::trim)
.filter(StringUtils::isNotEmpty)
.collect(toSet())));
}
@Override
public void putCachedMapping(String serviceKey, Set<String> apps) {
mappingCacheManager.put(serviceKey, toTreeSet(apps));
}
protected void putCachedMappingIfAbsent(String serviceKey, Set<String> apps) {
Lock lock = getMappingLock(serviceKey);
try {
lock.lock();
if (CollectionUtils.isEmpty(mappingCacheManager.get(serviceKey))) {
mappingCacheManager.put(serviceKey, toTreeSet(apps));
}
} finally {
lock.unlock();
}
}
@Override
public Set<String> getCachedMapping(String mappingKey) {
return mappingCacheManager.get(mappingKey);
}
@Override
public Set<String> getCachedMapping(URL consumerURL) {
return getCachedMapping(ServiceNameMapping.buildMappingKey(consumerURL));
}
@Override
public Set<String> getRemoteMapping(URL consumerURL) {
return get(consumerURL);
}
@Override
public Set<String> removeCachedMapping(String serviceKey) {
return mappingCacheManager.remove(serviceKey);
}
@Override
public Map<String, Set<String>> getCachedMapping() {
return Collections.unmodifiableMap(mappingCacheManager.getAll());
}
public Lock getMappingLock(String key) {
return mappingLocks.computeIfAbsent(key, _k -> new ReentrantLock());
}
protected void removeMappingLock(String key) {
Lock lock = mappingLocks.get(key);
if (lock != null) {
try {
lock.lock();
mappingLocks.remove(key);
} finally {
lock.unlock();
}
}
}
private boolean hasInitiated(String key) {
Lock lock = getMappingLock(key);
try {
lock.lock();
return mappingInitStatus.computeIfAbsent(key, _k -> Boolean.FALSE);
} finally {
lock.unlock();
}
}
@Override
public void $destroy() {
mappingCacheManager.destroy();
mappingListeners.clear();
mappingLocks.clear();
mappingInitStatus.clear();
}
private class AsyncMappingTask implements Callable<Set<String>> {
private final MappingListener listener;
private final URL subscribedURL;
private final boolean notifyAtFirstTime;
public AsyncMappingTask(MappingListener listener, URL subscribedURL, boolean notifyAtFirstTime) {
this.listener = listener;
this.subscribedURL = subscribedURL;
this.notifyAtFirstTime = notifyAtFirstTime;
}
@Override
public Set<String> call() throws Exception {
synchronized (mappingListeners) {
Set<String> mappedServices = emptySet();
try {
String mappingKey = ServiceNameMapping.buildMappingKey(subscribedURL);
if (listener != null) {
mappedServices = toTreeSet(getAndListen(subscribedURL, listener));
Set<MappingListener> listeners = mappingListeners.computeIfAbsent(mappingKey, _k -> new HashSet<>());
listeners.add(listener);
if (CollectionUtils.isNotEmpty(mappedServices)) {
if (notifyAtFirstTime) {
// guarantee at-least-once notification no matter what kind of underlying meta server is used.
// listener notification will also cause updating of mapping cache.
listener.onEvent(new MappingChangedEvent(mappingKey, mappedServices));
}
}
} else {
mappedServices = get(subscribedURL);
if (CollectionUtils.isNotEmpty(mappedServices)) {
AbstractServiceNameMapping.this.putCachedMapping(mappingKey, mappedServices);
}
}
} catch (Exception e) {
logger.error("Failed getting mapping info from remote center. ", e);
}
return mappedServices;
}
}
}
}