Merge branch 'main' into feature-triple
diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml
index 1c25fc5..e15150c 100644
--- a/.github/workflows/github-actions.yml
+++ b/.github/workflows/github-actions.yml
@@ -2,7 +2,9 @@
on:
push:
- branches: "*"
+ branches:
+ - main
+ - 'release-*'
pull_request:
branches: "*"
@@ -75,12 +77,10 @@
run: |
if [ "$GITHUB_EVENT_NAME" == "pull_request" ]; then
chmod +x integrate_test.sh \
- && [[ -n "${{github.event.pull_request.head.repo.full_name}}" ]] \
- && [[ -n "${{github.event.pull_request.head.sha}}" ]] \
- && [[ -n "${{github.base_ref}}" ]] \
&& ./integrate_test.sh ${{github.event.pull_request.head.repo.full_name}} ${{github.event.pull_request.head.sha}} ${{github.base_ref}}
elif [ "$GITHUB_EVENT_NAME" == "push" ]; then
- chmod +x integrate_test.sh && ./integrate_test.sh $GITHUB_REPOSITORY $GITHUB_SHA $GITHUB_BASE_REF
+ chmod +x integrate_test.sh \
+ && ./integrate_test.sh $GITHUB_REPOSITORY $GITHUB_SHA $GITHUB_REF_NAME
else
echo "$GITHUB_EVENT_NAME is an unsupported event type."
exit 1
diff --git a/cluster/cluster/adaptivesvc/cluster_invoker.go b/cluster/cluster/adaptivesvc/cluster_invoker.go
index 7eaf456..4eb47e9 100644
--- a/cluster/cluster/adaptivesvc/cluster_invoker.go
+++ b/cluster/cluster/adaptivesvc/cluster_invoker.go
@@ -38,8 +38,6 @@
"dubbo.apache.org/dubbo-go/v3/protocol"
)
-var _ protocol.Invoker = (*adaptiveServiceClusterInvoker)(nil)
-
type adaptiveServiceClusterInvoker struct {
base.BaseClusterInvoker
}
diff --git a/cluster/directory/base/directory.go b/cluster/directory/base/directory.go
index 93dfcb6..38812b9 100644
--- a/cluster/directory/base/directory.go
+++ b/cluster/directory/base/directory.go
@@ -42,8 +42,8 @@
}
// NewDirectory Create BaseDirectory with URL
-func NewDirectory(url *common.URL) Directory {
- return Directory{
+func NewDirectory(url *common.URL) *Directory {
+ return &Directory{
url: url,
destroyed: atomic.NewBool(false),
routerChain: &chain.RouterChain{},
@@ -91,8 +91,8 @@
return false
}
-// Destroy Destroy
-func (dir *Directory) Destroy(doDestroy func()) {
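+// DoDestroy stops the directory: the first call marks it as destroyed and runs doDestroy; once destroyed, doDestroy is not run again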
+func (dir *Directory) DoDestroy(doDestroy func()) {
if dir.destroyed.CAS(false, true) {
dir.mutex.Lock()
doDestroy()
@@ -100,7 +100,7 @@
}
}
-// IsAvailable Once directory init finish, it will change to true
-func (dir *Directory) IsAvailable() bool {
- return !dir.destroyed.Load()
+// IsDestroyed reports whether the directory has been destroyed; it is false after NewDirectory and becomes true once DoDestroy has run
+func (dir *Directory) IsDestroyed() bool {
+ return dir.destroyed.Load()
}
diff --git a/cluster/directory/directory.go b/cluster/directory/directory.go
index 257eb4a..b280006 100644
--- a/cluster/directory/directory.go
+++ b/cluster/directory/directory.go
@@ -22,8 +22,7 @@
"dubbo.apache.org/dubbo-go/v3/protocol"
)
-// Directory
-// Extension - Directory
+// Directory implementations include RegistryDirectory, ServiceDiscoveryRegistryDirectory, StaticDirectory
type Directory interface {
common.Node
@@ -32,4 +31,7 @@
// implementation for the sake of performance consideration. This requires the caller of List() shouldn't modify
// the return result directly.
List(invocation protocol.Invocation) []protocol.Invoker
+
+ // Subscribe listens to registry instances
+ Subscribe(url *common.URL) error
}
diff --git a/cluster/directory/static/directory.go b/cluster/directory/static/directory.go
index 0e9ecc3..d8ed888 100644
--- a/cluster/directory/static/directory.go
+++ b/cluster/directory/static/directory.go
@@ -29,7 +29,7 @@
)
type directory struct {
- base.Directory
+ *base.Directory
invokers []protocol.Invoker
}
@@ -51,6 +51,10 @@
// for-loop invokers ,if all invokers is available ,then it means directory is available
func (dir *directory) IsAvailable() bool {
+ if dir.Directory.IsDestroyed() {
+ return false
+ }
+
if len(dir.invokers) == 0 {
return false
}
@@ -78,7 +82,7 @@
// Destroy Destroy
func (dir *directory) Destroy() {
- dir.Directory.Destroy(func() {
+ dir.Directory.DoDestroy(func() {
for _, ivk := range dir.invokers {
ivk.Destroy()
}
@@ -99,3 +103,7 @@
dir.SetRouterChain(routerChain)
return nil
}
+
+func (dir *directory) Subscribe(url *common.URL) error {
+ panic("Static directory does not support subscribing to registry.")
+}
diff --git a/cluster/router/condition/dynamic_router.go b/cluster/router/condition/dynamic_router.go
index 6d89771..c02d0a0 100644
--- a/cluster/router/condition/dynamic_router.go
+++ b/cluster/router/condition/dynamic_router.go
@@ -127,8 +127,7 @@
logger.Warnf("config center does not start, please check if the configuration center has been properly configured in dubbogo.yml")
return
}
- key := strings.Join([]string{strings.Join([]string{url.Service(), url.GetParam(constant.VersionKey, ""), url.GetParam(constant.GroupKey, "")}, ":"),
- constant.ConditionRouterRuleSuffix}, "")
+ key := strings.Join([]string{url.ColonSeparatedKey(), constant.ConditionRouterRuleSuffix}, "")
dynamicConfiguration.AddListener(key, s)
value, err := dynamicConfiguration.GetRule(key)
if err != nil {
diff --git a/cluster/router/tag/match.go b/cluster/router/tag/match.go
index 1463360..0b4493d 100644
--- a/cluster/router/tag/match.go
+++ b/cluster/router/tag/match.go
@@ -99,22 +99,38 @@
var (
addresses []string
result []protocol.Invoker
+ match []*common.ParamMatch
)
for _, tagCfg := range cfg.Tags {
if tagCfg.Name == tag {
addresses = tagCfg.Addresses
+ match = tagCfg.Match
}
}
- if len(addresses) == 0 {
- // filter tag does not match
- result = filterInvokers(invokers, tag, func(invoker protocol.Invoker, tag interface{}) bool {
- return invoker.GetURL().GetParam(constant.Tagkey, "") != tag
+
+ // only one of 'match' and 'addresses' takes effect: when both are specified, 'match' wins and 'addresses' is ignored.
+ if len(match) != 0 {
+ result = filterInvokers(invokers, match, func(invoker protocol.Invoker, match interface{}) bool {
+ matches := match.([]*common.ParamMatch)
+ for _, m := range matches {
+ if !m.IsMatch(invoker.GetURL()) {
+ return true
+ }
+ }
+ return false
})
- logger.Debugf("[tag router] filter dynamic tag, tag=%s, invokers=%+v", tag, result)
} else {
- // filter address does not match
- result = filterInvokers(invokers, addresses, getAddressPredicate(false))
- logger.Debugf("[tag router] filter dynamic tag address, invokers=%+v", result)
+ if len(addresses) == 0 {
+ // filter tag does not match
+ result = filterInvokers(invokers, tag, func(invoker protocol.Invoker, tag interface{}) bool {
+ return invoker.GetURL().GetParam(constant.Tagkey, "") != tag
+ })
+ logger.Debugf("[tag router] filter dynamic tag, tag=%s, invokers=%+v", tag, result)
+ } else {
+ // filter address does not match
+ result = filterInvokers(invokers, addresses, getAddressPredicate(false))
+ logger.Debugf("[tag router] filter dynamic tag address, invokers=%+v", result)
+ }
}
// returns the result directly
if *cfg.Force || requestIsForce(url, invocation) {
@@ -135,6 +151,7 @@
return result
}
+// filterInvokers removes the invokers that match predicate, working on a copy of the input slice.
func filterInvokers(invokers []protocol.Invoker, param interface{}, predicate predicate) []protocol.Invoker {
result := make([]protocol.Invoker, len(invokers))
copy(result, invokers)
diff --git a/common/constant/cluster.go b/common/constant/cluster.go
index 4467b3d..a8b321b 100644
--- a/common/constant/cluster.go
+++ b/common/constant/cluster.go
@@ -32,3 +32,8 @@
const (
NonImportErrorMsgFormat = "Cluster for %s is not existing, make sure you have import the package."
)
+
+const (
+ MatchCondition = "MATCH_CONDITION"
+ APIVersion = "v3.0"
+)
diff --git a/common/extension/registry_directory.go b/common/extension/registry_directory.go
index ae2fef3..32309de 100644
--- a/common/extension/registry_directory.go
+++ b/common/extension/registry_directory.go
@@ -18,6 +18,10 @@
package extension
import (
+ "github.com/dubbogo/gost/log/logger"
+)
+
+import (
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/registry"
@@ -25,17 +29,36 @@
type registryDirectory func(url *common.URL, registry registry.Registry) (directory.Directory, error)
-var defaultRegistry registryDirectory
+var directories = make(map[string]registryDirectory)
+var defaultDirectory registryDirectory
// SetDefaultRegistryDirectory sets the default registryDirectory
func SetDefaultRegistryDirectory(v registryDirectory) {
- defaultRegistry = v
+ defaultDirectory = v
+}
+
+// SetDirectory registers the registryDirectory v under the given key
+func SetDirectory(key string, v registryDirectory) {
+ directories[key] = v
}
// GetDefaultRegistryDirectory finds the registryDirectory with url and registry
func GetDefaultRegistryDirectory(config *common.URL, registry registry.Registry) (directory.Directory, error) {
- if defaultRegistry == nil {
+ if defaultDirectory == nil {
panic("registry directory is not existing, make sure you have import the package.")
}
- return defaultRegistry(config, registry)
+ return defaultDirectory(config, registry)
+}
+
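+// GetDirectoryInstance creates a directory with the registryDirectory registered for config.Protocol, falling back to the default one when the protocol is empty or not registered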
+func GetDirectoryInstance(config *common.URL, registry registry.Registry) (directory.Directory, error) {
+ key := config.Protocol
+ if key == "" {
+ return GetDefaultRegistryDirectory(config, registry)
+ }
+ if directories[key] == nil {
+ logger.Warn("registry directory " + key + " does not exist, make sure you have import the package, will use the default directory type.")
+ return GetDefaultRegistryDirectory(config, registry)
+ }
+ return directories[key](config, registry)
}
diff --git a/common/host_util.go b/common/host_util.go
index eac0ac1..a1bbc4f 100644
--- a/common/host_util.go
+++ b/common/host_util.go
@@ -20,6 +20,7 @@
import (
"os"
"strconv"
+ "strings"
)
import (
@@ -80,3 +81,28 @@
portInt, err := strconv.Atoi(port)
return err == nil && portInt > 0 && portInt < 65536
}
+
+func IsMatchGlobPattern(pattern, value string) bool {
+ if constant.AnyValue == pattern {
+ return true
+ }
+ if pattern == "" && value == "" {
+ return true
+ }
+ if pattern == "" || value == "" {
+ return false
+ }
+
+ i := strings.Index(pattern, constant.AnyValue)
+ if i == -1 { // doesn't find "*"
+ return value == pattern
+ } else if i == len(pattern)-1 { // "*" is at the end
+ return strings.HasPrefix(value, pattern[0:i])
+ } else if i == 0 { // "*" is at the beginning
+ return strings.HasSuffix(value, pattern[i+1:])
+ } else { // "*" is in the middle
+ prefix := pattern[0:i]
+ suffix := pattern[i+1:]
+ return strings.HasPrefix(value, prefix) && strings.HasSuffix(value, suffix)
+ }
+}
diff --git a/common/match.go b/common/match.go
new file mode 100644
index 0000000..2d9897d
--- /dev/null
+++ b/common/match.go
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+ "fmt"
+ "net"
+ "regexp"
+ "strings"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+)
+
+type ParamMatch struct {
+ Key string `yaml:"key" json:"key,omitempty" property:"key"`
+ Value StringMatch `yaml:"value" json:"value,omitempty" property:"value"`
+}
+
+func (p *ParamMatch) IsMatch(url *URL) bool {
+ return p.Value.IsMatch(url.GetParam(p.Key, ""))
+}
+
+type StringMatch struct {
+ Exact string `yaml:"exact" json:"exact,omitempty" property:"exact"`
+ Prefix string `yaml:"prefix" json:"prefix,omitempty" property:"prefix"`
+ Regex string `yaml:"regex" json:"regex,omitempty" property:"regex"`
+ Noempty string `yaml:"noempty" json:"noempty,omitempty" property:"noempty"`
+ Empty string `yaml:"empty" json:"empty,omitempty" property:"empty"`
+ Wildcard string `yaml:"wildcard" json:"wildcard,omitempty" property:"wildcard"`
+}
+
+func (p *StringMatch) IsMatch(value string) bool {
+ if p.Exact != "" {
+ return p.Exact == value
+ } else if p.Prefix != "" {
+ return strings.HasPrefix(value, p.Prefix)
+ } else if p.Regex != "" {
+ match, _ := regexp.MatchString(p.Regex, value)
+ return match
+ } else if p.Wildcard != "" {
+ return value == p.Wildcard || constant.AnyValue == p.Wildcard
+ } else if p.Empty != "" {
+ return value == ""
+ } else if p.Noempty != "" {
+ return value != ""
+ }
+ return false
+}
+
+type AddressMatch struct {
+ Wildcard string `yaml:"wildcard" json:"wildcard,omitempty" property:"wildcard"`
+ Cird string `yaml:"cird" json:"cird,omitempty" property:"cird"`
+ Exact string `yaml:"exact" json:"exact,omitempty" property:"exact"`
+}
+
+func (p *AddressMatch) IsMatch(value string) bool {
+ if p.Cird != "" && value != "" {
+ _, ipnet, err := net.ParseCIDR(p.Cird)
+ if err != nil {
+ fmt.Println("Error", p.Cird, err)
+ return false
+ }
+ return ipnet.Contains(net.ParseIP(value))
+ }
+ if p.Wildcard != "" && value != "" {
+ if constant.AnyValue == value || constant.AnyHostValue == value {
+ return true
+ }
+ return IsMatchGlobPattern(p.Wildcard, value)
+ }
+ if p.Exact != "" && value != "" {
+ return p.Exact == value
+ }
+ return false
+}
+
+type ListStringMatch struct {
+ Oneof []StringMatch `yaml:"oneof" json:"oneof,omitempty" property:"oneof"`
+}
+
+func (p *ListStringMatch) IsMatch(value string) bool {
+ for _, match := range p.Oneof {
+ if match.IsMatch(value) {
+ return true
+ }
+ }
+ return false
+}
diff --git a/common/url.go b/common/url.go
index cd551c2..7ec0a10 100644
--- a/common/url.go
+++ b/common/url.go
@@ -117,7 +117,19 @@
// Attributes should not be transported
Attributes map[string]interface{} `hessian:"-"`
// special for registry
- SubURL *URL
+ SubURL *URL
+ attributes sync.Map
+}
+
+func (c *URL) AddAttribute(key string, value interface{}) {
+ if value != nil {
+ c.attributes.Store(key, value)
+ }
+}
+
+func (c *URL) GetAttribute(key string) interface{} {
+ v, _ := c.attributes.Load(key)
+ return v
}
// JavaClassName POJO for URL
diff --git a/config/instance/metadata_report_test.go b/config/instance/metadata_report_test.go
index 9edf493..d1156b2 100644
--- a/config/instance/metadata_report_test.go
+++ b/config/instance/metadata_report_test.go
@@ -54,6 +54,11 @@
type mockMetadataReport struct{}
+func (m mockMetadataReport) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
+ //TODO implement me
+ panic("implement me")
+}
+
func (m mockMetadataReport) RegisterServiceAppMapping(string, string, string) error {
panic("implement me")
}
diff --git a/config/metric_config.go b/config/metric_config.go
index af41eb8..1452000 100644
--- a/config/metric_config.go
+++ b/config/metric_config.go
@@ -39,10 +39,9 @@
Port string `default:"9090" yaml:"port" json:"port,omitempty" property:"port"`
Path string `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"`
Protocol string `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
- EnableMetadata *bool `default:"true" yaml:"enable-metadata" json:"enable-metadata,omitempty" property:"enable-metadata"`
- EnableRegistry *bool `default:"true" yaml:"enable-registry" json:"enable-registry,omitempty" property:"enable-registry"`
- EnableConfigCenter *bool `default:"true" yaml:"enable-config-center" json:"enable-config-center,omitempty" property:"enable-config-center"`
- EnableRpc *bool `default:"true" yaml:"enable-rpc" json:"enable-rpc,omitempty" property:"enable-rpc"`
+ EnableMetadata *bool `default:"false" yaml:"enable-metadata" json:"enable-metadata,omitempty" property:"enable-metadata"`
+ EnableRegistry *bool `default:"false" yaml:"enable-registry" json:"enable-registry,omitempty" property:"enable-registry"`
+ EnableConfigCenter *bool `default:"false" yaml:"enable-config-center" json:"enable-config-center,omitempty" property:"enable-config-center"`
Prometheus *PrometheusConfig `yaml:"prometheus" json:"prometheus" property:"prometheus"`
Aggregation *AggregateConfig `yaml:"aggregation" json:"aggregation" property:"aggregation"`
rootConfig *RootConfig
@@ -60,7 +59,7 @@
}
type Exporter struct {
- Enabled *bool `default:"false" yaml:"enabled" json:"enabled,omitempty" property:"enabled"`
+ Enabled *bool `default:"true" yaml:"enabled" json:"enabled,omitempty" property:"enabled"`
}
type PushgatewayConfig struct {
@@ -93,7 +92,9 @@
return err
}
mc.rootConfig = rc
- metrics.Init(mc.toURL())
+ if *mc.Enable {
+ metrics.Init(mc.toURL())
+ }
return nil
}
@@ -120,11 +121,6 @@
return mcb
}
-func (mcb *MetricConfigBuilder) SetRpcEnabled(enabled bool) *MetricConfigBuilder {
- mcb.metricConfig.EnableRpc = &enabled
- return mcb
-}
-
func (mcb *MetricConfigBuilder) Build() *MetricConfig {
return mcb.metricConfig
}
@@ -137,15 +133,14 @@
// prometheus://localhost:9090?&histogram.enabled=false&prometheus.exporter.enabled=false
func (mc *MetricConfig) toURL() *common.URL {
url, _ := common.NewURL("localhost", common.WithProtocol(mc.Protocol))
- url.SetParam(constant.PrometheusExporterEnabledKey, strconv.FormatBool(*mc.Enable)) // for compatibility
url.SetParam(constant.PrometheusExporterMetricsPortKey, mc.Port)
url.SetParam(constant.PrometheusExporterMetricsPathKey, mc.Path)
url.SetParam(constant.ApplicationKey, mc.rootConfig.Application.Name)
url.SetParam(constant.AppVersionKey, mc.rootConfig.Application.Version)
+ url.SetParam(constant.RpcEnabledKey, strconv.FormatBool(*mc.Enable))
url.SetParam(constant.MetadataEnabledKey, strconv.FormatBool(*mc.EnableMetadata))
url.SetParam(constant.RegistryEnabledKey, strconv.FormatBool(*mc.EnableRegistry))
url.SetParam(constant.ConfigCenterEnabledKey, strconv.FormatBool(*mc.EnableConfigCenter))
- url.SetParam(constant.RpcEnabledKey, strconv.FormatBool(*mc.EnableRpc))
if mc.Aggregation != nil {
url.SetParam(constant.AggregationEnabledKey, strconv.FormatBool(*mc.Aggregation.Enabled))
url.SetParam(constant.AggregationBucketNumKey, strconv.Itoa(mc.Aggregation.BucketNum))
@@ -154,7 +149,7 @@
if mc.Prometheus != nil {
if mc.Prometheus.Exporter != nil {
exporter := mc.Prometheus.Exporter
- url.SetParam(constant.PrometheusExporterEnabledKey, strconv.FormatBool(*exporter.Enabled || *mc.Enable)) // for compatibility
+ url.SetParam(constant.PrometheusExporterEnabledKey, strconv.FormatBool(*exporter.Enabled))
}
if mc.Prometheus.Pushgateway != nil {
pushGateWay := mc.Prometheus.Pushgateway
diff --git a/config/metric_config_test.go b/config/metric_config_test.go
index 31a0ac6..1dfd3af 100644
--- a/config/metric_config_test.go
+++ b/config/metric_config_test.go
@@ -30,13 +30,11 @@
SetConfigCenterEnabled(false).
SetMetadataEnabled(false).
SetRegistryEnabled(false).
- SetRpcEnabled(false).
Build()
enable := false
assert.Equal(t, &MetricConfig{
EnableConfigCenter: &enable,
EnableMetadata: &enable,
EnableRegistry: &enable,
- EnableRpc: &enable,
}, config)
}
diff --git a/config/router_config.go b/config/router_config.go
index 616e994..50d65f3 100644
--- a/config/router_config.go
+++ b/config/router_config.go
@@ -23,6 +23,7 @@
import (
_ "dubbo.apache.org/dubbo-go/v3/cluster/router/chain"
+ "dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
_ "dubbo.apache.org/dubbo-go/v3/metrics/prometheus"
)
@@ -41,8 +42,9 @@
}
type Tag struct {
- Name string `yaml:"name" json:"name,omitempty" property:"name"`
- Addresses []string `yaml:"addresses" json:"addresses,omitempty" property:"addresses"`
+ Name string `yaml:"name" json:"name,omitempty" property:"name"`
+ Match []*common.ParamMatch `yaml:"match" json:"match,omitempty" property:"match"`
+ Addresses []string `yaml:"addresses" json:"addresses,omitempty" property:"addresses"`
}
// Prefix dubbo.router
diff --git a/config_center/configurator/override.go b/config_center/configurator/override.go
index b393225..fb1f0d6 100644
--- a/config_center/configurator/override.go
+++ b/config_center/configurator/override.go
@@ -30,6 +30,7 @@
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config_center"
+ "dubbo.apache.org/dubbo-go/v3/config_center/parser"
)
func init() {
@@ -57,13 +58,19 @@
// branch for version 2.7.x
apiVersion := c.configuratorUrl.GetParam(constant.ConfigVersionKey, "")
if len(apiVersion) != 0 {
+ var host string
currentSide := url.GetParam(constant.SideKey, "")
configuratorSide := c.configuratorUrl.GetParam(constant.SideKey, "")
- if currentSide == configuratorSide && common.DubboRole[common.CONSUMER] == currentSide && c.configuratorUrl.Port == "0" {
- localIP := common.GetLocalIp()
- c.configureIfMatch(localIP, url)
- } else if currentSide == configuratorSide && common.DubboRole[common.PROVIDER] == currentSide && c.configuratorUrl.Port == url.Port {
- c.configureIfMatch(url.Ip, url)
+ if currentSide == configuratorSide && common.DubboRole[common.CONSUMER] == currentSide {
+ host = common.GetLocalIp()
+ } else if currentSide == configuratorSide && common.DubboRole[common.PROVIDER] == currentSide {
+ host = url.Ip
+ }
+
+ if strings.HasPrefix(apiVersion, constant.APIVersion) {
+ c.configureIfMatchV3(host, url)
+ } else {
+ c.configureIfMatch(host, url)
}
} else {
// branch for version 2.6.x and less
@@ -71,20 +78,43 @@
}
}
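+// configureIfMatchV3 copies the configurator url params (except the condition keys) onto url when the MatchCondition attribute of the configurator url matches host and url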
+func (c *overrideConfigurator) configureIfMatchV3(host string, url *common.URL) {
+ conditionKeys := getConditionKeys()
+ matcher := c.configuratorUrl.GetAttribute(constant.MatchCondition)
+ if matcher != nil {
+ conditionMatcher := matcher.(*parser.ConditionMatch)
+ if conditionMatcher.IsMatch(host, url) {
+ configUrl := c.configuratorUrl.CloneExceptParams(conditionKeys)
+ url.SetParams(configUrl.GetParams())
+ }
+ }
+}
+
+func (c *overrideConfigurator) configureDeprecated(url *common.URL) {
+ // If the override url has a port, it is a provider address. We want to control a specific provider with this override url; it may take effect on that specific provider instance or on consumers holding this provider instance.
+ if c.configuratorUrl.Port != "0" {
+ if url.Port == c.configuratorUrl.Port {
+ c.configureIfMatch(url.Ip, url)
+ }
+ } else {
+ // the override url doesn't have a port, which means the ip it specifies is a consumer address or 0.0.0.0
+ // 1. If it is a consumer ip address, the intention is to control a specific consumer instance; it must take effect on the consumer side, and any provider that receives this override url should ignore it;
+ // 2. If the ip is 0.0.0.0, this override url can be used on the consumer side as well as on the provider side
+ if url.GetParam(constant.SideKey, "") == common.DubboRole[common.CONSUMER] {
+ localIP := common.GetLocalIp()
+ c.configureIfMatch(localIP, url)
+ } else {
+ c.configureIfMatch(constant.AnyHostValue, url)
+ }
+ }
+}
+
func (c *overrideConfigurator) configureIfMatchInternal(url *common.URL) {
configApp := c.configuratorUrl.GetParam(constant.ApplicationKey, c.configuratorUrl.Username)
currentApp := url.GetParam(constant.ApplicationKey, url.Username)
if len(configApp) == 0 || constant.AnyValue == configApp || configApp == currentApp {
- conditionKeys := gxset.NewSet()
- conditionKeys.Add(constant.CategoryKey)
- conditionKeys.Add(constant.CheckKey)
- conditionKeys.Add(constant.EnabledKey)
- conditionKeys.Add(constant.GroupKey)
- conditionKeys.Add(constant.VersionKey)
- conditionKeys.Add(constant.ApplicationKey)
- conditionKeys.Add(constant.SideKey)
- conditionKeys.Add(constant.ConfigVersionKey)
- conditionKeys.Add(constant.CompatibleConfigKey)
+ conditionKeys := getConditionKeys()
returnUrl := false
c.configuratorUrl.RangeParams(func(k, _ string) bool {
value := c.configuratorUrl.GetParam(k, "")
@@ -115,21 +145,16 @@
}
}
-func (c *overrideConfigurator) configureDeprecated(url *common.URL) {
- // If override url has port, means it is a provider address. We want to control a specific provider with this override url, it may take effect on the specific provider instance or on consumers holding this provider instance.
- if c.configuratorUrl.Port != "0" {
- if url.Port == c.configuratorUrl.Port {
- c.configureIfMatch(url.Ip, url)
- }
- } else {
- // override url don't have a port, means the ip override url specify is a consumer address or 0.0.0.0
- // 1.If it is a consumer ip address, the intention is to control a specific consumer instance, it must takes effect at the consumer side, any provider received this override url should ignore;
- // 2.If the ip is 0.0.0.0, this override url can be used on consumer, and also can be used on provider
- if url.GetParam(constant.SideKey, "") == common.DubboRole[common.CONSUMER] {
- localIP := common.GetLocalIp()
- c.configureIfMatch(localIP, url)
- } else {
- c.configureIfMatch(constant.AnyHostValue, url)
- }
- }
+func getConditionKeys() *gxset.HashSet {
+ conditionKeys := gxset.NewSet()
+ conditionKeys.Add(constant.CategoryKey)
+ conditionKeys.Add(constant.CheckKey)
+ conditionKeys.Add(constant.EnabledKey)
+ conditionKeys.Add(constant.GroupKey)
+ conditionKeys.Add(constant.VersionKey)
+ conditionKeys.Add(constant.ApplicationKey)
+ conditionKeys.Add(constant.SideKey)
+ conditionKeys.Add(constant.ConfigVersionKey)
+ conditionKeys.Add(constant.CompatibleConfigKey)
+ return conditionKeys
}
diff --git a/config_center/mock_dynamic_config.go b/config_center/mock_dynamic_config.go
index 0abf808..76ffeea 100644
--- a/config_center/mock_dynamic_config.go
+++ b/config_center/mock_dynamic_config.go
@@ -36,7 +36,8 @@
// MockDynamicConfigurationFactory defines content
type MockDynamicConfigurationFactory struct {
- Content string
+ Content string
+ ConfiguratorContent string
}
const (
@@ -82,6 +83,8 @@
})
if len(f.Content) != 0 {
dynamicConfiguration.content = f.Content
+ } else if len(f.ConfiguratorContent) != 0 {
+ dynamicConfiguration.content = f.ConfiguratorContent
}
return dynamicConfiguration, err
}
@@ -168,7 +171,7 @@
},
}
value, _ := yaml.Marshal(config)
- key := "group*" + mockServiceName + ":1.0.0" + constant.ConfiguratorSuffix
+ key := mockServiceName + ":1.0.0:group" + constant.ConfiguratorSuffix
c.listener[key].Process(&ConfigChangeEvent{Key: key, Value: string(value), ConfigType: remoting.EventTypeAdd})
}
diff --git a/config_center/parser/configuration_parser.go b/config_center/parser/configuration_parser.go
index 1f4b960..a262cc6 100644
--- a/config_center/parser/configuration_parser.go
+++ b/config_center/parser/configuration_parser.go
@@ -70,6 +70,38 @@
Applications []string `yaml:"applications"`
Parameters map[string]string `yaml:"parameters"`
Side string `yaml:"side"`
+ Match *ConditionMatch `yaml:"match"`
+}
+
+type ConditionMatch struct {
+ Address *common.AddressMatch `yaml:"address"`
+ ProviderAddress *common.AddressMatch `yaml:"providerAddress"`
+ Service *common.ListStringMatch `yaml:"service"`
+ App *common.ListStringMatch `yaml:"app"`
+ Param []*common.ParamMatch `yaml:"param"`
+}
+
+func (c *ConditionMatch) IsMatch(host string, url *common.URL) bool {
+ if !c.Address.IsMatch(host) {
+ return false
+ }
+ if !c.ProviderAddress.IsMatch(url.Location) {
+ return false
+ }
+ if !c.Service.IsMatch(url.ServiceKey()) {
+ return false
+ }
+ if !c.App.IsMatch(url.GetParam(constant.ApplicationKey, "")) {
+ return false
+ }
+ if c.Param != nil {
+ for _, p := range c.Param {
+ if !p.IsMatch(url) {
+ return false
+ }
+ }
+ }
+ return true
}
// Parse load content
@@ -145,6 +177,7 @@
if err != nil {
return nil, perrors.WithStack(err)
}
+ url.AddAttribute(constant.MatchCondition, item.Match)
urls = append(urls, url)
}
} else {
@@ -152,6 +185,7 @@
if err != nil {
return nil, perrors.WithStack(err)
}
+ url.AddAttribute(constant.MatchCondition, item.Match)
urls = append(urls, url)
}
}
@@ -193,6 +227,7 @@
if err != nil {
return nil, perrors.WithStack(err)
}
+ url.AddAttribute(constant.MatchCondition, item.Match)
urls = append(urls, url)
}
}
diff --git a/go.mod b/go.mod
index c861903..1c6c705 100644
--- a/go.mod
+++ b/go.mod
@@ -45,6 +45,7 @@
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/mapstructure v1.5.0
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
+ github.com/nacos-group/nacos-sdk-go v1.0.9 // indirect
github.com/nacos-group/nacos-sdk-go/v2 v2.2.2
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
github.com/opentracing/opentracing-go v1.2.0
diff --git a/go.sum b/go.sum
index 0bf5bb8..965544e 100644
--- a/go.sum
+++ b/go.sum
@@ -1004,8 +1004,9 @@
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
-github.com/nacos-group/nacos-sdk-go v1.0.8 h1:8pEm05Cdav9sQgJSv5kyvlgfz0SzFUUGI3pWX6SiSnM=
github.com/nacos-group/nacos-sdk-go v1.0.8/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA=
+github.com/nacos-group/nacos-sdk-go v1.0.9 h1:sMvrp6tZj4LdhuHRsS4GCqASB81k3pjmT2ykDQQpwt0=
+github.com/nacos-group/nacos-sdk-go v1.0.9/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA=
github.com/nacos-group/nacos-sdk-go/v2 v2.1.2/go.mod h1:ys/1adWeKXXzbNWfRNbaFlX/t6HVLWdpsNDvmoWTw0g=
github.com/nacos-group/nacos-sdk-go/v2 v2.2.2 h1:FI+7vr1fvCA4jbgx36KezmP3zlU/WoP/7wAloaSd1Ew=
github.com/nacos-group/nacos-sdk-go/v2 v2.2.2/go.mod h1:ys/1adWeKXXzbNWfRNbaFlX/t6HVLWdpsNDvmoWTw0g=
diff --git a/imports/imports.go b/imports/imports.go
index 54c6ca5..48c58cb 100644
--- a/imports/imports.go
+++ b/imports/imports.go
@@ -78,6 +78,7 @@
_ "dubbo.apache.org/dubbo-go/v3/protocol/triple"
_ "dubbo.apache.org/dubbo-go/v3/protocol/triple/health"
_ "dubbo.apache.org/dubbo-go/v3/proxy/proxy_factory"
+ _ "dubbo.apache.org/dubbo-go/v3/registry/directory"
_ "dubbo.apache.org/dubbo-go/v3/registry/etcdv3"
_ "dubbo.apache.org/dubbo-go/v3/registry/nacos"
_ "dubbo.apache.org/dubbo-go/v3/registry/polaris"
diff --git a/integrate_test.sh b/integrate_test.sh
index f8ca769..374202c 100644
--- a/integrate_test.sh
+++ b/integrate_test.sh
@@ -17,29 +17,22 @@
#!/bin/bash
set -e
-set -x
-echo 'start integrate-test'
+echo "start integrate-test: repo = $1, SHA = $2, branch = $3"
# set root workspace
ROOT_DIR=$(pwd)
echo "integrate-test root work-space -> ${ROOT_DIR}"
-# show all github-env
-echo "github current commit id -> $2"
-echo "github pull request branch -> ${GITHUB_REF}"
-echo "github pull request slug -> ${GITHUB_REPOSITORY}"
-echo "github pull request repo slug -> ${GITHUB_REPOSITORY}"
-echo "github pull request actor -> ${GITHUB_ACTOR}"
-echo "github pull request repo param -> $1"
-echo "github pull request base branch -> $3"
-echo "github pull request head branch -> ${GITHUB_HEAD_REF}"
-
echo "use dubbo-go-samples $3 branch for integration testing"
-git clone -b master https://github.com/apache/dubbo-go-samples.git samples && cd samples
+git clone -b $3 https://github.com/apache/dubbo-go-samples.git samples && cd samples
# update dubbo-go to current commit id
-go mod edit -replace=dubbo.apache.org/dubbo-go/v3=github.com/"$1"/v3@"$2"
+if [ "$1" == "apache/dubbo-go" ]; then
+ go mod edit -replace=dubbo.apache.org/dubbo-go/v3=dubbo.apache.org/dubbo-go/v3@"$2"
+else
+ go mod edit -replace=dubbo.apache.org/dubbo-go/v3=github.com/"$1"/v3@"$2"
+fi
go mod tidy
diff --git a/metadata/mapping/metadata/service_name_mapping.go b/metadata/mapping/metadata/service_name_mapping.go
index 3ba04ac..2580733 100644
--- a/metadata/mapping/metadata/service_name_mapping.go
+++ b/metadata/mapping/metadata/service_name_mapping.go
@@ -40,7 +40,7 @@
)
const (
- defaultGroup = "mapping"
+ DefaultGroup = "mapping"
slash = "/"
)
@@ -68,7 +68,7 @@
if metadataReport == nil {
logger.Info("get metadata report instance is nil, metadata service will be enabled!")
} else {
- err := metadataReport.RegisterServiceAppMapping(serviceInterface, defaultGroup, appName)
+ err := metadataReport.RegisterServiceAppMapping(serviceInterface, DefaultGroup, appName)
if err != nil {
return perrors.WithStack(err)
}
@@ -80,20 +80,20 @@
func (d *MetadataServiceNameMapping) Get(url *common.URL, listener registry.MappingListener) (*gxset.HashSet, error) {
serviceInterface := url.GetParam(constant.InterfaceKey, "")
metadataReport := instance.GetMetadataReportInstance()
- return metadataReport.GetServiceAppMapping(serviceInterface, defaultGroup, listener)
+ return metadataReport.GetServiceAppMapping(serviceInterface, DefaultGroup, listener)
}
func (d *MetadataServiceNameMapping) Remove(url *common.URL) error {
serviceInterface := url.GetParam(constant.InterfaceKey, "")
metadataReport := instance.GetMetadataReportInstance()
- return metadataReport.RemoveServiceAppMappingListener(serviceInterface, defaultGroup)
+ return metadataReport.RemoveServiceAppMappingListener(serviceInterface, DefaultGroup)
}
-// buildMappingKey will return mapping key, it looks like defaultGroup/serviceInterface
+// buildMappingKey will return mapping key, it looks like DefaultGroup/serviceInterface
func (d *MetadataServiceNameMapping) buildMappingKey(serviceInterface string) string {
// the issue : https://github.com/apache/dubbo/issues/4671
// so other params are ignored and remove, including group string, version string, protocol string
- return defaultGroup + slash + serviceInterface
+ return DefaultGroup + slash + serviceInterface
}
var (
diff --git a/metadata/report/delegate/delegate_report.go b/metadata/report/delegate/delegate_report.go
index d57b35f..3ca22b1 100644
--- a/metadata/report/delegate/delegate_report.go
+++ b/metadata/report/delegate/delegate_report.go
@@ -25,6 +25,7 @@
)
import (
+ gxset "github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/log/logger"
"github.com/go-co-op/gocron"
@@ -311,3 +312,8 @@
}
return false
}
+
+func (mr *MetadataReport) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
+ //TODO implement me
+ panic("implement me")
+}
diff --git a/metadata/report/etcd/report.go b/metadata/report/etcd/report.go
index 4483a2a..487b039 100644
--- a/metadata/report/etcd/report.go
+++ b/metadata/report/etcd/report.go
@@ -54,6 +54,11 @@
root string
}
+func (e *etcdMetadataReport) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
+ //TODO implement me
+ panic("implement me")
+}
+
// GetAppMetadata get metadata info from etcd
func (e *etcdMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) {
key := e.getNodeKey(metadataIdentifier)
diff --git a/metadata/report/nacos/report.go b/metadata/report/nacos/report.go
index 85ac519..fd8182c 100644
--- a/metadata/report/nacos/report.go
+++ b/metadata/report/nacos/report.go
@@ -38,12 +38,21 @@
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metadata/identifier"
+ "dubbo.apache.org/dubbo-go/v3/metadata/mapping/metadata"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
"dubbo.apache.org/dubbo-go/v3/metadata/report/factory"
"dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/remoting/nacos"
)
+const (
+ // the number is a little bit tricky:
+ // it is used as the page size of the query which looks up all keys with the target group.
+ // now, one key represents one application,
+ // so the lookup only fails to return every key when a group has more than 9999 applications
+ maxKeysNum = 9999
+)
+
func init() {
mf := &nacosMetadataReportFactory{}
extension.SetMetadataReportFactory("nacos", func() factory.MetadataReportFactory {
@@ -55,13 +64,14 @@
// of MetadataReport based on nacos.
type nacosMetadataReport struct {
client *nacosClient.NacosConfigClient
+ group string
}
// GetAppMetadata get metadata info from nacos
func (n *nacosMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) {
data, err := n.getConfig(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
- Group: metadataIdentifier.Group,
+ Group: n.group,
})
if err != nil {
return nil, err
@@ -84,7 +94,7 @@
return n.storeMetadata(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
- Group: metadataIdentifier.Group,
+ Group: n.group,
Content: string(data),
})
}
@@ -93,7 +103,7 @@
func (n *nacosMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
return n.storeMetadata(vo.ConfigParam{
DataId: providerIdentifier.GetIdentifierKey(),
- Group: providerIdentifier.Group,
+ Group: n.group,
Content: serviceDefinitions,
})
}
@@ -102,7 +112,7 @@
func (n *nacosMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *identifier.MetadataIdentifier, serviceParameterString string) error {
return n.storeMetadata(vo.ConfigParam{
DataId: consumerMetadataIdentifier.GetIdentifierKey(),
- Group: consumerMetadataIdentifier.Group,
+ Group: n.group,
Content: serviceParameterString,
})
}
@@ -111,7 +121,7 @@
func (n *nacosMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url *common.URL) error {
return n.storeMetadata(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
- Group: metadataIdentifier.Group,
+ Group: n.group,
Content: url.String(),
})
}
@@ -120,7 +130,7 @@
func (n *nacosMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error {
return n.deleteMetadata(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
- Group: metadataIdentifier.Group,
+ Group: n.group,
})
}
@@ -128,7 +138,7 @@
func (n *nacosMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) ([]string, error) {
return n.getConfigAsArray(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
- Group: metadataIdentifier.Group,
+ Group: n.group,
})
}
@@ -151,7 +161,7 @@
func (n *nacosMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) (string, error) {
return n.getConfig(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
- Group: metadataIdentifier.Group,
+ Group: n.group,
})
}
@@ -288,6 +298,37 @@
return set, nil
}
+// GetConfigKeysByGroup will return all keys with the group
+func (n *nacosMetadataReport) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
+ group = n.resolvedGroup(group)
+ page, err := n.client.Client().SearchConfig(vo.SearchConfigParam{
+ Search: "accurate",
+ Group: group,
+ PageNo: 1,
+ // in practice it is very unlikely for a user to create 9999 applications under one group
+ PageSize: maxKeysNum,
+ })
+
+ result := gxset.NewSet()
+ if err != nil {
+ return result, perrors.WithMessage(err, "can not find the configClient config")
+ }
+ for _, itm := range page.PageItems {
+ result.Add(itm.DataId)
+ }
+ return result, nil
+}
+
+// resolvedGroup normalizes the group: an empty group becomes metadata.DefaultGroup,
+// otherwise every '/' is replaced with '-', because '/' is a special character for nacos
+func (n *nacosMetadataReport) resolvedGroup(group string) string {
+ if len(group) <= 0 {
+ group = metadata.DefaultGroup
+ return group
+ }
+ return strings.ReplaceAll(group, "/", "-")
+}
+
// RemoveServiceAppMappingListener remove the serviceMapping listener from metadata center
func (n *nacosMetadataReport) RemoveServiceAppMappingListener(key string, group string) error {
return n.removeServiceMappingListener(key, group)
@@ -299,7 +340,8 @@
func (n *nacosMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
url.SetParam(constant.NacosNamespaceID, url.GetParam(constant.MetadataReportNamespaceKey, ""))
url.SetParam(constant.TimeoutKey, url.GetParam(constant.TimeoutKey, constant.DefaultRegTimeout))
- url.SetParam(constant.NacosGroupKey, url.GetParam(constant.MetadataReportGroupKey, constant.ServiceDiscoveryDefaultGroup))
+ group := url.GetParam(constant.MetadataReportGroupKey, constant.ServiceDiscoveryDefaultGroup)
+ url.SetParam(constant.NacosGroupKey, group)
url.SetParam(constant.NacosUsername, url.Username)
url.SetParam(constant.NacosPassword, url.Password)
client, err := nacos.NewNacosConfigClientByUrl(url)
@@ -307,5 +349,5 @@
logger.Errorf("Could not create nacos metadata report. URL: %s", url.String())
return nil
}
- return &nacosMetadataReport{client: client}
+ return &nacosMetadataReport{client: client, group: group}
}
diff --git a/metadata/report/report.go b/metadata/report/report.go
index 0995d17..9688825 100644
--- a/metadata/report/report.go
+++ b/metadata/report/report.go
@@ -76,4 +76,6 @@
// RemoveServiceAppMappingListener remove the serviceMapping listener by key and group
RemoveServiceAppMappingListener(string, string) error
+
+ GetConfigKeysByGroup(group string) (*gxset.HashSet, error)
}
diff --git a/metadata/report/zookeeper/listener.go b/metadata/report/zookeeper/listener.go
new file mode 100644
index 0000000..1dca921
--- /dev/null
+++ b/metadata/report/zookeeper/listener.go
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+ "strings"
+ "sync"
+)
+
+import (
+ gxset "github.com/dubbogo/gost/container/set"
+ "github.com/dubbogo/gost/log/logger"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/registry"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+ "dubbo.apache.org/dubbo-go/v3/remoting/zookeeper"
+)
+
+// CacheListener defines keyListeners and rootPath
+type CacheListener struct {
+ // key is zkNode Path and value is set of listeners
+ keyListeners sync.Map
+ zkEventListener *zookeeper.ZkEventListener
+ rootPath string
+}
+
+// NewCacheListener creates a new CacheListener
+func NewCacheListener(rootPath string, listener *zookeeper.ZkEventListener) *CacheListener {
+ return &CacheListener{zkEventListener: listener, rootPath: rootPath}
+}
+
+// AddListener calls ExistsW on key and registers listener for it; nothing is registered if ExistsW returns an error
+func (l *CacheListener) AddListener(key string, listener registry.MappingListener) {
+ // FIXME do not use Client.ExistW, because it has a bug (it cannot watch a zk node that does not exist)
+ _, _, _, err := l.zkEventListener.Client.Conn.ExistsW(key)
+ // reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure
+ // make a map[your type]struct{} like set in java
+ if err != nil {
+ return
+ }
+ listeners, loaded := l.keyListeners.LoadOrStore(key, map[registry.MappingListener]struct{}{listener: {}})
+ if loaded {
+ listeners.(map[registry.MappingListener]struct{})[listener] = struct{}{}
+ l.keyListeners.Store(key, listeners)
+ }
+}
+
+// RemoveListener deletes listener from the listeners stored for key, if any are stored
+func (l *CacheListener) RemoveListener(key string, listener registry.MappingListener) {
+ listeners, loaded := l.keyListeners.Load(key)
+ if loaded {
+ delete(listeners.(map[registry.MappingListener]struct{}), listener)
+ }
+}
+
+// DataChange notifies every listener registered for event.Path; it returns false if none is registered or a listener fails
+func (l *CacheListener) DataChange(event remoting.Event) bool {
+ if listeners, ok := l.keyListeners.Load(event.Path); ok {
+ for listener := range listeners.(map[registry.MappingListener]struct{}) {
+ appNames := strings.Split(event.Content, constant.CommaSeparator)
+ set := gxset.NewSet()
+ for _, e := range appNames {
+ set.Add(e)
+ }
+ err := listener.OnEvent(registry.NewServiceMappingChangedEvent(l.pathToKey(event.Path), set))
+ if err != nil {
+ logger.Error("Error notify mapping change event.", err)
+ return false
+ }
+ }
+ return true
+ }
+ return false
+}
+
+func (l *CacheListener) pathToKey(path string) string {
+ if len(path) == 0 {
+ return path
+ }
+ groupKey := strings.Replace(strings.Replace(path, l.rootPath+constant.PathSeparator, "", -1), constant.PathSeparator, constant.DotSeparator, -1)
+ return groupKey[strings.Index(groupKey, constant.DotSeparator)+1:]
+}
diff --git a/metadata/report/zookeeper/report.go b/metadata/report/zookeeper/report.go
index 51826ff..93dde1b 100644
--- a/metadata/report/zookeeper/report.go
+++ b/metadata/report/zookeeper/report.go
@@ -37,9 +37,11 @@
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metadata/identifier"
+ "dubbo.apache.org/dubbo-go/v3/metadata/mapping/metadata"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
"dubbo.apache.org/dubbo-go/v3/metadata/report/factory"
"dubbo.apache.org/dubbo-go/v3/registry"
+ "dubbo.apache.org/dubbo-go/v3/remoting/zookeeper"
)
var emptyStrSlice = make([]string, 0)
@@ -54,8 +56,10 @@
// zookeeperMetadataReport is the implementation of
// MetadataReport based on zookeeper.
type zookeeperMetadataReport struct {
- client *gxzookeeper.ZookeeperClient
- rootDir string
+ client *gxzookeeper.ZookeeperClient
+ rootDir string
+ listener *zookeeper.ZkEventListener
+ cacheListener *CacheListener
}
// GetAppMetadata get metadata info from zookeeper
@@ -157,7 +161,7 @@
// RegisterServiceAppMapping map the specified Dubbo service interface to current Dubbo app name
func (m *zookeeperMetadataReport) RegisterServiceAppMapping(key string, group string, value string) error {
- path := m.rootDir + group + constant.PathSeparator + key
+ path := m.getPath(key, group)
v, state, err := m.client.GetContent(path)
if err == zk.ErrNoNode {
return m.client.CreateWithValue(path, []byte(value))
@@ -176,6 +180,12 @@
// GetServiceAppMapping get the app names from the specified Dubbo service interface
func (m *zookeeperMetadataReport) GetServiceAppMapping(key string, group string, listener registry.MappingListener) (*gxset.HashSet, error) {
path := m.rootDir + group + constant.PathSeparator + key
+
+ // listen to mapping changes first
+ if listener != nil {
+ m.cacheListener.AddListener(path, listener)
+ }
+
v, _, err := m.client.GetContent(path)
if err != nil {
return nil, err
@@ -188,10 +198,42 @@
return set, nil
}
+// GetConfigKeysByGroup will return all keys with the group
+func (m *zookeeperMetadataReport) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
+ path := m.getPath("", group)
+ result, err := m.client.GetChildren(path)
+ if err != nil {
+ return nil, perrors.WithStack(err)
+ }
+
+ if len(result) == 0 {
+ return nil, perrors.New("could not find keys with group: " + group)
+ }
+ set := gxset.NewSet()
+ for _, e := range result {
+ set.Add(e)
+ }
+ return set, nil
+}
+
func (m *zookeeperMetadataReport) RemoveServiceAppMappingListener(key string, group string) error {
return nil
}
+func (m *zookeeperMetadataReport) getPath(key string, group string) string {
+ if len(key) == 0 {
+ return m.buildPath(group)
+ }
+ return m.buildPath(group) + constant.PathSeparator + key
+}
+
+func (m *zookeeperMetadataReport) buildPath(group string) string {
+ if len(group) == 0 {
+ group = metadata.DefaultGroup
+ }
+ return m.rootDir + group
+}
+
type zookeeperMetadataReportFactory struct{}
// nolint
@@ -214,5 +256,13 @@
rootDir = rootDir + constant.PathSeparator
}
- return &zookeeperMetadataReport{client: client, rootDir: rootDir}
+ reporter := &zookeeperMetadataReport{
+ client: client,
+ rootDir: rootDir,
+ listener: zookeeper.NewZkEventListener(client),
+ }
+
+ reporter.cacheListener = NewCacheListener(rootDir, reporter.listener)
+ reporter.listener.ListenConfigurationEvent(rootDir+constant.PathSeparator+metadata.DefaultGroup, reporter.cacheListener)
+ return reporter
}
diff --git a/metadata/service/local/metadata_service_proxy_factory.go b/metadata/service/local/metadata_service_proxy_factory.go
index 417cc4c..b914778 100644
--- a/metadata/service/local/metadata_service_proxy_factory.go
+++ b/metadata/service/local/metadata_service_proxy_factory.go
@@ -68,7 +68,7 @@
p := extension.GetProtocol(u.Protocol)
invoker := p.Refer(u)
return &MetadataServiceProxy{
- invkr: invoker,
+ Invoker: invoker,
}
}
diff --git a/metadata/service/local/service_proxy.go b/metadata/service/local/service_proxy.go
index 711058d..47421b1 100644
--- a/metadata/service/local/service_proxy.go
+++ b/metadata/service/local/service_proxy.go
@@ -40,7 +40,7 @@
// GetMetadataInfo need to be implemented.
// TODO use ProxyFactory to create proxy
type MetadataServiceProxy struct {
- invkr protocol.Invoker
+ Invoker protocol.Invoker
}
// nolint
@@ -58,7 +58,7 @@
invocation.WithAttachments(map[string]interface{}{constant.AsyncKey: "false"}),
invocation.WithParameterValues([]reflect.Value{siV, gV, vV, pV}))
- res := m.invkr.Invoke(context.Background(), inv)
+ res := m.Invoker.Invoke(context.Background(), inv)
if res.Error() != nil {
logger.Errorf("could not get the metadata service from remote provider: %v", res.Error())
return []*common.URL{}, nil
@@ -180,7 +180,7 @@
invocation.WithReply(reflect.ValueOf(&common.MetadataInfo{}).Interface()),
invocation.WithAttachments(map[string]interface{}{constant.AsyncKey: "false"}),
invocation.WithParameterValues([]reflect.Value{rV}))
- res := m.invkr.Invoke(context.Background(), inv)
+ res := m.Invoker.Invoke(context.Background(), inv)
if res.Error() != nil {
logger.Errorf("could not get the metadata info from remote provider: %v", res.Error())
return nil, res.Error()
diff --git a/metadata/service/local_service.go b/metadata/service/local_service.go
index 909af3c..1251a9d 100644
--- a/metadata/service/local_service.go
+++ b/metadata/service/local_service.go
@@ -116,7 +116,7 @@
}
func (b *BaseMetadataServiceProxyFactory) GetProxy(ins registry.ServiceInstance) MetadataService {
- return b.creator(ins).(MetadataService)
+ return b.creator(ins)
}
func getExportedServicesRevision(serviceInstance registry.ServiceInstance) string {
diff --git a/metadata/service/remote/service_test.go b/metadata/service/remote/service_test.go
index 4991a33..10f2b1e 100644
--- a/metadata/service/remote/service_test.go
+++ b/metadata/service/remote/service_test.go
@@ -58,6 +58,11 @@
type metadataReport struct{}
+func (mr metadataReport) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
+ //TODO implement me
+ panic("implement me")
+}
+
func (mr metadataReport) RegisterServiceAppMapping(string, string, string) error {
panic("implement me")
}
diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go
index 4470cb2..2b3e9a7 100644
--- a/protocol/jsonrpc/server.go
+++ b/protocol/jsonrpc/server.go
@@ -57,6 +57,8 @@
DefaultHTTPRspBufferSize = 1024
// PathPrefix ...
PathPrefix = byte('/')
+ // MaxHeaderSize is the max HTTP header size in bytes (8 MiB)
+ MaxHeaderSize = 8 * 1024 * 1024
)
// Server is JSON RPC server wrapper
@@ -121,7 +123,7 @@
}
for {
- bufReader := bufio.NewReader(conn)
+ bufReader := bufio.NewReader(io.LimitReader(conn, MaxHeaderSize))
r, err := http.ReadRequest(bufReader)
if err != nil {
logger.Warnf("[ReadRequest] error: %v", err)
diff --git a/registry/base_configuration_listener.go b/registry/base_configuration_listener.go
index 88f1867..9c13c6f 100644
--- a/registry/base_configuration_listener.go
+++ b/registry/base_configuration_listener.go
@@ -55,7 +55,7 @@
}
bcl.defaultConfiguratorFunc = f
bcl.dynamicConfiguration.AddListener(key, listener)
- if rawConfig, err := bcl.dynamicConfiguration.GetInternalProperty(key,
+ if rawConfig, err := bcl.dynamicConfiguration.GetRule(key,
config_center.WithGroup(constant.Dubbo)); err != nil {
//set configurators to empty
bcl.configurators = []config_center.Configurator{}
@@ -69,7 +69,7 @@
// Process the notification event once there's any change happens on the config.
func (bcl *BaseConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
- logger.Debugf("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value)
+ logger.Infof("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value)
if event.ConfigType == remoting.EventTypeDel {
bcl.configurators = nil
} else {
diff --git a/registry/base_registry.go b/registry/base_registry.go
index a41e967..8436865 100644
--- a/registry/base_registry.go
+++ b/registry/base_registry.go
@@ -133,16 +133,7 @@
// Register implement interface registry to register
func (r *BaseRegistry) Register(url *common.URL) error {
- // if developer define registry port and ip, use it first.
start := time.Now()
- if ipToRegistry := os.Getenv(constant.DubboIpToRegistryKey); len(ipToRegistry) > 0 {
- url.Ip = ipToRegistry
- } else {
- url.Ip = common.GetLocalIp()
- }
- if portToRegistry := os.Getenv(constant.DubboPortToRegistryKey); len(portToRegistry) > 0 {
- url.Port = portToRegistry
- }
// todo bug when provider、consumer simultaneous initialization
if _, ok := r.registered.Load(url.Key()); ok {
return perrors.Errorf("Service {%s} has been registered", url.Key())
@@ -348,9 +339,7 @@
// Subscribe :subscribe from registry, event will notify by notifyListener
func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error {
- n := 0
for {
- n++
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return perrors.New("BaseRegistry is not available.")
@@ -371,13 +360,12 @@
if serviceEvent, err := listener.Next(); err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
- break
+ return nil
} else {
- logger.Debugf("[Zookeeper Registry] update begin, service event: %v", serviceEvent.String())
+ logger.Debugf("[Registry] update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
}
}
- sleepWait(n)
}
}
@@ -398,16 +386,10 @@
return perrors.WithStack(err)
}
- for {
- if serviceEvent, err := listener.Next(); err != nil {
- logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
- listener.Close()
- break
- } else {
- logger.Debugf("[Zookeeper Registry] update begin, service event: %v", serviceEvent.String())
- notifyListener.Notify(serviceEvent)
- }
+ if listener != nil {
+ listener.Close()
}
+
return nil
}
diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index 903fbdb..27cab62 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -52,12 +52,14 @@
func init() {
extension.SetDefaultRegistryDirectory(NewRegistryDirectory)
+ extension.SetDirectory(constant.RegistryProtocol, NewRegistryDirectory)
+ extension.SetDirectory(constant.ServiceRegistryProtocol, NewServiceDiscoveryRegistryDirectory)
}
// RegistryDirectory implementation of Directory:
// Invoker list returned from this Directory's list method have been filtered by Routers
type RegistryDirectory struct {
- base.Directory
+ *base.Directory
cacheInvokers []protocol.Invoker
invokersLock sync.RWMutex
serviceType string
@@ -69,6 +71,8 @@
consumerConfigurationListener *consumerConfigurationListener
referenceConfigurationListener *referenceConfigurationListener
registerLock sync.Mutex // this lock if for register
+ SubscribedUrl *common.URL
+ RegisteredUrl *common.URL
}
// NewRegistryDirectory will create a new RegistryDirectory
@@ -105,11 +109,24 @@
}
// subscribe from registry
-func (dir *RegistryDirectory) Subscribe(url *common.URL) {
- logger.Debugf("subscribe service :%s for RegistryDirectory.", url.Key())
- if err := dir.registry.Subscribe(url, dir); err != nil {
- logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err)
- }
+func (dir *RegistryDirectory) Subscribe(url *common.URL) error {
+ logger.Infof("Start subscribing for service :%s with a new go routine.", url.Key())
+
+ go func() {
+ dir.SubscribedUrl = url
+ if err := dir.registry.Subscribe(url, dir); err != nil {
+ logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err)
+ }
+
+ urlToReg := getConsumerUrlToRegistry(url)
+ err := dir.registry.Register(urlToReg)
+ if err != nil {
+ logger.Errorf("consumer service %v register registry %v error, error message is %s",
+ url.String(), dir.registry.GetURL().String(), err.Error())
+ }
+ }()
+
+ return nil
}
// Notify monitor changes from registry,and update the cacheServices
@@ -428,8 +445,8 @@
// IsAvailable whether the directory is available
func (dir *RegistryDirectory) IsAvailable() bool {
- if !dir.Directory.IsAvailable() {
- return dir.Directory.IsAvailable()
+ if dir.Directory.IsDestroyed() {
+ return false
}
for _, ivk := range dir.cacheInvokers {
@@ -444,7 +461,21 @@
// Destroy method
func (dir *RegistryDirectory) Destroy() {
// TODO:unregister & unsubscribe
- dir.Directory.Destroy(func() {
+ dir.Directory.DoDestroy(func() {
+ if dir.RegisteredUrl != nil {
+ err := dir.registry.UnRegister(dir.RegisteredUrl)
+ if err != nil {
+ logger.Warnf("Unregister consumer url failed, %s", dir.RegisteredUrl.String(), err)
+ }
+ }
+
+ if dir.SubscribedUrl != nil {
+ err := dir.registry.UnSubscribe(dir.SubscribedUrl, dir)
+ if err != nil {
+ logger.Warnf("Unsubscribe consumer url failed, %s", dir.RegisteredUrl.String(), err)
+ }
+ }
+
invokers := dir.cacheInvokers
dir.cacheInvokers = []protocol.Invoker{}
for _, ivk := range invokers {
@@ -493,7 +524,7 @@
func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL) *referenceConfigurationListener {
listener := &referenceConfigurationListener{directory: dir, url: url}
listener.InitWith(
- url.EncodedServiceKey()+constant.ConfiguratorSuffix,
+ url.ColonSeparatedKey()+constant.ConfiguratorSuffix,
listener,
extension.GetDefaultConfiguratorFunc(),
)
@@ -534,3 +565,55 @@
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers(nil)
}
+
+// ServiceDiscoveryRegistryDirectory implementation of Directory:
+// Invoker list returned from this Directory's list method have been filtered by Routers
+type ServiceDiscoveryRegistryDirectory struct {
+ *base.Directory
+ *RegistryDirectory
+}
+
+// NewServiceDiscoveryRegistryDirectory will create a new ServiceDiscoveryRegistryDirectory
+func NewServiceDiscoveryRegistryDirectory(url *common.URL, registry registry.Registry) (directory.Directory, error) {
+ dic, err := NewRegistryDirectory(url, registry)
+ registryDirectory, _ := dic.(*RegistryDirectory)
+ return &ServiceDiscoveryRegistryDirectory{
+ Directory: registryDirectory.Directory,
+ RegistryDirectory: registryDirectory,
+ }, err
+}
+
+// Subscribe do subscribe from registry
+func (dir *ServiceDiscoveryRegistryDirectory) Subscribe(url *common.URL) error {
+ if err := dir.registry.Subscribe(url, dir); err != nil {
+ logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err)
+ return err
+ }
+
+ urlToReg := getConsumerUrlToRegistry(url)
+ err := dir.RegistryDirectory.registry.Register(urlToReg)
+ if err != nil {
+ logger.Errorf("consumer service %v register registry %v error, error message is %s",
+ url.String(), dir.registry.GetURL().String(), err.Error())
+ return err
+ }
+ return nil
+}
+
+// List selected protocol invokers from the directory
+func (dir *ServiceDiscoveryRegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
+ return dir.RegistryDirectory.List(invocation)
+}
+
+func getConsumerUrlToRegistry(url *common.URL) *common.URL {
+ // if developer define registry port and ip, use it first.
+ if ipToRegistry := os.Getenv(constant.DubboIpToRegistryKey); len(ipToRegistry) > 0 {
+ url.Ip = ipToRegistry
+ } else {
+ url.Ip = common.GetLocalIp()
+ }
+ if portToRegistry := os.Getenv(constant.DubboPortToRegistryKey); len(portToRegistry) > 0 {
+ url.Port = portToRegistry
+ }
+ return url
+}
diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go
index 7944dd1..81fb233 100644
--- a/registry/nacos/service_discovery_test.go
+++ b/registry/nacos/service_discovery_test.go
@@ -95,7 +95,7 @@
hs := gxset.NewSet()
hs.Add(testName)
- sicl := servicediscovery.NewServiceInstancesChangedListener(hs)
+ sicl := servicediscovery.NewServiceInstancesChangedListener("test_app", hs)
sicl.AddListenerAndNotify(testName, tn)
err = sd.AddListener(sicl)
assert.NoError(t, err)
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index bc5a239..bba3615 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -41,7 +41,6 @@
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
"dubbo.apache.org/dubbo-go/v3/registry"
- "dubbo.apache.org/dubbo-go/v3/registry/directory"
"dubbo.apache.org/dubbo-go/v3/remoting"
)
@@ -148,21 +147,16 @@
reg := proto.getRegistry(url)
// new registry directory for store service url from registry
- dic, err := extension.GetDefaultRegistryDirectory(registryUrl, reg)
+ dic, err := extension.GetDirectoryInstance(registryUrl, reg)
if err != nil {
logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!",
serviceUrl.String(), err.Error())
return nil
}
- // TODO, refactor to avoid type conversion
- regDic, ok := dic.(*directory.RegistryDirectory)
- if !ok {
- logger.Errorf("Directory %v is expected to implement Directory, and will return nil invoker!", dic)
- return nil
- }
- go regDic.Subscribe(registryUrl.SubURL)
- err = reg.Register(serviceUrl)
+ // This will start a new routine and listen to instance changes.
+ err = dic.Subscribe(registryUrl.SubURL)
+
if err != nil {
logger.Errorf("consumer service %v register registry %v error, error message is %s",
serviceUrl.String(), registryUrl.String(), err.Error())
@@ -544,7 +538,7 @@
func newServiceConfigurationListener(overrideListener *overrideSubscribeListener, providerUrl *common.URL) *serviceConfigurationListener {
listener := &serviceConfigurationListener{overrideListener: overrideListener, providerUrl: providerUrl}
listener.InitWith(
- providerUrl.EncodedServiceKey()+constant.ConfiguratorSuffix,
+ providerUrl.ColonSeparatedKey()+constant.ConfiguratorSuffix,
listener,
extension.GetDefaultConfiguratorFunc(),
)
diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go
index 2c9ff61..4bc555b 100644
--- a/registry/protocol/protocol_test.go
+++ b/registry/protocol/protocol_test.go
@@ -40,6 +40,7 @@
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
"dubbo.apache.org/dubbo-go/v3/registry"
+ "dubbo.apache.org/dubbo-go/v3/registry/directory"
"dubbo.apache.org/dubbo-go/v3/remoting"
)
@@ -55,6 +56,7 @@
extension.SetRegistry("mock", registry.NewMockRegistry)
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)
extension.SetCluster("mock", cluster.NewMockCluster)
+ extension.SetDirectory("mock", directory.NewRegistryDirectory)
url, _ := common.NewURL("mock://127.0.0.1:1111")
suburl, _ := common.NewURL(
diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go
index db0986d..6618544 100644
--- a/registry/servicediscovery/service_discovery_registry.go
+++ b/registry/servicediscovery/service_discovery_registry.go
@@ -46,7 +46,6 @@
"dubbo.apache.org/dubbo-go/v3/registry"
_ "dubbo.apache.org/dubbo-go/v3/registry/event"
"dubbo.apache.org/dubbo-go/v3/registry/servicediscovery/synthesizer"
- "dubbo.apache.org/dubbo-go/v3/remoting"
)
func init() {
@@ -206,6 +205,7 @@
func (s *ServiceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error {
if !shouldSubscribe(url) {
+ logger.Infof("Service %s is set to not subscribe to instances.", url.ServiceKey())
return nil
}
_, err := s.metaDataService.SubscribeURL(url)
@@ -219,6 +219,7 @@
return perrors.Errorf("Should has at least one way to know which services this interface belongs to,"+
" either specify 'provided-by' for reference or enable metadata-report center subscription url:%s", url.String())
}
+ logger.Infof("Find initial mapping applications %q for service %s.", services, url.ServiceKey())
// first notify
mappingListener.OnEvent(registry.NewServiceMappingChangedEvent(url.ServiceKey(), services))
return nil
@@ -235,11 +236,11 @@
protocolServiceKey := url.ServiceKey() + ":" + protocol
listener := s.serviceListeners[serviceNamesKey]
if listener == nil {
- listener = NewServiceInstancesChangedListener(services)
+ listener = NewServiceInstancesChangedListener(url.GetParam(constant.ApplicationKey, ""), services)
for _, serviceNameTmp := range services.Values() {
serviceName := serviceNameTmp.(string)
instances := s.serviceDiscovery.GetInstances(serviceName)
- logger.Infof("Synchronized instance notification on subscription, instance list size %s", len(instances))
+ logger.Infof("Synchronized instance notification on application %s subscription, instance list size %s", serviceName, len(instances))
err = listener.OnEvent(®istry.ServiceInstancesChangedEvent{
ServiceName: serviceName,
Instances: instances,
@@ -252,46 +253,23 @@
s.serviceListeners[serviceNamesKey] = listener
listener.AddListenerAndNotify(protocolServiceKey, notify)
event := metricMetadata.NewMetadataMetricTimeEvent(metricMetadata.SubscribeServiceRt)
- err = s.serviceDiscovery.AddListener(listener)
- event.Succ = err != nil
- event.End = time.Now()
- event.Attachment[constant.InterfaceKey] = url.Interface()
- metrics.Publish(event)
- metrics.Publish(metricsRegistry.NewServerSubscribeEvent(err == nil))
- if err != nil {
- logger.Errorf("add instance listener catch error,url:%s err:%s", url.String(), err.Error())
- }
+
+ logger.Infof("Start subscribing to registry for applications :%s with a new go routine.", serviceNamesKey)
+ go func() {
+ err = s.serviceDiscovery.AddListener(listener)
+ event.Succ = err != nil
+ event.End = time.Now()
+ event.Attachment[constant.InterfaceKey] = url.Interface()
+ metrics.Publish(event)
+ metrics.Publish(metricsRegistry.NewServerSubscribeEvent(err == nil))
+ if err != nil {
+ logger.Errorf("add instance listener catch error,url:%s err:%s", url.String(), err.Error())
+ }
+ }()
}
// LoadSubscribeInstances load subscribe instance
func (s *ServiceDiscoveryRegistry) LoadSubscribeInstances(url *common.URL, notify registry.NotifyListener) error {
- appName := url.GetParam(constant.ApplicationKey, url.Username)
- instances := s.serviceDiscovery.GetInstances(appName)
- for _, instance := range instances {
- if instance.GetMetadata() == nil {
- logger.Warnf("Instance metadata is nil: %s", instance.GetHost())
- continue
- }
- revision, ok := instance.GetMetadata()[constant.ExportedServicesRevisionPropertyName]
- if !ok {
- logger.Warnf("Instance metadata revision is nil: %s", instance.GetHost())
- continue
- }
- if "0" == revision {
- logger.Infof("Find instance without valid service metadata: %s", instance.GetHost())
- continue
- }
- metadataInfo, err := GetMetadataInfo(instance, revision)
- if err != nil {
- return err
- }
- instance.SetServiceMetadata(metadataInfo)
- for _, serviceInfo := range metadataInfo.Services {
- for _, url := range instance.ToURLs(serviceInfo) {
- notify.Notify(®istry.ServiceEvent{Action: remoting.EventTypeAdd, Service: url})
- }
- }
- }
return nil
}
diff --git a/registry/servicediscovery/service_instances_changed_listener_impl.go b/registry/servicediscovery/service_instances_changed_listener_impl.go
index de2667b..f5123ce 100644
--- a/registry/servicediscovery/service_instances_changed_listener_impl.go
+++ b/registry/servicediscovery/service_instances_changed_listener_impl.go
@@ -34,7 +34,8 @@
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/config"
+ "dubbo.apache.org/dubbo-go/v3/metadata/service"
+ "dubbo.apache.org/dubbo-go/v3/metadata/service/local"
"dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/registry/servicediscovery/store"
"dubbo.apache.org/dubbo-go/v3/remoting"
@@ -45,9 +46,9 @@
cacheOnce sync.Once
)
-func initCache() {
+func initCache(app string) {
gob.Register(&common.MetadataInfo{})
- fileName := constant.DefaultMetaFileName + config.GetApplicationConfig().Name
+ fileName := constant.DefaultMetaFileName + app
cache, err := store.NewCacheManager(constant.DefaultMetaCacheName, fileName, time.Minute*10, constant.DefaultEntrySize, true)
if err != nil {
logger.Fatal("Failed to create cache [%s],the err is %v", constant.DefaultMetaCacheName, err)
@@ -57,16 +58,21 @@
// ServiceInstancesChangedListenerImpl The Service Discovery Changed Event Listener
type ServiceInstancesChangedListenerImpl struct {
+ app string
serviceNames *gxset.HashSet
listeners map[string]registry.NotifyListener
serviceUrls map[string][]*common.URL
revisionToMetadata map[string]*common.MetadataInfo
allInstances map[string][]registry.ServiceInstance
+ mutex sync.Mutex
}
-func NewServiceInstancesChangedListener(services *gxset.HashSet) registry.ServiceInstancesChangedListener {
- cacheOnce.Do(initCache)
+func NewServiceInstancesChangedListener(app string, services *gxset.HashSet) registry.ServiceInstancesChangedListener {
+ cacheOnce.Do(func() {
+ initCache(app)
+ })
return &ServiceInstancesChangedListenerImpl{
+ app: app,
serviceNames: services,
listeners: make(map[string]registry.NotifyListener),
serviceUrls: make(map[string][]*common.URL),
@@ -82,6 +88,10 @@
return nil
}
var err error
+
+ lstn.mutex.Lock()
+ defer lstn.mutex.Unlock()
+
lstn.allInstances[ce.ServiceName] = ce.Instances
revisionToInstances := make(map[string][]registry.ServiceInstance)
newRevisionToMetadata := make(map[string]*common.MetadataInfo)
@@ -112,7 +122,7 @@
if val, ok := metaCache.Get(revision); ok {
metadataInfo = val.(*common.MetadataInfo)
} else {
- metadataInfo, err = GetMetadataInfo(instance, revision)
+ metadataInfo, err = GetMetadataInfo(lstn.app, instance, revision)
if err != nil {
return err
}
@@ -215,8 +225,10 @@
}
// GetMetadataInfo get metadata info when MetadataStorageTypePropertyName is null
-func GetMetadataInfo(instance registry.ServiceInstance, revision string) (*common.MetadataInfo, error) {
- cacheOnce.Do(initCache)
+func GetMetadataInfo(app string, instance registry.ServiceInstance, revision string) (*common.MetadataInfo, error) {
+ cacheOnce.Do(func() {
+ initCache(app)
+ })
if metadataInfo, ok := metaCache.Get(revision); ok {
return metadataInfo.(*common.MetadataInfo), nil
}
@@ -241,6 +253,7 @@
var err error
proxyFactory := extension.GetMetadataServiceProxyFactory(constant.DefaultKey)
metadataService := proxyFactory.GetProxy(instance)
+ defer destroyInvoker(metadataService)
metadataInfo, err = metadataService.GetMetadataInfo(revision)
if err != nil {
return nil, err
@@ -251,3 +264,16 @@
return metadataInfo, nil
}
+
+// destroyInvoker releases the invoker held by a metadata service proxy.
+// GetMetadataInfo defers it so the invoker created for a single remote
+// metadata lookup is destroyed when that lookup returns.
+//
+// It is a no-op when metadataService is nil, is not a
+// *local.MetadataServiceProxy, or carries no invoker.
+func destroyInvoker(metadataService service.MetadataService) {
+	if metadataService == nil {
+		return
+	}
+
+	// Use the checked assertion form: the proxy comes from a factory looked up
+	// through the extension registry, so another MetadataService implementation
+	// (or a typed-nil proxy) must not turn this cleanup step into a panic.
+	proxy, ok := metadataService.(*local.MetadataServiceProxy)
+	if !ok || proxy == nil || proxy.Invoker == nil {
+		return
+	}
+
+	proxy.Invoker.Destroy()
+}
diff --git a/registry/servicediscovery/service_mapping_change_listener_impl.go b/registry/servicediscovery/service_mapping_change_listener_impl.go
index 5450d81..2f64f69 100644
--- a/registry/servicediscovery/service_mapping_change_listener_impl.go
+++ b/registry/servicediscovery/service_mapping_change_listener_impl.go
@@ -24,6 +24,7 @@
import (
gxset "github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/gof/observer"
+ "github.com/dubbogo/gost/log/logger"
)
import (
@@ -39,6 +40,8 @@
serviceUrl *common.URL
mappingCache *sync.Map
stop int
+
+ mux sync.Mutex
}
const (
@@ -63,6 +66,10 @@
err error
reg registry.Registry
)
+
+ lstn.mux.Lock()
+ defer lstn.mux.Unlock()
+
if lstn.stop == ServiceMappingListenerStop {
return nil
}
@@ -88,6 +95,7 @@
}
for _, service := range newServiceNames.Values() {
if !oldServiceNames.Contains(service) {
+ logger.Infof("Service-application mapping changed for service: %s, new applications: %q, old applications: %q.", lstn.serviceUrl.ServiceKey(), oldServiceNames, newServiceNames)
lstn.mappingCache.Delete(oldServiceNames.String())
lstn.mappingCache.Store(newServiceNames.String(), newServiceNames)
if reg, err = extension.GetRegistry(lstn.registryUrl.Protocol, lstn.registryUrl); err != nil {
@@ -97,8 +105,10 @@
sdreg.SubscribeURL(lstn.serviceUrl, lstn.listener, newServiceNames)
}
lstn.oldServiceNames = newServiceNames
+ break
}
}
+
return err
}
diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go
index c8f0f26..ee8e56d 100644
--- a/registry/zookeeper/listener.go
+++ b/registry/zookeeper/listener.go
@@ -153,7 +153,7 @@
for {
select {
case <-l.close:
- return nil, perrors.New("listener have been closed")
+ return nil, perrors.New("listener has been closed")
case <-l.registry.Done():
logger.Warnf("zk consumer register has quit, so zk event listener exit now. (registry url {%v}", l.registry.BaseRegistry.URL)
return nil, perrors.New("zookeeper registry, (registry url{%v}) stopped")
diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go
index b56c394..3b26751 100644
--- a/registry/zookeeper/registry.go
+++ b/registry/zookeeper/registry.go
@@ -300,26 +300,24 @@
zkListener, _ = configurationListener.(*RegistryConfigurationListener)
if zkListener != nil && zkListener.isClosed {
r.dataListener.mutex.Unlock()
- return nil, perrors.New("configListener already been closed")
+ return nil, perrors.New(fmt.Sprintf("configListener for service %s has already been closed", conf.ServiceKey()))
}
}
- zkListener = r.dataListener.UnSubscribeURL(conf).(*RegistryConfigurationListener)
+ if configurationListener := r.dataListener.UnSubscribeURL(conf); configurationListener != nil {
+ switch v := configurationListener.(type) {
+ case (*RegistryConfigurationListener):
+ if v != nil {
+ zkListener = v
+ }
+ }
+ }
r.dataListener.mutex.Unlock()
if r.listener == nil {
- return nil, perrors.New("listener is null can not close.")
+ return nil, perrors.New("Zookeeper event listener is null, can not close.")
}
- // Interested register to dataconfig.
- r.listenerLock.Lock()
- listener := r.listener
- r.listener = nil
- r.listenerLock.Unlock()
-
- r.dataListener.Close()
- listener.Close()
-
return zkListener, nil
}
diff --git a/remoting/nacos/builder.go b/remoting/nacos/builder.go
index ff19922..efecef7 100644
--- a/remoting/nacos/builder.go
+++ b/remoting/nacos/builder.go
@@ -103,7 +103,7 @@
NotLoadCacheAtStart: url.GetParamBool(constant.NacosNotLoadLocalCache, true),
LogDir: url.GetParam(constant.NacosLogDirKey, ""),
LogLevel: url.GetParam(constant.NacosLogLevelKey, "info"),
- UpdateCacheWhenEmpty: url.GetParamBool(constant.NacosUpdateCacheWhenEmpty, false),
+ UpdateCacheWhenEmpty: url.GetParamBool(constant.NacosUpdateCacheWhenEmpty, true),
}
return serverConfigs, clientConfig, nil
}
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 673e011..c2b2144 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -69,10 +69,10 @@
defer l.wg.Done()
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
- l.pathMapLock.Lock()
- delete(l.pathMap, zkPath)
- l.pathMapLock.Unlock()
}
+ l.pathMapLock.Lock()
+ delete(l.pathMap, zkPath)
+ l.pathMapLock.Unlock()
logger.Warnf("ListenServiceNodeEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
}
@@ -130,7 +130,6 @@
// nolint
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
-
l.pathMapLock.Lock()
a, ok := l.pathMap[zkPath]
if !ok || a.Load() > 1 {
@@ -220,10 +219,10 @@
if l.listenServiceNodeEvent(node, listener) {
logger.Warnf("delete zkNode{%s}", node)
listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel})
- l.pathMapLock.Lock()
- delete(l.pathMap, zkPath)
- l.pathMapLock.Unlock()
}
+ l.pathMapLock.Lock()
+ delete(l.pathMap, zkPath)
+ l.pathMapLock.Unlock()
logger.Debugf("handleZkNodeEvent->listenSelf(zk path{%s}) goroutine exit now", node)
}(newNode, listener)
}
@@ -379,6 +378,17 @@
l.pathMapLock.Unlock()
if ok {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkNodePath)
+ l.Client.RLock()
+ if l.Client.Conn == nil {
+ l.Client.RUnlock()
+ break
+ }
+ content, _, err := l.Client.Conn.Get(zkNodePath)
+ l.Client.RUnlock()
+ if err != nil {
+ logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get content of the child node {%v} failed, the error is %+v", zkNodePath, perrors.WithStack(err))
+ }
+ listener.DataChange(remoting.Event{Path: zkNodePath, Action: remoting.EventTypeAdd, Content: string(content)})
continue
}
// When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn
@@ -402,10 +412,10 @@
defer l.wg.Done()
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
- l.pathMapLock.Lock()
- delete(l.pathMap, zkPath)
- l.pathMapLock.Unlock()
}
+ l.pathMapLock.Lock()
+ delete(l.pathMap, zkPath)
+ l.pathMapLock.Unlock()
logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(zkNodePath, listener)
}