Add wildcard subscription support for zookeeper registry (#2267)
diff --git a/common/url.go b/common/url.go
index 058f76a..2824ec6 100644
--- a/common/url.go
+++ b/common/url.go
@@ -413,6 +413,37 @@
return buf.String()
}
+// ParseServiceKey gets interface, group and version from service key
+func ParseServiceKey(serviceKey string) (string, string, string) {
+ var (
+ group string
+ version string
+ )
+ if serviceKey == "" {
+ return "", "", ""
+ }
+ // get group if it exists
+ sepIndex := strings.Index(serviceKey, constant.PathSeparator)
+ if sepIndex != -1 {
+ group = serviceKey[:sepIndex]
+ serviceKey = serviceKey[sepIndex+1:]
+ }
+ // get version if it exists
+ sepIndex = strings.LastIndex(serviceKey, constant.KeySeparator)
+ if sepIndex != -1 {
+ version = serviceKey[sepIndex+1:]
+ serviceKey = serviceKey[:sepIndex]
+ }
+
+ return serviceKey, group, version
+}
+
+// IsAnyCondition judges if is any condition
+func IsAnyCondition(intf, group, version string, serviceURL *URL) bool {
+ return intf == constant.AnyValue && (group == constant.AnyValue ||
+ group == serviceURL.Group()) && (version == constant.AnyValue || version == serviceURL.Version())
+}
+
// ColonSeparatedKey
// The format is "{interface}:[version]:[group]"
func (c *URL) ColonSeparatedKey() string {
diff --git a/common/url_test.go b/common/url_test.go
index 89953c3..2971e6c 100644
--- a/common/url_test.go
+++ b/common/url_test.go
@@ -421,3 +421,134 @@
func CustomCompareURLEqual(l *URL, r *URL, execludeParam ...string) bool {
return l.PrimitiveURL == r.PrimitiveURL
}
+
+func TestParseServiceKey(t *testing.T) {
+ type args struct {
+ serviceKey string
+ }
+ tests := []struct {
+ name string
+ args args
+ want string
+ want1 string
+ want2 string
+ }{
+ {
+ name: "test1",
+ args: args{
+ serviceKey: "group/interface:version",
+ },
+ want: "interface",
+ want1: "group",
+ want2: "version",
+ },
+ {
+ name: "test2",
+ args: args{
+ serviceKey: "*/*:*",
+ },
+ want: "*",
+ want1: "*",
+ want2: "*",
+ },
+ {
+ name: "test3",
+ args: args{
+ serviceKey: "group/org.apache.dubbo.mock.api.MockService",
+ },
+ want: "org.apache.dubbo.mock.api.MockService",
+ want1: "group",
+ want2: "",
+ },
+ {
+ name: "test4",
+ args: args{
+ serviceKey: "org.apache.dubbo.mock.api.MockService",
+ },
+ want: "org.apache.dubbo.mock.api.MockService",
+ want1: "",
+ want2: "",
+ },
+ {
+ name: "test5",
+ args: args{
+ serviceKey: "group/",
+ },
+ want: "",
+ want1: "group",
+ want2: "",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, got1, got2 := ParseServiceKey(tt.args.serviceKey)
+ assert.Equalf(t, tt.want, got, "ParseServiceKey(%v)", tt.args.serviceKey)
+ assert.Equalf(t, tt.want1, got1, "ParseServiceKey(%v)", tt.args.serviceKey)
+ assert.Equalf(t, tt.want2, got2, "ParseServiceKey(%v)", tt.args.serviceKey)
+ })
+ }
+}
+
+func TestIsAnyCondition(t *testing.T) {
+ type args struct {
+ intf string
+ group string
+ version string
+ serviceURL *URL
+ }
+ serviceURL, _ := NewURL(GetLocalIp()+":0", WithProtocol("admin"), WithParams(url.Values{
+ constant.GroupKey: {"group"},
+ constant.VersionKey: {"version"},
+ }))
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ name: "test1",
+ args: args{
+ intf: constant.AnyValue,
+ group: constant.AnyValue,
+ version: constant.AnyValue,
+ serviceURL: serviceURL,
+ },
+ want: true,
+ },
+ {
+ name: "test2",
+ args: args{
+ intf: constant.AnyValue,
+ group: "group",
+ version: "version",
+ serviceURL: serviceURL,
+ },
+ want: true,
+ },
+ {
+ name: "test3",
+ args: args{
+ intf: "intf",
+ group: constant.AnyValue,
+ version: constant.AnyValue,
+ serviceURL: serviceURL,
+ },
+ want: false,
+ },
+ {
+ name: "test4",
+ args: args{
+ intf: constant.AnyValue,
+ group: "group1",
+ version: constant.AnyValue,
+ serviceURL: serviceURL,
+ },
+ want: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ assert.Equalf(t, tt.want, IsAnyCondition(tt.args.intf, tt.args.group, tt.args.version, tt.args.serviceURL), "IsAnyCondition(%v, %v, %v, %v)", tt.args.intf, tt.args.group, tt.args.version, tt.args.serviceURL)
+ })
+ }
+}
diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go
index 65871ad..c8f0f26 100644
--- a/registry/zookeeper/listener.go
+++ b/registry/zookeeper/listener.go
@@ -92,8 +92,10 @@
if l.closed {
return false
}
+ match := false
for serviceKey, listener := range l.subscribed {
- if serviceURL.ServiceKey() == serviceKey {
+ intf, group, version := common.ParseServiceKey(serviceKey)
+ if serviceURL.ServiceKey() == serviceKey || common.IsAnyCondition(intf, group, version, serviceURL) {
listener.Process(
&config_center.ConfigChangeEvent{
Key: event.Path,
@@ -101,10 +103,10 @@
ConfigType: event.Action,
},
)
- return true
+ match = true
}
}
- return false
+ return match
}
// Close all RegistryConfigurationListener in subscribed
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 498fc33..2e9abe3 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -18,6 +18,7 @@
package zookeeper
import (
+ "net/url"
"path"
"strings"
"sync"
@@ -238,8 +239,89 @@
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvents listens all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvents(conf *common.URL, listener remoting.DataListener) {
+ var (
+ failTimes int
+ ttl time.Duration
+ )
+ ttl = defaultTTL
+ if conf != nil {
+ if timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL)); err == nil {
+ ttl = timeout
+ } else {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
+ }
+ }
+ if ttl > 20e9 {
+ ttl = 20e9
+ }
+
+ rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+ for {
+ // get all interfaces
+ children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+ if err != nil {
+ failTimes++
+ if MaxFailTimes <= failTimes {
+ failTimes = MaxFailTimes
+ }
+ logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
+ // Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait
+ after := time.After(timeSecondDuration(failTimes * ConnDelay))
+ select {
+ case <-after:
+ continue
+ case <-l.exit:
+ return
+ }
+ }
+ failTimes = 0
+ if len(children) == 0 {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not get any children for the path \"%s\", please check if the provider does ready.", rootPath)
+ }
+ for _, c := range children {
+ // Build the child path
+ zkRootPath := path.Join(rootPath, constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, constant.ProvidersCategory)
+ // Save the path to avoid listen repeatedly
+ l.pathMapLock.Lock()
+ if _, ok := l.pathMap[zkRootPath]; ok {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkRootPath)
+ l.pathMapLock.Unlock()
+ continue
+ } else {
+ l.pathMap[zkRootPath] = uatomic.NewInt32(0)
+ }
+ l.pathMapLock.Unlock()
+ logger.Debugf("[Zookeeper EventListener][listenDirEvent] listen dubbo interface key{%s}", zkRootPath)
+ l.wg.Add(1)
+ // listen every interface
+ go l.listenDirEvent(conf, zkRootPath, listener, c)
+ }
+
+ ticker := time.NewTicker(ttl)
+ select {
+ case <-ticker.C:
+ ticker.Stop()
+ case zkEvent := <-childEventCh:
+ logger.Debugf("Get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
+ zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
+ ticker.Stop()
+ case <-l.exit:
+ logger.Warnf("listen(path{%s}) goroutine exit now...", rootPath)
+ ticker.Stop()
+ return
+ }
+ }
+}
+
+func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener, intf string) {
defer l.wg.Done()
+ if intf == constant.AnyValue {
+ l.listenAllDirEvents(conf, listener)
+ return
+ }
var (
failTimes int
ttl time.Duration
@@ -279,7 +361,7 @@
// Only need to compare Path when subscribing to provider
if strings.LastIndex(zkRootPath, constant.ProviderCategory) != -1 {
provider, _ := common.NewURL(c)
- if provider.ServiceKey() != conf.ServiceKey() {
+ if provider.Interface() != intf || !common.IsAnyCondition(constant.AnyValue, conf.Group(), conf.Version(), provider) {
continue
}
}
@@ -326,7 +408,6 @@
}
if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) {
return
-
}
}
}
@@ -367,6 +448,7 @@
}
}
}
+
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
@@ -378,7 +460,11 @@
logger.Infof("[Zookeeper Listener] listen dubbo path{%s}", zkPath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
- l.listenDirEvent(conf, zkPath, listener)
+ intf := ""
+ if conf != nil {
+ intf = conf.Interface()
+ }
+ l.listenDirEvent(conf, zkPath, listener, intf)
logger.Warnf("ListenServiceEvent->listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
}