fix config change event error
diff --git a/config/config_loader.go b/config/config_loader.go
index 3d9009e..44a0601 100644
--- a/config/config_loader.go
+++ b/config/config_loader.go
@@ -182,7 +182,7 @@
if data, err := yaml.MarshalYML(consumerConfig); err != nil {
logger.Errorf("Marshal consumer config err: %s", err.Error())
} else {
- if err := ioutil.WriteFile(consumerConfig.CacheFile, data, 0o666); err != nil {
+ if err := ioutil.WriteFile(consumerConfig.CacheFile, data, 0666); err != nil {
logger.Errorf("Write consumer config cache file err: %s", err.Error())
}
}
@@ -249,7 +249,7 @@
if data, err := yaml.MarshalYML(providerConfig); err != nil {
logger.Errorf("Marshal provider config err: %s", err.Error())
} else {
- if err := ioutil.WriteFile(providerConfig.CacheFile, data, 0o666); err != nil {
+ if err := ioutil.WriteFile(providerConfig.CacheFile, data, 0666); err != nil {
logger.Errorf("Write provider config cache file err: %s", err.Error())
}
}
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index f140aff..7342163 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -25,6 +25,7 @@
import (
gxset "github.com/dubbogo/gost/container/set"
+ perrors "github.com/pkg/errors"
)
import (
@@ -70,7 +71,8 @@
extension.SetProtocol("registry", GetProtocol)
}
-func getCacheKey(url *common.URL) string {
+func getCacheKey(invoker protocol.Invoker) string {
+ url := getProviderUrl(invoker)
delKeys := gxset.NewSet("dynamic", "enabled")
return url.CloneExceptParams(delKeys).String()
}
@@ -200,7 +202,7 @@
return nil
}
- key := getCacheKey(providerUrl)
+ key := getCacheKey(invoker)
logger.Infof("The cached exporter keys is %v!", key)
cachedExporter, loaded := proto.bounds.Load(key)
if loaded {
@@ -221,17 +223,45 @@
}
func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common.URL) {
- url := getProviderUrl(invoker)
- key := getCacheKey(url)
+ key := getCacheKey(invoker)
if oldExporter, loaded := proto.bounds.Load(key); loaded {
wrappedNewInvoker := newWrappedInvoker(invoker, newUrl)
oldExporter.(protocol.Exporter).Unexport()
proto.bounds.Delete(key)
+ // oldExporter Unexport function unRegister rpcService from the serviceMap, so need register it again as far as possible
+ if err := registerServiceMap(invoker); err != nil {
+ logger.Error(err.Error())
+ }
proto.Export(wrappedNewInvoker)
// TODO: unregister & unsubscribe
}
}
+func registerServiceMap(invoker protocol.Invoker) error {
+ providerUrl := getProviderUrl(invoker)
+ id := providerUrl.GetParam(constant.BEAN_NAME_KEY, "")
+
+ serviceConfig := config.GetProviderConfig().Services[id]
+ if serviceConfig == nil {
+ s := "reExport can not get serviceConfig"
+ return perrors.New(s)
+ }
+ rpcService := config.GetProviderService(id)
+ if rpcService == nil {
+ s := "reExport can not get RPCService"
+ return perrors.New(s)
+ }
+
+ _, err := common.ServiceMap.Register(serviceConfig.InterfaceName,
+ serviceConfig.Protocol, serviceConfig.Group,
+ serviceConfig.Version, rpcService)
+ if err != nil {
+ s := "reExport can not re register ServiceMap. Error message is " + err.Error()
+ return perrors.New(s)
+ }
+ return nil
+}
+
type overrideSubscribeListener struct {
url *common.URL
originInvoker protocol.Invoker
@@ -263,7 +293,7 @@
func (nl *overrideSubscribeListener) doOverrideIfNecessary() {
providerUrl := getProviderUrl(nl.originInvoker)
- key := getCacheKey(providerUrl)
+ key := getCacheKey(nl.originInvoker)
if exporter, ok := nl.protocol.bounds.Load(key); ok {
currentUrl := exporter.(protocol.Exporter).GetInvoker().GetURL()
// Compatible with the 2.6.x
diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go
index bf8e4a9..a99dc8d 100644
--- a/registry/protocol/protocol_test.go
+++ b/registry/protocol/protocol_test.go
@@ -23,6 +23,7 @@
)
import (
+ gxset "github.com/dubbogo/gost/container/set"
"github.com/stretchr/testify/assert"
)
@@ -245,7 +246,9 @@
time.Sleep(1e9)
newUrl := url.SubURL.Clone()
newUrl.SetParam(constant.CLUSTER_KEY, "mock1")
- v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl))
+ delKeys := gxset.NewSet("dynamic", "enabled")
+ key := newUrl.CloneExceptParams(delKeys).String()
+ v2, _ := regProtocol.bounds.Load(key)
assert.NotNil(t, v2)
}
@@ -265,7 +268,10 @@
newUrl := url.SubURL.Clone()
newUrl.SetParam(constant.CLUSTER_KEY, "mock1")
- v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl))
+ delKeys := gxset.NewSet("dynamic", "enabled")
+ key := newUrl.CloneExceptParams(delKeys).String()
+ v2, _ := regProtocol.bounds.Load(key)
+
assert.NotNil(t, v2)
}
@@ -284,7 +290,9 @@
newUrl := url.SubURL.Clone()
newUrl.SetParam(constant.CLUSTER_KEY, "mock1")
- v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl))
+ delKeys := gxset.NewSet("dynamic", "enabled")
+ key := newUrl.CloneExceptParams(delKeys).String()
+ v2, _ := regProtocol.bounds.Load(key)
assert.NotNil(t, v2)
}