Fix recover method of ZookeeperServiceDiscovery (#10614)
* Fix recover method of ZookeeperServiceDiscovery
* Fix ut
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java
index 534f000..dc4d1c7 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java
@@ -24,7 +24,6 @@
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.rpc.model.ApplicationModel;
-import org.apache.dubbo.rpc.model.ScopeModelAware;
import java.util.Collections;
import java.util.HashMap;
@@ -50,7 +49,7 @@
import static org.apache.dubbo.common.utils.CollectionUtils.toTreeSet;
import static org.apache.dubbo.common.utils.StringUtils.isBlank;
-public abstract class AbstractServiceNameMapping implements ServiceNameMapping, ScopeModelAware {
+public abstract class AbstractServiceNameMapping implements ServiceNameMapping {
protected final Logger logger = LoggerFactory.getLogger(getClass());
protected ApplicationModel applicationModel;
private final MappingCacheManager mappingCacheManager;
@@ -73,7 +72,7 @@
.getBean(FrameworkExecutorRepository.class).getCacheRefreshingScheduledExecutor());
}
- @Override
+ // just for test
public void setApplicationModel(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
}
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingCacheManager.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingCacheManager.java
index 90d44b3..2365f6c 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingCacheManager.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingCacheManager.java
@@ -21,7 +21,6 @@
import org.apache.dubbo.rpc.model.ScopeModel;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
@@ -66,10 +65,4 @@
protected String getName() {
return "mapping";
}
-
- public void update(Map<String, Set<String>> newCache) {
- for (Map.Entry<String, Set<String>> entry : newCache.entrySet()) {
- cache.put(entry.getKey(), entry.getValue());
- }
- }
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
index 4ede56e..0ccaa13 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
@@ -91,6 +91,9 @@
@Override
public synchronized void register() throws RuntimeException {
+ if (isDestroy) {
+ return;
+ }
this.serviceInstance = createServiceInstance(this.metadataInfo);
if (!isValidInstance(this.serviceInstance)) {
logger.warn("No valid instance found, stop registering instance address to registry.");
@@ -134,6 +137,9 @@
@Override
public synchronized void unregister() throws RuntimeException {
+ if (isDestroy) {
+ return;
+ }
// fixme, this metadata info might still being shared by other instances
// unReportMetadata(this.metadataInfo);
if (!isValidInstance(this.serviceInstance)) {
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
index c65e519..da86143 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
@@ -40,6 +40,7 @@
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -216,7 +217,7 @@
}
try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
- FileChannel channel = raf.getChannel()) {
+ FileChannel channel = raf.getChannel()) {
FileLock lock = channel.tryLock();
if (lock == null) {
@@ -321,6 +322,12 @@
}
public List<URL> getCacheUrls(URL url) {
+ Map<String, List<URL>> categoryNotified = notified.get(url);
+ if (CollectionUtils.isNotEmptyMap(categoryNotified)) {
+ List<URL> urls = categoryNotified.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+ return urls;
+ }
+
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
String key = (String) entry.getKey();
String value = (String) entry.getValue();
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
index f747a30..0a98a11 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
@@ -123,7 +123,7 @@
protected void evictURLCache(URL url) {
Map<String, ServiceAddressURL> oldURLs = stringUrls.remove(url);
try {
- if (oldURLs != null && !oldURLs.isEmpty()) {
+ if (CollectionUtils.isNotEmptyMap(oldURLs)) {
logger.info("Evicting urls for service " + url.getServiceKey() + ", size " + oldURLs.size());
Long currentTimestamp = System.currentTimeMillis();
for (Map.Entry<String, ServiceAddressURL> entry : oldURLs.entrySet()) {
@@ -165,7 +165,7 @@
if (oldURLs == null) {
for (String rawProvider : providers) {
- // remove timestamp in provider url.
+ // remove VARIABLE_KEYS(timestamp,pid..) in provider url.
rawProvider = stripOffVariableKeys(rawProvider);
// create DubboServiceAddress object using provider url, consumer url, and extra parameters.
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
index 48b3005..d427cea 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
@@ -33,6 +33,7 @@
import org.apache.dubbo.rpc.RpcException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -117,6 +118,29 @@
@Override
public void destroy() {
super.destroy();
+
+ // remove child listener
+ Set<URL> urls = zkListeners.keySet();
+ for (URL url : urls) {
+ ConcurrentMap<NotifyListener, ChildListener> map = zkListeners.get(url);
+ if (CollectionUtils.isEmptyMap(map)) {
+ continue;
+ }
+ Collection<ChildListener> childListeners = map.values();
+ if (CollectionUtils.isEmpty(childListeners)) {
+ continue;
+ }
+ if (ANY_VALUE.equals(url.getServiceInterface())) {
+ String root = toRootPath();
+ childListeners.stream().forEach(childListener -> zkClient.removeChildListener(root, childListener));
+ } else {
+ for (String path : toCategoriesPath(url)) {
+ childListeners.stream().forEach(childListener -> zkClient.removeChildListener(path, childListener));
+ }
+ }
+ }
+ zkListeners.clear();
+
// Just release zkClient reference, but can not close zk client here for zk client is shared somewhere else.
// See org.apache.dubbo.remoting.zookeeper.AbstractZookeeperTransporter#destroy()
zkClient = null;
@@ -225,6 +249,7 @@
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
+ super.doUnsubscribe(url, listener);
checkDestroyed();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index 11276f4..9e2aea2 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -192,9 +192,9 @@
logger.error("Trying to recover from new zkClient session failed, path is " + path + ", error msg: " + e.getMessage());
}
- List<ServiceInstance> instances = this.getInstances(serviceName);
+ List<ServiceInstance> instances = this.getInstances(watcher.getServiceName());
for (ServiceInstancesChangedListener listener : listeners) {
- listener.onEvent(new ServiceInstancesChangedEvent(serviceName, instances));
+ listener.onEvent(new ServiceInstancesChangedEvent(watcher.getServiceName(), instances));
}
latch.countDown();
});
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
index 465d7dd..681019c 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
@@ -117,4 +117,8 @@
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
+
+ public String getServiceName() {
+ return serviceName;
+ }
}