Add wildcard subscription support for zookeeper registry (#2267)

diff --git a/common/url.go b/common/url.go
index 058f76a..2824ec6 100644
--- a/common/url.go
+++ b/common/url.go
@@ -413,6 +413,37 @@
 	return buf.String()
 }
 
+// ParseServiceKey gets interface, group and version from service key
+func ParseServiceKey(serviceKey string) (string, string, string) {
+	var (
+		group   string
+		version string
+	)
+	if serviceKey == "" {
+		return "", "", ""
+	}
+	// get group if it exists
+	sepIndex := strings.Index(serviceKey, constant.PathSeparator)
+	if sepIndex != -1 {
+		group = serviceKey[:sepIndex]
+		serviceKey = serviceKey[sepIndex+1:]
+	}
+	// get version if it exists
+	sepIndex = strings.LastIndex(serviceKey, constant.KeySeparator)
+	if sepIndex != -1 {
+		version = serviceKey[sepIndex+1:]
+		serviceKey = serviceKey[:sepIndex]
+	}
+
+	return serviceKey, group, version
+}
+
+// IsAnyCondition judges if is any condition
+func IsAnyCondition(intf, group, version string, serviceURL *URL) bool {
+	return intf == constant.AnyValue && (group == constant.AnyValue ||
+		group == serviceURL.Group()) && (version == constant.AnyValue || version == serviceURL.Version())
+}
+
 // ColonSeparatedKey
 // The format is "{interface}:[version]:[group]"
 func (c *URL) ColonSeparatedKey() string {
diff --git a/common/url_test.go b/common/url_test.go
index 89953c3..2971e6c 100644
--- a/common/url_test.go
+++ b/common/url_test.go
@@ -421,3 +421,134 @@
 func CustomCompareURLEqual(l *URL, r *URL, execludeParam ...string) bool {
 	return l.PrimitiveURL == r.PrimitiveURL
 }
+
+func TestParseServiceKey(t *testing.T) {
+	type args struct {
+		serviceKey string
+	}
+	tests := []struct {
+		name  string
+		args  args
+		want  string
+		want1 string
+		want2 string
+	}{
+		{
+			name: "test1",
+			args: args{
+				serviceKey: "group/interface:version",
+			},
+			want:  "interface",
+			want1: "group",
+			want2: "version",
+		},
+		{
+			name: "test2",
+			args: args{
+				serviceKey: "*/*:*",
+			},
+			want:  "*",
+			want1: "*",
+			want2: "*",
+		},
+		{
+			name: "test3",
+			args: args{
+				serviceKey: "group/org.apache.dubbo.mock.api.MockService",
+			},
+			want:  "org.apache.dubbo.mock.api.MockService",
+			want1: "group",
+			want2: "",
+		},
+		{
+			name: "test4",
+			args: args{
+				serviceKey: "org.apache.dubbo.mock.api.MockService",
+			},
+			want:  "org.apache.dubbo.mock.api.MockService",
+			want1: "",
+			want2: "",
+		},
+		{
+			name: "test5",
+			args: args{
+				serviceKey: "group/",
+			},
+			want:  "",
+			want1: "group",
+			want2: "",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, got1, got2 := ParseServiceKey(tt.args.serviceKey)
+			assert.Equalf(t, tt.want, got, "ParseServiceKey(%v)", tt.args.serviceKey)
+			assert.Equalf(t, tt.want1, got1, "ParseServiceKey(%v)", tt.args.serviceKey)
+			assert.Equalf(t, tt.want2, got2, "ParseServiceKey(%v)", tt.args.serviceKey)
+		})
+	}
+}
+
+func TestIsAnyCondition(t *testing.T) {
+	type args struct {
+		intf       string
+		group      string
+		version    string
+		serviceURL *URL
+	}
+	serviceURL, _ := NewURL(GetLocalIp()+":0", WithProtocol("admin"), WithParams(url.Values{
+		constant.GroupKey:   {"group"},
+		constant.VersionKey: {"version"},
+	}))
+	tests := []struct {
+		name string
+		args args
+		want bool
+	}{
+		{
+			name: "test1",
+			args: args{
+				intf:       constant.AnyValue,
+				group:      constant.AnyValue,
+				version:    constant.AnyValue,
+				serviceURL: serviceURL,
+			},
+			want: true,
+		},
+		{
+			name: "test2",
+			args: args{
+				intf:       constant.AnyValue,
+				group:      "group",
+				version:    "version",
+				serviceURL: serviceURL,
+			},
+			want: true,
+		},
+		{
+			name: "test3",
+			args: args{
+				intf:       "intf",
+				group:      constant.AnyValue,
+				version:    constant.AnyValue,
+				serviceURL: serviceURL,
+			},
+			want: false,
+		},
+		{
+			name: "test4",
+			args: args{
+				intf:       constant.AnyValue,
+				group:      "group1",
+				version:    constant.AnyValue,
+				serviceURL: serviceURL,
+			},
+			want: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.Equalf(t, tt.want, IsAnyCondition(tt.args.intf, tt.args.group, tt.args.version, tt.args.serviceURL), "IsAnyCondition(%v, %v, %v, %v)", tt.args.intf, tt.args.group, tt.args.version, tt.args.serviceURL)
+		})
+	}
+}
diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go
index 65871ad..c8f0f26 100644
--- a/registry/zookeeper/listener.go
+++ b/registry/zookeeper/listener.go
@@ -92,8 +92,10 @@
 	if l.closed {
 		return false
 	}
+	match := false
 	for serviceKey, listener := range l.subscribed {
-		if serviceURL.ServiceKey() == serviceKey {
+		intf, group, version := common.ParseServiceKey(serviceKey)
+		if serviceURL.ServiceKey() == serviceKey || common.IsAnyCondition(intf, group, version, serviceURL) {
 			listener.Process(
 				&config_center.ConfigChangeEvent{
 					Key:        event.Path,
@@ -101,10 +103,10 @@
 					ConfigType: event.Action,
 				},
 			)
-			return true
+			match = true
 		}
 	}
-	return false
+	return match
 }
 
 // Close all RegistryConfigurationListener in subscribed
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 498fc33..2e9abe3 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -18,6 +18,7 @@
 package zookeeper
 
 import (
+	"net/url"
 	"path"
 	"strings"
 	"sync"
@@ -238,8 +239,89 @@
 		listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
 	}
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvents listens all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvents(conf *common.URL, listener remoting.DataListener) {
+	var (
+		failTimes int
+		ttl       time.Duration
+	)
+	ttl = defaultTTL
+	if conf != nil {
+		if timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL)); err == nil {
+			ttl = timeout
+		} else {
+			logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
+		}
+	}
+	if ttl > 20e9 {
+		ttl = 20e9
+	}
+
+	rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+	for {
+		// get all interfaces
+		children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+		if err != nil {
+			failTimes++
+			if MaxFailTimes <= failTimes {
+				failTimes = MaxFailTimes
+			}
+			logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
+			// Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait
+			after := time.After(timeSecondDuration(failTimes * ConnDelay))
+			select {
+			case <-after:
+				continue
+			case <-l.exit:
+				return
+			}
+		}
+		failTimes = 0
+		if len(children) == 0 {
+			logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not get any children for the path \"%s\", please check if the provider does ready.", rootPath)
+		}
+		for _, c := range children {
+			// Build the child path
+			zkRootPath := path.Join(rootPath, constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, constant.ProvidersCategory)
+			// Save the path to avoid listen repeatedly
+			l.pathMapLock.Lock()
+			if _, ok := l.pathMap[zkRootPath]; ok {
+				logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkRootPath)
+				l.pathMapLock.Unlock()
+				continue
+			} else {
+				l.pathMap[zkRootPath] = uatomic.NewInt32(0)
+			}
+			l.pathMapLock.Unlock()
+			logger.Debugf("[Zookeeper EventListener][listenDirEvent] listen dubbo interface key{%s}", zkRootPath)
+			l.wg.Add(1)
+			// listen every interface
+			go l.listenDirEvent(conf, zkRootPath, listener, c)
+		}
+
+		ticker := time.NewTicker(ttl)
+		select {
+		case <-ticker.C:
+			ticker.Stop()
+		case zkEvent := <-childEventCh:
+			logger.Debugf("Get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
+				zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
+			ticker.Stop()
+		case <-l.exit:
+			logger.Warnf("listen(path{%s}) goroutine exit now...", rootPath)
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener, intf string) {
 	defer l.wg.Done()
+	if intf == constant.AnyValue {
+		l.listenAllDirEvents(conf, listener)
+		return
+	}
 	var (
 		failTimes int
 		ttl       time.Duration
@@ -279,7 +361,7 @@
 			// Only need to compare Path when subscribing to provider
 			if strings.LastIndex(zkRootPath, constant.ProviderCategory) != -1 {
 				provider, _ := common.NewURL(c)
-				if provider.ServiceKey() != conf.ServiceKey() {
+				if provider.Interface() != intf || !common.IsAnyCondition(constant.AnyValue, conf.Group(), conf.Version(), provider) {
 					continue
 				}
 			}
@@ -326,7 +408,6 @@
 		}
 		if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) {
 			return
-
 		}
 	}
 }
@@ -367,6 +448,7 @@
 		}
 	}
 }
+
 func timeSecondDuration(sec int) time.Duration {
 	return time.Duration(sec) * time.Second
 }
@@ -378,7 +460,11 @@
 	logger.Infof("[Zookeeper Listener] listen dubbo path{%s}", zkPath)
 	l.wg.Add(1)
 	go func(zkPath string, listener remoting.DataListener) {
-		l.listenDirEvent(conf, zkPath, listener)
+		intf := ""
+		if conf != nil {
+			intf = conf.Interface()
+		}
+		l.listenDirEvent(conf, zkPath, listener, intf)
 		logger.Warnf("ListenServiceEvent->listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
 	}(zkPath, listener)
 }