/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dubbo.registry.support; | |
import org.apache.dubbo.common.URL; | |
import org.apache.dubbo.common.logger.Logger; | |
import org.apache.dubbo.common.logger.LoggerFactory; | |
import org.apache.dubbo.common.utils.CollectionUtils; | |
import org.apache.dubbo.common.utils.ConcurrentHashSet; | |
import org.apache.dubbo.common.utils.ConfigUtils; | |
import org.apache.dubbo.common.utils.NamedThreadFactory; | |
import org.apache.dubbo.common.utils.StringUtils; | |
import org.apache.dubbo.common.utils.UrlUtils; | |
import org.apache.dubbo.registry.NotifyListener; | |
import org.apache.dubbo.registry.Registry; | |
import java.io.File; | |
import java.io.FileInputStream; | |
import java.io.FileOutputStream; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.RandomAccessFile; | |
import java.nio.channels.FileChannel; | |
import java.nio.channels.FileLock; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Properties; | |
import java.util.Set; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ConcurrentMap; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.concurrent.atomic.AtomicReference; | |
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE; | |
import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN; | |
import static org.apache.dubbo.common.constants.CommonConstants.FILE_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.REGISTRY_SNAPSHOT_KEY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.ACCEPTS_KEY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL; | |
import static org.apache.dubbo.registry.Constants.REGISTRY_FILESAVE_SYNC_KEY; | |
import static org.apache.dubbo.registry.Constants.REGISTRY__LOCAL_FILE_CACHE_ENABLED; | |
/**
 * AbstractRegistry. (SPI, Prototype, ThreadSafe)
 */
public abstract class AbstractRegistry implements Registry { | |
// URL address separator, used in file cache, service provider URL separation | |
private static final char URL_SEPARATOR = ' '; | |
// URL address separated regular expression for parsing the service provider URL list in the file cache | |
private static final String URL_SPLIT = "\\s+"; | |
// Max times to retry to save properties to local cache file | |
private static final int MAX_RETRY_TIMES_SAVE_PROPERTIES = 3; | |
// Log output | |
protected final Logger logger = LoggerFactory.getLogger(getClass()); | |
// Local disk cache, where the special key value.registries records the list of registry centers, and the others are the list of notified service providers | |
private final Properties properties = new Properties(); | |
// File cache timing writing | |
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); | |
// Is it synchronized to save the file | |
private boolean syncSaveFile; | |
private final AtomicLong lastCacheChanged = new AtomicLong(); | |
private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger(); | |
private final Set<URL> registered = new ConcurrentHashSet<>(); | |
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>(); | |
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>(); | |
private URL registryUrl; | |
// Local disk cache file | |
private File file; | |
public AbstractRegistry(URL url) { | |
setUrl(url); | |
if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) { | |
// Start file save timer | |
syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false); | |
String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache"; | |
String filename = url.getParameter(FILE_KEY, defaultFilename); | |
File file = null; | |
if (ConfigUtils.isNotEmpty(filename)) { | |
file = new File(filename); | |
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) { | |
if (!file.getParentFile().mkdirs()) { | |
throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!"); | |
} | |
} | |
} | |
this.file = file; | |
// When starting the subscription center, | |
// we need to read the local cache file for future Registry fault tolerance processing. | |
loadProperties(); | |
notify(url.getBackupUrls()); | |
} | |
} | |
protected static List<URL> filterEmpty(URL url, List<URL> urls) { | |
if (CollectionUtils.isEmpty(urls)) { | |
List<URL> result = new ArrayList<>(1); | |
result.add(url.setProtocol(EMPTY_PROTOCOL)); | |
return result; | |
} | |
return urls; | |
} | |
@Override | |
public URL getUrl() { | |
return registryUrl; | |
} | |
protected void setUrl(URL url) { | |
if (url == null) { | |
throw new IllegalArgumentException("registry url == null"); | |
} | |
this.registryUrl = url; | |
} | |
public Set<URL> getRegistered() { | |
return Collections.unmodifiableSet(registered); | |
} | |
public Map<URL, Set<NotifyListener>> getSubscribed() { | |
return Collections.unmodifiableMap(subscribed); | |
} | |
public Map<URL, Map<String, List<URL>>> getNotified() { | |
return Collections.unmodifiableMap(notified); | |
} | |
public File getCacheFile() { | |
return file; | |
} | |
public Properties getCacheProperties() { | |
return properties; | |
} | |
public AtomicLong getLastCacheChanged() { | |
return lastCacheChanged; | |
} | |
public void doSaveProperties(long version) { | |
if (version < lastCacheChanged.get()) { | |
return; | |
} | |
if (file == null) { | |
return; | |
} | |
// Save | |
try { | |
File lockfile = new File(file.getAbsolutePath() + ".lock"); | |
if (!lockfile.exists()) { | |
lockfile.createNewFile(); | |
} | |
try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw"); | |
FileChannel channel = raf.getChannel()) { | |
FileLock lock = channel.tryLock(); | |
if (lock == null) { | |
throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties"); | |
} | |
// Save | |
try { | |
if (!file.exists()) { | |
file.createNewFile(); | |
} | |
try (FileOutputStream outputFile = new FileOutputStream(file)) { | |
properties.store(outputFile, "Dubbo Registry Cache"); | |
} | |
} finally { | |
lock.release(); | |
} | |
} | |
} catch (Throwable e) { | |
savePropertiesRetryTimes.incrementAndGet(); | |
if (savePropertiesRetryTimes.get() >= MAX_RETRY_TIMES_SAVE_PROPERTIES) { | |
logger.warn("Failed to save registry cache file after retrying " + MAX_RETRY_TIMES_SAVE_PROPERTIES + " times, cause: " + e.getMessage(), e); | |
savePropertiesRetryTimes.set(0); | |
return; | |
} | |
if (version < lastCacheChanged.get()) { | |
savePropertiesRetryTimes.set(0); | |
return; | |
} else { | |
registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet())); | |
} | |
logger.warn("Failed to save registry cache file, will retry, cause: " + e.getMessage(), e); | |
} | |
} | |
private void loadProperties() { | |
if (file != null && file.exists()) { | |
InputStream in = null; | |
try { | |
in = new FileInputStream(file); | |
properties.load(in); | |
if (logger.isInfoEnabled()) { | |
logger.info("Load registry cache file " + file + ", data: " + properties); | |
} | |
} catch (Throwable e) { | |
logger.warn("Failed to load registry cache file " + file, e); | |
} finally { | |
if (in != null) { | |
try { | |
in.close(); | |
} catch (IOException e) { | |
logger.warn(e.getMessage(), e); | |
} | |
} | |
} | |
} | |
} | |
public List<URL> getCacheUrls(URL url) { | |
for (Map.Entry<Object, Object> entry : properties.entrySet()) { | |
String key = (String) entry.getKey(); | |
String value = (String) entry.getValue(); | |
if (StringUtils.isNotEmpty(key) && key.equals(url.getServiceKey()) | |
&& (Character.isLetter(key.charAt(0)) || key.charAt(0) == '_') | |
&& StringUtils.isNotEmpty(value)) { | |
String[] arr = value.trim().split(URL_SPLIT); | |
List<URL> urls = new ArrayList<>(); | |
for (String u : arr) { | |
urls.add(URL.valueOf(u)); | |
} | |
return urls; | |
} | |
} | |
return null; | |
} | |
@Override | |
public List<URL> lookup(URL url) { | |
List<URL> result = new ArrayList<>(); | |
Map<String, List<URL>> notifiedUrls = getNotified().get(url); | |
if (CollectionUtils.isNotEmptyMap(notifiedUrls)) { | |
for (List<URL> urls : notifiedUrls.values()) { | |
for (URL u : urls) { | |
if (!EMPTY_PROTOCOL.equals(u.getProtocol())) { | |
result.add(u); | |
} | |
} | |
} | |
} else { | |
final AtomicReference<List<URL>> reference = new AtomicReference<>(); | |
NotifyListener listener = reference::set; | |
subscribe(url, listener); // Subscribe logic guarantees the first notify to return | |
List<URL> urls = reference.get(); | |
if (CollectionUtils.isNotEmpty(urls)) { | |
for (URL u : urls) { | |
if (!EMPTY_PROTOCOL.equals(u.getProtocol())) { | |
result.add(u); | |
} | |
} | |
} | |
} | |
return result; | |
} | |
@Override | |
public void register(URL url) { | |
if (url == null) { | |
throw new IllegalArgumentException("register url == null"); | |
} | |
if (logger.isInfoEnabled()) { | |
logger.info("Register: " + url); | |
} | |
registered.add(url); | |
} | |
@Override | |
public void unregister(URL url) { | |
if (url == null) { | |
throw new IllegalArgumentException("unregister url == null"); | |
} | |
if (logger.isInfoEnabled()) { | |
logger.info("Unregister: " + url); | |
} | |
registered.remove(url); | |
} | |
@Override | |
public void subscribe(URL url, NotifyListener listener) { | |
if (url == null) { | |
throw new IllegalArgumentException("subscribe url == null"); | |
} | |
if (listener == null) { | |
throw new IllegalArgumentException("subscribe listener == null"); | |
} | |
if (logger.isInfoEnabled()) { | |
logger.info("Subscribe: " + url); | |
} | |
Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>()); | |
listeners.add(listener); | |
} | |
@Override | |
public void unsubscribe(URL url, NotifyListener listener) { | |
if (url == null) { | |
throw new IllegalArgumentException("unsubscribe url == null"); | |
} | |
if (listener == null) { | |
throw new IllegalArgumentException("unsubscribe listener == null"); | |
} | |
if (logger.isInfoEnabled()) { | |
logger.info("Unsubscribe: " + url); | |
} | |
Set<NotifyListener> listeners = subscribed.get(url); | |
if (listeners != null) { | |
listeners.remove(listener); | |
} | |
} | |
protected void recover() throws Exception { | |
// register | |
Set<URL> recoverRegistered = new HashSet<>(getRegistered()); | |
if (!recoverRegistered.isEmpty()) { | |
if (logger.isInfoEnabled()) { | |
logger.info("Recover register url " + recoverRegistered); | |
} | |
for (URL url : recoverRegistered) { | |
register(url); | |
} | |
} | |
// subscribe | |
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<>(getSubscribed()); | |
if (!recoverSubscribed.isEmpty()) { | |
if (logger.isInfoEnabled()) { | |
logger.info("Recover subscribe url " + recoverSubscribed.keySet()); | |
} | |
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) { | |
URL url = entry.getKey(); | |
for (NotifyListener listener : entry.getValue()) { | |
subscribe(url, listener); | |
} | |
} | |
} | |
} | |
protected void notify(List<URL> urls) { | |
if (CollectionUtils.isEmpty(urls)) { | |
return; | |
} | |
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) { | |
URL url = entry.getKey(); | |
if (!UrlUtils.isMatch(url, urls.get(0))) { | |
continue; | |
} | |
Set<NotifyListener> listeners = entry.getValue(); | |
if (listeners != null) { | |
for (NotifyListener listener : listeners) { | |
try { | |
notify(url, listener, filterEmpty(url, urls)); | |
} catch (Throwable t) { | |
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t); | |
} | |
} | |
} | |
} | |
} | |
/** | |
* Notify changes from the Provider side. | |
* | |
* @param url consumer side url | |
* @param listener listener | |
* @param urls provider latest urls | |
*/ | |
protected void notify(URL url, NotifyListener listener, List<URL> urls) { | |
if (url == null) { | |
throw new IllegalArgumentException("notify url == null"); | |
} | |
if (listener == null) { | |
throw new IllegalArgumentException("notify listener == null"); | |
} | |
if ((CollectionUtils.isEmpty(urls)) | |
&& !ANY_VALUE.equals(url.getServiceInterface())) { | |
logger.warn("Ignore empty notify urls for subscribe url " + url); | |
return; | |
} | |
if (logger.isInfoEnabled()) { | |
logger.info("Notify urls for subscribing service " + url.getServiceKey() + ", provider url size: " + urls.size()); | |
} | |
// keep every provider's category. | |
Map<String, List<URL>> result = new HashMap<>(); | |
for (URL u : urls) { | |
if (UrlUtils.isMatch(url, u)) { | |
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY); | |
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>()); | |
categoryList.add(u); | |
} | |
} | |
if (result.size() == 0) { | |
return; | |
} | |
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>()); | |
for (Map.Entry<String, List<URL>> entry : result.entrySet()) { | |
String category = entry.getKey(); | |
List<URL> categoryList = entry.getValue(); | |
categoryNotified.put(category, categoryList); | |
listener.notify(categoryList); | |
// We will update our cache file after each notification. | |
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL. | |
if (registryUrl.getParameter(REGISTRY_SNAPSHOT_KEY, false)) { | |
saveProperties(url); | |
} | |
} | |
} | |
private void saveProperties(URL url) { | |
if (file == null) { | |
return; | |
} | |
try { | |
StringBuilder buf = new StringBuilder(); | |
Map<String, List<URL>> categoryNotified = notified.get(url); | |
if (categoryNotified != null) { | |
for (List<URL> us : categoryNotified.values()) { | |
for (URL u : us) { | |
if (buf.length() > 0) { | |
buf.append(URL_SEPARATOR); | |
} | |
buf.append(u.toFullString()); | |
} | |
} | |
} | |
properties.setProperty(url.getServiceKey(), buf.toString()); | |
long version = lastCacheChanged.incrementAndGet(); | |
if (syncSaveFile) { | |
doSaveProperties(version); | |
} else { | |
registryCacheExecutor.execute(new SaveProperties(version)); | |
} | |
} catch (Throwable t) { | |
logger.warn(t.getMessage(), t); | |
} | |
} | |
@Override | |
public void destroy() { | |
if (logger.isInfoEnabled()) { | |
logger.info("Destroy registry:" + getUrl()); | |
} | |
Set<URL> destroyRegistered = new HashSet<>(getRegistered()); | |
if (!destroyRegistered.isEmpty()) { | |
for (URL url : new HashSet<>(getRegistered())) { | |
if (url.getParameter(DYNAMIC_KEY, true)) { | |
try { | |
unregister(url); | |
if (logger.isInfoEnabled()) { | |
logger.info("Destroy unregister url " + url); | |
} | |
} catch (Throwable t) { | |
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t); | |
} | |
} | |
} | |
} | |
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed()); | |
if (!destroySubscribed.isEmpty()) { | |
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) { | |
URL url = entry.getKey(); | |
for (NotifyListener listener : entry.getValue()) { | |
try { | |
unsubscribe(url, listener); | |
if (logger.isInfoEnabled()) { | |
logger.info("Destroy unsubscribe url " + url); | |
} | |
} catch (Throwable t) { | |
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t); | |
} | |
} | |
} | |
} | |
AbstractRegistryFactory.removeDestroyedRegistry(this); | |
} | |
protected boolean acceptable(URL urlToRegistry) { | |
String pattern = registryUrl.getParameter(ACCEPTS_KEY); | |
if (StringUtils.isEmpty(pattern)) { | |
return true; | |
} | |
return Arrays.stream(COMMA_SPLIT_PATTERN.split(pattern)) | |
.anyMatch(p -> p.equalsIgnoreCase(urlToRegistry.getProtocol())); | |
} | |
@Override | |
public String toString() { | |
return getUrl().toString(); | |
} | |
private class SaveProperties implements Runnable { | |
private long version; | |
private SaveProperties(long version) { | |
this.version = version; | |
} | |
@Override | |
public void run() { | |
doSaveProperties(version); | |
} | |
} | |
} |