Merge remote-tracking branch 'origin/3.0' into 3.0
# Conflicts:
# registry/zookeeper/service_discovery_test.go
diff --git a/NOTICE b/NOTICE
index e0f4af6..003eda8 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
Apache Dubbo-go
-Copyright 2018-2021 The Apache Software Foundation
+Copyright 2018-2022 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
diff --git a/README.md b/README.md
index bb92459..17e86c5 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,7 @@
## RPC Invocation
-![](https://dubbogo.github.io/img/dubbogo-3.0-invocation.png)
+![](https://dubbo-go-pixiu.github.io/img/pixiu-dubbo-ecosystem.png)
Dubbo-go has supported many RPC protocol, like Triple, Dubbo JSONRPC, gRPC, HTTP, HTTP2.
diff --git a/README_CN.md b/README_CN.md
index 217e5e4..c088a59 100644
--- a/README_CN.md
+++ b/README_CN.md
@@ -14,7 +14,7 @@
## RPC 调用
-![](https://dubbogo.github.io/img/dubbogo-3.0-invocation.png)
+![](https://dubbo-go-pixiu.github.io/img/pixiu-dubbo-ecosystem.png)
Dubbo-go 生态覆盖多种网络协议:Triple、Dubbo、JSONRPC、gRPC、HTTP、HTTP2 等。
diff --git a/cluster/cluster/available/cluster_invoker_test.go b/cluster/cluster/available/cluster_invoker_test.go
index 70919e6..cf20230 100644
--- a/cluster/cluster/available/cluster_invoker_test.go
+++ b/cluster/cluster/available/cluster_invoker_test.go
@@ -19,6 +19,7 @@
import (
"context"
+ "errors"
"fmt"
"strings"
"testing"
@@ -51,7 +52,8 @@
invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)
- invoker.EXPECT().GetUrl().Return(availableUrl)
+ invoker.EXPECT().GetUrl().Return(availableUrl).AnyTimes()
+ invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
staticDir := static.NewDirectory(invokers)
clusterInvoker := availableCluster.Join(staticDir)
@@ -66,8 +68,8 @@
clusterInvoker := registerAvailable(invoker)
mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}}
- invoker.EXPECT().IsAvailable().Return(true)
- invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
+ invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
+ invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
@@ -81,7 +83,10 @@
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerAvailable(invoker)
- invoker.EXPECT().IsAvailable().Return(false)
+ invoker.EXPECT().IsAvailable().Return(false).AnyTimes()
+
+ res := &protocol.RPCResult{Err: errors.New("no provider available")}
+ invoker.EXPECT().Invoke(gomock.Any()).Return(res).AnyTimes()
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
diff --git a/cluster/cluster/broadcast/cluster_invoker_test.go b/cluster/cluster/broadcast/cluster_invoker_test.go
index 74cd8cf..bd09e9e 100644
--- a/cluster/cluster/broadcast/cluster_invoker_test.go
+++ b/cluster/cluster/broadcast/cluster_invoker_test.go
@@ -49,11 +49,9 @@
extension.SetLoadbalance("random", random.NewRandomLoadBalance)
invokers := []protocol.Invoker{}
- for i, ivk := range mockInvokers {
+ for _, ivk := range mockInvokers {
invokers = append(invokers, ivk)
- if i == 0 {
- ivk.EXPECT().GetUrl().Return(broadcastUrl)
- }
+ ivk.EXPECT().GetUrl().Return(broadcastUrl).AnyTimes()
}
staticDir := static.NewDirectory(invokers)
@@ -72,7 +70,7 @@
for i := 0; i < 3; i++ {
invoker := mock.NewMockInvoker(ctrl)
invokers = append(invokers, invoker)
- invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
+ invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
}
clusterInvoker := registerBroadcast(invokers...)
@@ -92,17 +90,17 @@
for i := 0; i < 10; i++ {
invoker := mock.NewMockInvoker(ctrl)
invokers = append(invokers, invoker)
- invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
+ invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
}
{
invoker := mock.NewMockInvoker(ctrl)
invokers = append(invokers, invoker)
- invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult)
+ invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).AnyTimes()
}
for i := 0; i < 10; i++ {
invoker := mock.NewMockInvoker(ctrl)
invokers = append(invokers, invoker)
- invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
+ invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
}
clusterInvoker := registerBroadcast(invokers...)
diff --git a/cluster/cluster/failback/cluster_test.go b/cluster/cluster/failback/cluster_test.go
index 3b01e79..4bf022f 100644
--- a/cluster/cluster/failback/cluster_test.go
+++ b/cluster/cluster/failback/cluster_test.go
@@ -56,7 +56,7 @@
var invokers []protocol.Invoker
invokers = append(invokers, invoker)
- invoker.EXPECT().GetUrl().Return(failbackUrl)
+ invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
staticDir := static.NewDirectory(invokers)
clusterInvoker := failbackCluster.Join(staticDir)
@@ -73,10 +73,10 @@
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
- invoker.EXPECT().IsAvailable().Return(true)
+ invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}}
- invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
+ invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
@@ -121,7 +121,7 @@
wg.Wait()
assert.Equal(t, int64(0), clusterInvoker.taskList.Len())
- invoker.EXPECT().Destroy().Return()
+ invoker.EXPECT().Destroy().Return().AnyTimes()
clusterInvoker.Destroy()
assert.Equal(t, int64(0), clusterInvoker.taskList.Len())
diff --git a/cluster/cluster/failfast/cluster_test.go b/cluster/cluster/failfast/cluster_test.go
index aa5dc6a..ed34e68 100644
--- a/cluster/cluster/failfast/cluster_test.go
+++ b/cluster/cluster/failfast/cluster_test.go
@@ -55,7 +55,7 @@
invokers = append(invokers, invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
- invoker.EXPECT().GetUrl().Return(failfastUrl)
+ invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()
staticDir := static.NewDirectory(invokers)
clusterInvoker := failfastCluster.Join(staticDir)
@@ -74,7 +74,7 @@
mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}}
- invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
+ invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.NoError(t, result.Error())
@@ -95,7 +95,7 @@
mockResult := &protocol.RPCResult{Err: perrors.New("error")}
- invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
+ invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.NotNil(t, result.Error())
diff --git a/cluster/cluster/failsafe/cluster_test.go b/cluster/cluster/failsafe/cluster_test.go
index 819d8fb..31796a4 100644
--- a/cluster/cluster/failsafe/cluster_test.go
+++ b/cluster/cluster/failsafe/cluster_test.go
@@ -55,7 +55,7 @@
invokers = append(invokers, invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
- invoker.EXPECT().GetUrl().Return(failsafeUrl)
+ invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()
staticDir := static.NewDirectory(invokers)
clusterInvoker := failsafeCluster.Join(staticDir)
@@ -75,7 +75,7 @@
mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}}
- invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
+ invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.NoError(t, result.Error())
@@ -95,7 +95,7 @@
mockResult := &protocol.RPCResult{Err: perrors.New("error")}
- invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
+ invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.NoError(t, result.Error())
diff --git a/cluster/cluster/forking/cluster_test.go b/cluster/cluster/forking/cluster_test.go
index 0787300..427ba87 100644
--- a/cluster/cluster/forking/cluster_test.go
+++ b/cluster/cluster/forking/cluster_test.go
@@ -51,11 +51,9 @@
extension.SetLoadbalance(constant.LoadBalanceKeyRoundRobin, roundrobin.NewRRLoadBalance)
var invokers []protocol.Invoker
- for i, ivk := range mockInvokers {
+ for _, ivk := range mockInvokers {
invokers = append(invokers, ivk)
- if i == 0 {
- ivk.EXPECT().GetUrl().Return(forkingUrl)
- }
+ ivk.EXPECT().GetUrl().Return(forkingUrl).AnyTimes()
}
staticDir := static.NewDirectory(invokers)
@@ -145,14 +143,14 @@
func(protocol.Invocation) protocol.Result {
wg.Done()
return mockResult
- })
+ }).AnyTimes()
} else {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(protocol.Invocation) protocol.Result {
time.Sleep(2 * time.Second)
wg.Done()
return mockResult
- })
+ }).AnyTimes()
}
}
diff --git a/cluster/cluster/zoneaware/cluster_interceptor.go b/cluster/cluster/zoneaware/cluster_interceptor.go
index c92edc7..cb988cb 100644
--- a/cluster/cluster/zoneaware/cluster_interceptor.go
+++ b/cluster/cluster/zoneaware/cluster_interceptor.go
@@ -27,8 +27,7 @@
"dubbo.apache.org/dubbo-go/v3/protocol"
)
-type interceptor struct {
-}
+type interceptor struct{}
func (z *interceptor) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
key := constant.RegistryKey + "." + constant.RegistryZoneForceKey
diff --git a/cluster/cluster/zoneaware/cluster_invoker_test.go b/cluster/cluster/zoneaware/cluster_invoker_test.go
index a9ae7c9..db9ac41 100644
--- a/cluster/cluster/zoneaware/cluster_invoker_test.go
+++ b/cluster/cluster/zoneaware/cluster_invoker_test.go
@@ -63,7 +63,7 @@
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
return mockResult
- })
+ }).AnyTimes()
} else {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
diff --git a/cluster/loadbalance/p2c/loadbalance.go b/cluster/loadbalance/p2c/loadbalance.go
index 7a044cc..12f2a37 100644
--- a/cluster/loadbalance/p2c/loadbalance.go
+++ b/cluster/loadbalance/p2c/loadbalance.go
@@ -44,6 +44,7 @@
)
func init() {
+ rand.Seed(randSeed())
extension.SetLoadbalance(constant.LoadBalanceKeyP2C, newP2CLoadBalance)
}
@@ -78,7 +79,6 @@
if len(invokers) == 2 {
i, j = 0, 1
} else {
- rand.Seed(randSeed())
i = rand.Intn(len(invokers))
j = i
for i == j {
diff --git a/cluster/loadbalance/p2c/loadbalance_test.go b/cluster/loadbalance/p2c/loadbalance_test.go
index 17092fb..ff27ed0 100644
--- a/cluster/loadbalance/p2c/loadbalance_test.go
+++ b/cluster/loadbalance/p2c/loadbalance_test.go
@@ -18,6 +18,7 @@
package p2c
import (
+ "math/rand"
"testing"
)
@@ -37,16 +38,18 @@
func TestLoadBalance(t *testing.T) {
lb := newP2CLoadBalance()
invocation := protoinvoc.NewRPCInvocation("TestMethod", []interface{}{}, nil)
- randSeed = func() int64 {
+ randSeed := func() int64 {
return 0
}
t.Run("no invokers", func(t *testing.T) {
+ rand.Seed(randSeed())
ivk := lb.Select([]protocol.Invoker{}, invocation)
assert.Nil(t, ivk)
})
t.Run("one invoker", func(t *testing.T) {
+ rand.Seed(randSeed())
url0, _ := common.NewURL("dubbo://192.168.1.0:20000/com.ikurento.user.UserProvider")
ivkArr := []protocol.Invoker{
@@ -57,6 +60,7 @@
})
t.Run("two invokers", func(t *testing.T) {
+ rand.Seed(randSeed())
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -86,6 +90,7 @@
})
t.Run("multiple invokers", func(t *testing.T) {
+ rand.Seed(randSeed())
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -117,6 +122,7 @@
})
t.Run("metrics i not found", func(t *testing.T) {
+ rand.Seed(randSeed())
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -144,6 +150,7 @@
})
t.Run("metrics j not found", func(t *testing.T) {
+ rand.Seed(randSeed())
ctrl := gomock.NewController(t)
defer ctrl.Finish()
diff --git a/cluster/router/chain/chain.go b/cluster/router/chain/chain.go
index 037fa55..a9f9ce5 100644
--- a/cluster/router/chain/chain.go
+++ b/cluster/router/chain/chain.go
@@ -52,7 +52,19 @@
// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
- finalInvokers := c.invokers
+ finalInvokers := make([]protocol.Invoker, 0, len(c.invokers))
+ // multiple invoker may include different methods, find correct invoker otherwise
+ // will return the invoker without methods
+ for _, invoker := range c.invokers {
+ if invoker.GetURL().ServiceKey() == url.ServiceKey() {
+ finalInvokers = append(finalInvokers, invoker)
+ }
+ }
+
+ if len(finalInvokers) == 0 {
+ finalInvokers = c.invokers
+ }
+
for _, r := range c.copyRouters() {
finalInvokers = r.Route(finalInvokers, url, invocation)
}
diff --git a/cluster/router/tag/router.go b/cluster/router/tag/router.go
index d728772..d642a79 100644
--- a/cluster/router/tag/router.go
+++ b/cluster/router/tag/router.go
@@ -94,7 +94,6 @@
return
}
p.Process(&config_center.ConfigChangeEvent{Key: key, Value: value, ConfigType: remoting.EventTypeAdd})
-
}
func (p *PriorityRouter) Process(event *config_center.ConfigChangeEvent) {
diff --git a/cluster/router/tag/router_test.go b/cluster/router/tag/router_test.go
index 0ca3d39..22ddba7 100644
--- a/cluster/router/tag/router_test.go
+++ b/cluster/router/tag/router_test.go
@@ -354,7 +354,8 @@
- name: tag1
addresses: [192.168.0.1:20881]
- name: tag2
- addresses: [192.168.0.2:20882]`}
+ addresses: [192.168.0.2:20882]`,
+ }
dc, _ := mockFactory.GetDynamicConfiguration(ccUrl)
common_cfg.GetEnvInstance().SetDynamicConfiguration(dc)
p.Notify(invokerList)
@@ -380,7 +381,8 @@
extension.SetDefaultConfigurator(configurator.NewMockConfigurator)
ccUrl, _ := common.NewURL("mock://127.0.0.1:1111")
mockFactory := &config_center.MockDynamicConfigurationFactory{
- Content: `xxxxxx`}
+ Content: `xxxxxx`,
+ }
dc, _ := mockFactory.GetDynamicConfiguration(ccUrl)
common_cfg.GetEnvInstance().SetDynamicConfiguration(dc)
p.Notify(invokerList)
diff --git a/common/url.go b/common/url.go
index c143cb2..75e2070 100644
--- a/common/url.go
+++ b/common/url.go
@@ -284,6 +284,11 @@
return c.GetParam(constant.GroupKey, "")
}
+// Interface get interface
+func (c *URL) Interface() string {
+ return c.GetParam(constant.InterfaceKey, "")
+}
+
// Version get group
func (c *URL) Version() string {
return c.GetParam(constant.VersionKey, "")
@@ -356,7 +361,7 @@
return buildString
}
-//GetCacheInvokerMapKey get directory cacheInvokerMap key
+// GetCacheInvokerMapKey get directory cacheInvokerMap key
func (c *URL) GetCacheInvokerMapKey() string {
urlNew, _ := NewURL(c.PrimitiveURL)
@@ -369,7 +374,7 @@
// ServiceKey gets a unique key of a service.
func (c *URL) ServiceKey() string {
- return ServiceKey(c.GetParam(constant.InterfaceKey, strings.TrimPrefix(c.Path, "/")),
+ return ServiceKey(c.GetParam(constant.InterfaceKey, strings.TrimPrefix(c.Path, constant.PathSeparator)),
c.GetParam(constant.GroupKey, ""), c.GetParam(constant.VersionKey, ""))
}
@@ -861,7 +866,7 @@
return compareURLEqualFunc
}
-//GetParamDuration get duration if param is invalid or missing will return 3s
+// GetParamDuration get duration if param is invalid or missing will return 3s
func (c *URL) GetParamDuration(s string, d string) time.Duration {
if t, err := time.ParseDuration(c.GetParam(s, d)); err == nil {
return t
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index 76bb07d..6b81c5d 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -181,10 +181,13 @@
}
deadline := time.Now().Add(timeout)
- for time.Now().Before(deadline) && shutdownConfig.ProviderActiveCount.Load() > 0 {
+ offlineRequestWindowTimeout := shutdownConfig.GetOfflineRequestWindowTimeout()
+ for time.Now().Before(deadline) &&
+ (shutdownConfig.ProviderActiveCount.Load() > 0 || time.Now().Before(shutdownConfig.ProviderLastReceivedRequestTime.Load().Add(offlineRequestWindowTimeout))) {
// sleep 10 ms and then we check it again
time.Sleep(10 * time.Millisecond)
- logger.Infof("waiting for provider active invocation count = %d", shutdownConfig.ProviderActiveCount.Load())
+ logger.Infof("waiting for provider active invocation count = %d, provider last received request time: %v",
+ shutdownConfig.ProviderActiveCount.Load(), shutdownConfig.ProviderLastReceivedRequestTime.Load())
}
}
diff --git a/config/graceful_shutdown_config.go b/config/graceful_shutdown_config.go
index 93ea9d3..d0779a8 100644
--- a/config/graceful_shutdown_config.go
+++ b/config/graceful_shutdown_config.go
@@ -34,9 +34,10 @@
)
const (
- defaultTimeout = 60 * time.Second
- defaultStepTimeout = 3 * time.Second
- defaultConsumerUpdateWaitTime = 3 * time.Second
+ defaultTimeout = 60 * time.Second
+ defaultStepTimeout = 3 * time.Second
+ defaultConsumerUpdateWaitTime = 3 * time.Second
+ defaultOfflineRequestWindowTimeout = 3 * time.Second
)
// ShutdownConfig is used as configuration for graceful shutdown
@@ -66,12 +67,16 @@
RejectRequestHandler string `yaml:"reject-handler" json:"reject-handler,omitempty" property:"reject_handler"`
// internal listen kill signal,the default is true.
InternalSignal bool `default:"true" yaml:"internal-signal" json:"internal.signal,omitempty" property:"internal.signal"`
-
+ // offline request window length
+ OfflineRequestWindowTimeout string `yaml:"offline-request-window-timeout" json:"offlineRequestWindowTimeout,omitempty" property:"offlineRequestWindowTimeout"`
// true -> new request will be rejected.
RejectRequest atomic.Bool
// active invocation
ConsumerActiveCount atomic.Int32
ProviderActiveCount atomic.Int32
+
+ // provider last received request timestamp
+ ProviderLastReceivedRequestTime atomic.Time
}
// Prefix dubbo.shutdown
@@ -99,6 +104,16 @@
return result
}
+func (config *ShutdownConfig) GetOfflineRequestWindowTimeout() time.Duration {
+ result, err := time.ParseDuration(config.OfflineRequestWindowTimeout)
+ if err != nil {
+ logger.Errorf("The OfflineRequestWindowTimeout configuration is invalid: %s, and we will use the default value: %s, err: %v",
+ config.OfflineRequestWindowTimeout, defaultOfflineRequestWindowTimeout.String(), err)
+ return defaultOfflineRequestWindowTimeout
+ }
+ return result
+}
+
func (config *ShutdownConfig) GetConsumerUpdateWaitTime() time.Duration {
result, err := time.ParseDuration(config.ConsumerUpdateWaitTime)
if err != nil {
@@ -150,3 +165,8 @@
defaults.Set(scb)
return scb.shutdownConfig
}
+
+func (scb *ShutdownConfigBuilder) SetOfflineRequestWindowTimeout(offlineRequestWindowTimeout string) *ShutdownConfigBuilder {
+ scb.shutdownConfig.OfflineRequestWindowTimeout = offlineRequestWindowTimeout
+ return scb
+}
diff --git a/config/graceful_shutdown_config_test.go b/config/graceful_shutdown_config_test.go
index 3d4e8fa..c76098b 100644
--- a/config/graceful_shutdown_config_test.go
+++ b/config/graceful_shutdown_config_test.go
@@ -35,26 +35,37 @@
assert.False(t, config.RejectRequest.Load())
config = ShutdownConfig{
- Timeout: "60s",
- StepTimeout: "10s",
+ Timeout: "60s",
+ StepTimeout: "10s",
+ OfflineRequestWindowTimeout: "30s",
}
assert.Equal(t, 60*time.Second, config.GetTimeout())
assert.Equal(t, 10*time.Second, config.GetStepTimeout())
-
+ assert.Equal(t, 30*time.Second, config.GetOfflineRequestWindowTimeout())
config = ShutdownConfig{
- Timeout: "34ms",
- StepTimeout: "79ms",
+ Timeout: "34ms",
+ StepTimeout: "79ms",
+ OfflineRequestWindowTimeout: "13ms",
}
assert.Equal(t, 34*time.Millisecond, config.GetTimeout())
assert.Equal(t, 79*time.Millisecond, config.GetStepTimeout())
+ assert.Equal(t, 13*time.Millisecond, config.GetOfflineRequestWindowTimeout())
+
+ // test default
+ config = ShutdownConfig{}
+
+ assert.Equal(t, defaultTimeout, config.GetTimeout())
+ assert.Equal(t, defaultStepTimeout, config.GetStepTimeout())
+ assert.Equal(t, defaultOfflineRequestWindowTimeout, config.GetOfflineRequestWindowTimeout())
}
func TestNewShutDownConfigBuilder(t *testing.T) {
config := NewShutDownConfigBuilder().
SetTimeout("10s").
SetStepTimeout("15s").
+ SetOfflineRequestWindowTimeout("13s").
SetRejectRequestHandler("handler").
SetRejectRequest(true).
SetInternalSignal(true).
@@ -68,6 +79,8 @@
stepTimeout := config.GetStepTimeout()
assert.Equal(t, stepTimeout, 15*time.Second)
+ offlineRequestWindowTimeout := config.GetOfflineRequestWindowTimeout()
+ assert.Equal(t, offlineRequestWindowTimeout, 13*time.Second)
err := config.Init()
assert.NoError(t, err)
diff --git a/config/service_config.go b/config/service_config.go
index b0f75cf..fe8ba4b 100644
--- a/config/service_config.go
+++ b/config/service_config.go
@@ -138,7 +138,7 @@
s.ProtocolIDs = rc.Provider.ProtocolIDs
}
if len(s.ProtocolIDs) <= 0 {
- for k, _ := range rc.Protocols {
+ for k := range rc.Protocols {
s.ProtocolIDs = append(s.ProtocolIDs, k)
}
}
@@ -400,7 +400,7 @@
s.exportersLock.Lock()
defer s.exportersLock.Unlock()
for _, exporter := range s.exporters {
- exporter.Unexport()
+ exporter.UnExport()
}
s.exporters = nil
}()
diff --git a/filter/graceful_shutdown/provider_filter.go b/filter/graceful_shutdown/provider_filter.go
index a883d05..b9ad3a9 100644
--- a/filter/graceful_shutdown/provider_filter.go
+++ b/filter/graceful_shutdown/provider_filter.go
@@ -20,6 +20,7 @@
import (
"context"
"sync"
+ "time"
)
import (
@@ -75,6 +76,7 @@
}
}
f.shutdownConfig.ProviderActiveCount.Inc()
+ f.shutdownConfig.ProviderLastReceivedRequestTime.Store(time.Now())
return invoker.Invoke(ctx, invocation)
}
diff --git a/filter/sentinel/filter.go b/filter/sentinel/filter.go
index fdedf8d..009e77d 100644
--- a/filter/sentinel/filter.go
+++ b/filter/sentinel/filter.go
@@ -57,7 +57,7 @@
}
func (d DubboLoggerWrapper) Debug(msg string, keysAndValues ...interface{}) {
- d.Logger.Debug(logging.AssembleMsg(logging.GlobalCallerDepth, "DEBUG", msg, nil, keysAndValues))
+ d.Logger.Debug(logging.AssembleMsg(logging.GlobalCallerDepth, "DEBUG", msg, nil, keysAndValues...))
}
func (d DubboLoggerWrapper) DebugEnabled() bool {
@@ -65,7 +65,7 @@
}
func (d DubboLoggerWrapper) Info(msg string, keysAndValues ...interface{}) {
- d.Logger.Info(logging.AssembleMsg(logging.GlobalCallerDepth, "INFO", msg, nil, keysAndValues))
+ d.Logger.Info(logging.AssembleMsg(logging.GlobalCallerDepth, "INFO", msg, nil, keysAndValues...))
}
func (d DubboLoggerWrapper) InfoEnabled() bool {
@@ -73,7 +73,7 @@
}
func (d DubboLoggerWrapper) Warn(msg string, keysAndValues ...interface{}) {
- d.Logger.Warn(logging.AssembleMsg(logging.GlobalCallerDepth, "WARN", msg, nil, keysAndValues))
+ d.Logger.Warn(logging.AssembleMsg(logging.GlobalCallerDepth, "WARN", msg, nil, keysAndValues...))
}
func (d DubboLoggerWrapper) WarnEnabled() bool {
@@ -81,7 +81,7 @@
}
func (d DubboLoggerWrapper) Error(err error, msg string, keysAndValues ...interface{}) {
- d.Logger.Warn(logging.AssembleMsg(logging.GlobalCallerDepth, "ERROR", msg, err, keysAndValues))
+ d.Logger.Warn(logging.AssembleMsg(logging.GlobalCallerDepth, "ERROR", msg, err, keysAndValues...))
}
func (d DubboLoggerWrapper) ErrorEnabled() bool {
diff --git a/filter/token/filter.go b/filter/token/filter.go
index 2f6fa03..a84e9de 100644
--- a/filter/token/filter.go
+++ b/filter/token/filter.go
@@ -44,6 +44,10 @@
extension.SetFilter(constant.TokenFilterKey, newTokenFilter)
}
+const (
+ InValidTokenFormat = "[Token Filter]Invalid token! Forbid invoke remote service %v with method %s"
+)
+
// tokenFilter will verify if the token is valid
type tokenFilter struct{}
@@ -60,13 +64,31 @@
func (f *tokenFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
invokerTkn := invoker.GetURL().GetParam(constant.TokenKey, "")
if len(invokerTkn) > 0 {
- attachs := invocation.Attachments()
- remoteTkn, exist := attachs[constant.TokenKey]
- if exist && remoteTkn != nil && strings.EqualFold(invokerTkn, remoteTkn.(string)) {
+ attas := invocation.Attachments()
+ var remoteTkn string
+ remoteTknIface, exist := attas[constant.TokenKey]
+ if !exist || remoteTknIface == nil {
+ return &protocol.RPCResult{Err: perrors.Errorf(InValidTokenFormat, invoker, invocation.MethodName())}
+ }
+ switch remoteTknIface.(type) {
+ case string:
+ // deal with dubbo protocol
+ remoteTkn = remoteTknIface.(string)
+ case []string:
+ // deal with triple protocol
+ remoteTkns := remoteTknIface.([]string)
+ if len(remoteTkns) != 1 {
+ return &protocol.RPCResult{Err: perrors.Errorf(InValidTokenFormat, invoker, invocation.MethodName())}
+ }
+ remoteTkn = remoteTkns[0]
+ default:
+ return &protocol.RPCResult{Err: perrors.Errorf(InValidTokenFormat, invoker, invocation.MethodName())}
+ }
+
+ if strings.EqualFold(invokerTkn, remoteTkn) {
return invoker.Invoke(ctx, invocation)
}
- return &protocol.RPCResult{Err: perrors.Errorf("Invalid token! Forbid invoke remote service %v method %s ",
- invoker, invocation.MethodName())}
+ return &protocol.RPCResult{Err: perrors.Errorf(InValidTokenFormat, invoker, invocation.MethodName())}
}
return invoker.Invoke(ctx, invocation)
diff --git a/go.mod b/go.mod
index 0d5f284..ceb723c 100644
--- a/go.mod
+++ b/go.mod
@@ -9,13 +9,13 @@
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/alibaba/sentinel-golang v1.0.4
github.com/apache/dubbo-getty v1.4.8
- github.com/apache/dubbo-go-hessian2 v1.11.0
+ github.com/apache/dubbo-go-hessian2 v1.11.1
github.com/cespare/xxhash/v2 v2.1.2
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1
github.com/creasty/defaults v1.5.2
github.com/dubbogo/go-zookeeper v1.0.4-0.20211212162352-f9d2183d89d5
- github.com/dubbogo/gost v1.12.5
+ github.com/dubbogo/gost v1.12.6-0.20220719055648-01d5bc07b111
github.com/dubbogo/grpc-go v1.42.9
github.com/dubbogo/triple v1.1.8
github.com/emicklei/go-restful/v3 v3.8.0
@@ -29,7 +29,7 @@
github.com/google/go-cmp v0.5.8
github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
- github.com/hashicorp/vault/sdk v0.5.2
+ github.com/hashicorp/vault/sdk v0.5.3
github.com/jinzhu/copier v0.3.5
github.com/knadh/koanf v1.4.2
github.com/magiconair/properties v1.8.6
@@ -45,12 +45,12 @@
github.com/stretchr/testify v1.8.0
go.etcd.io/etcd/api/v3 v3.5.4
go.etcd.io/etcd/client/v3 v3.5.4
- go.opentelemetry.io/otel v1.7.0
- go.opentelemetry.io/otel/trace v1.7.0
+ go.opentelemetry.io/otel v1.8.0
+ go.opentelemetry.io/otel/trace v1.8.0
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.21.0
google.golang.org/genproto v0.0.0-20211104193956-4c6863e31247
- google.golang.org/grpc v1.47.0
- google.golang.org/protobuf v1.28.0
+ google.golang.org/grpc v1.48.0
+ google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v2 v2.4.0
)
diff --git a/go.sum b/go.sum
index c755702..874cd6b 100644
--- a/go.sum
+++ b/go.sum
@@ -70,8 +70,8 @@
github.com/apache/dubbo-getty v1.4.8/go.mod h1:cPJlbcHUTNTpiboMQjMHhE9XBni11LiBiG8FdrDuVzk=
github.com/apache/dubbo-go-hessian2 v1.9.1/go.mod h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE=
github.com/apache/dubbo-go-hessian2 v1.9.3/go.mod h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE=
-github.com/apache/dubbo-go-hessian2 v1.11.0 h1:VTdT6NStuEqNmyT3AdSN2DLDBqhXvAAyAAAoh9hLavk=
-github.com/apache/dubbo-go-hessian2 v1.11.0/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w=
+github.com/apache/dubbo-go-hessian2 v1.11.1 h1:03xs4QCAZHY/gHCOWgOmIUW6Yc842FCLz4R0hxCzPr8=
+github.com/apache/dubbo-go-hessian2 v1.11.1/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
@@ -173,8 +173,8 @@
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.11.18/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.23/go.mod h1:PhJ8+qZJx+Txjx1KthNPuVkCvUca0jRLgKWj/noGgeI=
-github.com/dubbogo/gost v1.12.5 h1:vpqQOAh5d1w/Qc+cvJmyiWRkwHFQPmZfgGwCBjuesBY=
-github.com/dubbogo/gost v1.12.5/go.mod h1:f0bcP1xpBUdDgiNjNYKF6F3qlA+RFKs0k980FhoEn/g=
+github.com/dubbogo/gost v1.12.6-0.20220719055648-01d5bc07b111 h1:ydfcIHE0slI/R1plWVKaJWpgwo60EuZ5MQyQ3ZiOITQ=
+github.com/dubbogo/gost v1.12.6-0.20220719055648-01d5bc07b111/go.mod h1:f0bcP1xpBUdDgiNjNYKF6F3qlA+RFKs0k980FhoEn/g=
github.com/dubbogo/grpc-go v1.42.9 h1:nTuglkH9rTJzQfardU4b0OJ0Imd2169dMNLBTNhTdlc=
github.com/dubbogo/grpc-go v1.42.9/go.mod h1:F1T9hnUvYGW4JLK1QNriavpOkhusU677ovPzLkk6zHM=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
@@ -434,8 +434,8 @@
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hashicorp/vault/api v1.0.4/go.mod h1:gDcqh3WGcR1cpF5AJz/B1UFheUEneMoIospckxBxk6Q=
github.com/hashicorp/vault/sdk v0.1.13/go.mod h1:B+hVj7TpuQY1Y/GPbCpffmgd+tSEwvhkWnjtSYCaS2M=
-github.com/hashicorp/vault/sdk v0.5.2 h1:Lub3cuwra6ifGmVYqX0x2pehWmUZl3zTElIjnyvBe2M=
-github.com/hashicorp/vault/sdk v0.5.2/go.mod h1:DoGraE9kKGNcVgPmTuX357Fm6WAx1Okvde8Vp3dPDoU=
+github.com/hashicorp/vault/sdk v0.5.3 h1:PWY8sq/9pRrK9vUIy75qCH2Jd8oeENAgkaa/qbhzFrs=
+github.com/hashicorp/vault/sdk v0.5.3/go.mod h1:DoGraE9kKGNcVgPmTuX357Fm6WAx1Okvde8Vp3dPDoU=
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
@@ -796,10 +796,10 @@
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
-go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM=
-go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk=
-go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o=
-go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU=
+go.opentelemetry.io/otel v1.8.0 h1:zcvBFizPbpa1q7FehvFiHbQwGzmPILebO0tyqIR5Djg=
+go.opentelemetry.io/otel v1.8.0/go.mod h1:2pkj+iMj0o03Y+cW6/m8Y4WkRdYN3AvCXCnzRMp9yvM=
+go.opentelemetry.io/otel/trace v1.8.0 h1:cSy0DF9eGI5WIfNwZ1q2iUyGj00tGzP24dE1lOlHrfY=
+go.opentelemetry.io/otel/trace v1.8.0/go.mod h1:0Bt3PXY8w+3pheS3hQUt+wow8b1ojPaTBoTCh2zIFI4=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
@@ -1194,8 +1194,8 @@
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k=
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
-google.golang.org/grpc v1.47.0 h1:9n77onPX5F3qfFCqjy9dhn8PbNQsIKeVU04J9G7umt8=
-google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
+google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w=
+google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -1209,8 +1209,8 @@
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
-google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
+google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
+google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go
index 231a2c8..9777c74 100644
--- a/metrics/prometheus/reporter.go
+++ b/metrics/prometheus/reporter.go
@@ -252,11 +252,17 @@
if len(labelMap) == 0 {
// gauge
if val, exist := reporter.userGauge.Load(gaugeName); !exist {
- newGauge := newGauge(gaugeName, reporter.namespace)
- _ = prom.DefaultRegisterer.Register(newGauge)
+ gauge := newGauge(gaugeName, reporter.namespace)
+ err := prom.DefaultRegisterer.Register(gauge)
+ if err == nil {
+ reporter.userGauge.Store(gaugeName, gauge)
+ gauge.Set(toSetValue)
+ } else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ // A gauge for that metric has been registered before.
+ // Use the old gauge from now on.
+ are.ExistingCollector.(prometheus.Gauge).Set(toSetValue)
+ }
- reporter.userGauge.Store(gaugeName, newGauge)
- newGauge.Set(toSetValue)
} else {
val.(prometheus.Gauge).Set(toSetValue)
}
@@ -269,10 +275,16 @@
for k, _ := range labelMap {
keyList = append(keyList, k)
}
- newGaugeVec := newGaugeVec(gaugeName, reporter.namespace, keyList)
- _ = prom.DefaultRegisterer.Register(newGaugeVec)
- reporter.userGaugeVec.Store(gaugeName, newGaugeVec)
- newGaugeVec.With(labelMap).Set(toSetValue)
+ gaugeVec := newGaugeVec(gaugeName, reporter.namespace, keyList)
+ err := prom.DefaultRegisterer.Register(gaugeVec)
+ if err == nil {
+ reporter.userGaugeVec.Store(gaugeName, gaugeVec)
+ gaugeVec.With(labelMap).Set(toSetValue)
+ } else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ // A gauge for that metric has been registered before.
+ // Use the old gauge from now on.
+ are.ExistingCollector.(*prometheus.GaugeVec).With(labelMap).Set(toSetValue)
+ }
} else {
val.(*prometheus.GaugeVec).With(labelMap).Set(toSetValue)
}
@@ -284,10 +296,16 @@
if len(labelMap) == 0 {
// counter
if val, exist := reporter.userCounter.Load(counterName); !exist {
- newCounter := newCounter(counterName, reporter.namespace)
- _ = prom.DefaultRegisterer.Register(newCounter)
- reporter.userCounter.Store(counterName, newCounter)
- newCounter.Inc()
+ counter := newCounter(counterName, reporter.namespace)
+ err := prom.DefaultRegisterer.Register(counter)
+ if err == nil {
+ reporter.userCounter.Store(counterName, counter)
+ counter.Inc()
+ } else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ // A counter for that metric has been registered before.
+ // Use the old counter from now on.
+ are.ExistingCollector.(prometheus.Counter).Inc()
+ }
} else {
val.(prometheus.Counter).Inc()
}
@@ -300,10 +318,16 @@
for k, _ := range labelMap {
keyList = append(keyList, k)
}
- newCounterVec := newCounterVec(counterName, reporter.namespace, keyList)
- _ = prom.DefaultRegisterer.Register(newCounterVec)
- reporter.userCounterVec.Store(counterName, newCounterVec)
- newCounterVec.With(labelMap).Inc()
+ counterVec := newCounterVec(counterName, reporter.namespace, keyList)
+ err := prom.DefaultRegisterer.Register(counterVec)
+ if err == nil {
+ reporter.userCounterVec.Store(counterName, counterVec)
+ counterVec.With(labelMap).Inc()
+ } else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ // A counter for that metric has been registered before.
+ // Use the old counter from now on.
+ are.ExistingCollector.(*prometheus.CounterVec).With(labelMap).Inc()
+ }
} else {
val.(*prometheus.CounterVec).With(labelMap).Inc()
}
@@ -315,10 +339,16 @@
if len(labelMap) == 0 {
// summary
if val, exist := reporter.userSummary.Load(summaryName); !exist {
- newSummary := newSummary(summaryName, reporter.namespace)
- _ = prom.DefaultRegisterer.Register(newSummary)
- reporter.userSummary.Store(summaryName, newSummary)
- newSummary.Observe(toSetValue)
+ summary := newSummary(summaryName, reporter.namespace)
+ err := prom.DefaultRegisterer.Register(summary)
+ if err == nil {
+ reporter.userSummary.Store(summaryName, summary)
+ summary.Observe(toSetValue)
+ } else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ // A summary for that metric has been registered before.
+ // Use the old summary from now on.
+ are.ExistingCollector.(prometheus.Summary).Observe(toSetValue)
+ }
} else {
val.(prometheus.Summary).Observe(toSetValue)
}
@@ -331,10 +361,16 @@
for k, _ := range labelMap {
keyList = append(keyList, k)
}
- newSummaryVec := newSummaryVec(summaryName, reporter.namespace, keyList, reporter.reporterConfig.SummaryMaxAge)
- _ = prom.DefaultRegisterer.Register(newSummaryVec)
- reporter.userSummaryVec.Store(summaryName, newSummaryVec)
- newSummaryVec.With(labelMap).Observe(toSetValue)
+ summaryVec := newSummaryVec(summaryName, reporter.namespace, keyList, reporter.reporterConfig.SummaryMaxAge)
+ err := prom.DefaultRegisterer.Register(summaryVec)
+ if err == nil {
+ reporter.userSummaryVec.Store(summaryName, summaryVec)
+ summaryVec.With(labelMap).Observe(toSetValue)
+ } else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ // A summary for that metric has been registered before.
+ // Use the old summary from now on.
+ are.ExistingCollector.(*prometheus.SummaryVec).With(labelMap).Observe(toSetValue)
+ }
} else {
val.(*prometheus.SummaryVec).With(labelMap).Observe(toSetValue)
}
diff --git a/protocol/dubbo/dubbo_exporter.go b/protocol/dubbo/dubbo_exporter.go
index 1f40084..ad50f30 100644
--- a/protocol/dubbo/dubbo_exporter.go
+++ b/protocol/dubbo/dubbo_exporter.go
@@ -44,11 +44,11 @@
}
// Unexport unexport dubbo service exporter.
-func (de *DubboExporter) Unexport() {
+func (de *DubboExporter) UnExport() {
interfaceName := de.GetInvoker().GetURL().GetParam(constant.InterfaceKey, "")
- de.BaseExporter.Unexport()
+ de.BaseExporter.UnExport()
err := common.ServiceMap.UnRegister(interfaceName, DUBBO, de.GetInvoker().GetURL().ServiceKey())
if err != nil {
- logger.Errorf("[DubboExporter.Unexport] error: %v", err)
+ logger.Errorf("[DubboExporter.UnExport] error: %v", err)
}
}
diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go
index df16807..6f72753 100644
--- a/protocol/dubbo/dubbo_invoker.go
+++ b/protocol/dubbo/dubbo_invoker.go
@@ -36,7 +36,7 @@
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/protocol"
- invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+ "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
"dubbo.apache.org/dubbo-go/v3/remoting"
)
@@ -84,7 +84,7 @@
}
// Invoke call remoting.
-func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
+func (di *DubboInvoker) Invoke(ctx context.Context, ivc protocol.Invocation) protocol.Result {
var (
err error
result protocol.RPCResult
@@ -114,7 +114,7 @@
return &result
}
- inv := invocation.(*invocation_impl.RPCInvocation)
+ inv := ivc.(*invocation.RPCInvocation)
// init param
inv.SetAttachment(constant.PathKey, di.GetURL().GetParam(constant.InterfaceKey, ""))
for _, k := range attachmentKey {
@@ -142,15 +142,15 @@
timeout := di.getTimeout(inv)
if async {
if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {
- result.Err = di.client.AsyncRequest(&invocation, url, timeout, callBack, rest)
+ result.Err = di.client.AsyncRequest(&ivc, url, timeout, callBack, rest)
} else {
- result.Err = di.client.Send(&invocation, url, timeout)
+ result.Err = di.client.Send(&ivc, url, timeout)
}
} else {
if inv.Reply() == nil {
result.Err = protocol.ErrNoReply
} else {
- result.Err = di.client.Request(&invocation, url, timeout, rest)
+ result.Err = di.client.Request(&ivc, url, timeout, rest)
}
}
if result.Err == nil {
@@ -162,21 +162,21 @@
}
// get timeout including methodConfig
-func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) time.Duration {
- methodName := invocation.MethodName()
+func (di *DubboInvoker) getTimeout(ivc *invocation.RPCInvocation) time.Duration {
+ methodName := ivc.MethodName()
if di.GetURL().GetParamBool(constant.GenericKey, false) {
- methodName = invocation.Arguments()[0].(string)
+ methodName = ivc.Arguments()[0].(string)
}
timeout := di.GetURL().GetParam(strings.Join([]string{constant.MethodKeys, methodName, constant.TimeoutKey}, "."), "")
if len(timeout) != 0 {
if t, err := time.ParseDuration(timeout); err == nil {
// config timeout into attachment
- invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds())))
+ ivc.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds())))
return t
}
}
// set timeout into invocation at method level
- invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds())))
+ ivc.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds())))
return di.timeout
}
@@ -207,11 +207,11 @@
// Finally, I made the decision that I don't provide a general way to transfer the whole context
// because it could be misused. If the context contains to many key-value pairs, the performance will be much lower.
-func (di *DubboInvoker) appendCtx(ctx context.Context, inv *invocation_impl.RPCInvocation) {
+func (di *DubboInvoker) appendCtx(ctx context.Context, ivc *invocation.RPCInvocation) {
// inject opentracing ctx
currentSpan := opentracing.SpanFromContext(ctx)
if currentSpan != nil {
- err := injectTraceCtx(currentSpan, inv)
+ err := injectTraceCtx(currentSpan, ivc)
if err != nil {
logger.Errorf("Could not inject the span context into attachments: %v", err)
}
diff --git a/protocol/dubbo/dubbo_protocol_test.go b/protocol/dubbo/dubbo_protocol_test.go
index 3ccbcd1..a7988fa 100644
--- a/protocol/dubbo/dubbo_protocol_test.go
+++ b/protocol/dubbo/dubbo_protocol_test.go
@@ -100,10 +100,10 @@
eq2 := exporter2.GetInvoker().GetURL().URLEqual(url2)
assert.True(t, eq2)
- // make sure exporterMap after 'Unexport'
+ // make sure exporterMap after 'UnExport'
_, ok := proto.(*DubboProtocol).ExporterMap().Load(url2.ServiceKey())
assert.True(t, ok)
- exporter2.Unexport()
+ exporter2.UnExport()
_, ok = proto.(*DubboProtocol).ExporterMap().Load(url2.ServiceKey())
assert.False(t, ok)
diff --git a/protocol/dubbo3/dubbo3_exporter.go b/protocol/dubbo3/dubbo3_exporter.go
index 3b997b0..13214de 100644
--- a/protocol/dubbo3/dubbo3_exporter.go
+++ b/protocol/dubbo3/dubbo3_exporter.go
@@ -49,13 +49,13 @@
}
// Unexport unexport dubbo3 service exporter.
-func (de *DubboExporter) Unexport() {
+func (de *DubboExporter) UnExport() {
url := de.GetInvoker().GetURL()
interfaceName := url.GetParam(constant.InterfaceKey, "")
- de.BaseExporter.Unexport()
+ de.BaseExporter.UnExport()
err := common.ServiceMap.UnRegister(interfaceName, tripleConstant.TRIPLE, url.ServiceKey())
if err != nil {
- logger.Errorf("[DubboExporter.Unexport] error: %v", err)
+ logger.Errorf("[DubboExporter.UnExport] error: %v", err)
}
de.serviceMap.Delete(interfaceName)
}
diff --git a/protocol/dubbo3/dubbo3_invoker.go b/protocol/dubbo3/dubbo3_invoker.go
index 9e8801e..11cf801 100644
--- a/protocol/dubbo3/dubbo3_invoker.go
+++ b/protocol/dubbo3/dubbo3_invoker.go
@@ -73,8 +73,8 @@
interfaceKey := url.GetParam(constant.InterfaceKey, "")
consumerService := config.GetConsumerServiceByInterfaceName(interfaceKey)
- dubboSerializaerType := url.GetParam(constant.SerializationKey, constant.ProtobufSerialization)
- triCodecType := tripleConstant.CodecType(dubboSerializaerType)
+ dubboSerializerType := url.GetParam(constant.SerializationKey, constant.ProtobufSerialization)
+ triCodecType := tripleConstant.CodecType(dubboSerializerType)
// new triple client
opts := []triConfig.OptionFunction{
triConfig.WithClientTimeout(uint32(timeout.Seconds())),
@@ -181,6 +181,7 @@
// append interface id to ctx
gRPCMD := make(metadata.MD, 0)
+ // triple will convert attachment value to []string
for k, v := range invocation.Attachments() {
if str, ok := v.(string); ok {
gRPCMD.Set(k, str)
@@ -190,7 +191,7 @@
gRPCMD.Set(k, str...)
continue
}
- logger.Warnf("triple attachment value with key = %s is invalid, which should be string or []string", k)
+ logger.Warnf("[Triple Protocol]Triple attachment value with key = %s is invalid, which should be string or []string", k)
}
ctx = metadata.NewOutgoingContext(ctx, gRPCMD)
ctx = context.WithValue(ctx, tripleConstant.InterfaceKey, di.BaseInvoker.GetURL().GetParam(constant.InterfaceKey, ""))
diff --git a/protocol/dubbo3/dubbo3_protocol_test.go b/protocol/dubbo3/dubbo3_protocol_test.go
index b0baa35..34a5c27 100644
--- a/protocol/dubbo3/dubbo3_protocol_test.go
+++ b/protocol/dubbo3/dubbo3_protocol_test.go
@@ -57,10 +57,10 @@
eq := exporter.GetInvoker().GetURL().URLEqual(url)
assert.True(t, eq)
- // make sure exporterMap after 'Unexport'
+ // make sure exporterMap after 'UnExport'
_, ok := proto.(*DubboProtocol).ExporterMap().Load(url.ServiceKey())
assert.True(t, ok)
- exporter.Unexport()
+ exporter.UnExport()
_, ok = proto.(*DubboProtocol).ExporterMap().Load(url.ServiceKey())
assert.False(t, ok)
@@ -144,7 +144,7 @@
func subTest(t *testing.T, val, paramsInterfaces interface{}) {
list := paramsInterfaces.([]interface{})
- for k, _ := range list {
+ for k := range list {
err := hessian.ReflectResponse(val, list[k])
assert.Nil(t, err)
}
diff --git a/protocol/grpc/grpc_exporter.go b/protocol/grpc/grpc_exporter.go
index 464ce39..c145eaf 100644
--- a/protocol/grpc/grpc_exporter.go
+++ b/protocol/grpc/grpc_exporter.go
@@ -44,11 +44,11 @@
}
// Unexport and unregister gRPC service from registry and memory.
-func (gg *GrpcExporter) Unexport() {
+func (gg *GrpcExporter) UnExport() {
interfaceName := gg.GetInvoker().GetURL().GetParam(constant.InterfaceKey, "")
- gg.BaseExporter.Unexport()
+ gg.BaseExporter.UnExport()
err := common.ServiceMap.UnRegister(interfaceName, GRPC, gg.GetInvoker().GetURL().ServiceKey())
if err != nil {
- logger.Errorf("[GrpcExporter.Unexport] error: %v", err)
+ logger.Errorf("[GrpcExporter.UnExport] error: %v", err)
}
}
diff --git a/protocol/grpc/grpc_protocol_test.go b/protocol/grpc/grpc_protocol_test.go
index 9790843..4993780 100644
--- a/protocol/grpc/grpc_protocol_test.go
+++ b/protocol/grpc/grpc_protocol_test.go
@@ -83,10 +83,10 @@
eq := exporter.GetInvoker().GetURL().URLEqual(url)
assert.True(t, eq)
- // make sure exporterMap after 'Unexport'
+ // make sure exporterMap after 'UnExport'
_, ok := proto.(*GrpcProtocol).ExporterMap().Load(url.ServiceKey())
assert.True(t, ok)
- exporter.Unexport()
+ exporter.UnExport()
_, ok = proto.(*GrpcProtocol).ExporterMap().Load(url.ServiceKey())
assert.False(t, ok)
diff --git a/protocol/jsonrpc/jsonrpc_exporter.go b/protocol/jsonrpc/jsonrpc_exporter.go
index 6cdfb46..71d6b7d 100644
--- a/protocol/jsonrpc/jsonrpc_exporter.go
+++ b/protocol/jsonrpc/jsonrpc_exporter.go
@@ -44,11 +44,11 @@
}
// Unexport exported JSON RPC service.
-func (je *JsonrpcExporter) Unexport() {
+func (je *JsonrpcExporter) UnExport() {
interfaceName := je.GetInvoker().GetURL().GetParam(constant.InterfaceKey, "")
- je.BaseExporter.Unexport()
+ je.BaseExporter.UnExport()
err := common.ServiceMap.UnRegister(interfaceName, JSONRPC, je.GetInvoker().GetURL().ServiceKey())
if err != nil {
- logger.Errorf("[JsonrpcExporter.Unexport] error: %v", err)
+ logger.Errorf("[JsonrpcExporter.UnExport] error: %v", err)
}
}
diff --git a/protocol/jsonrpc/jsonrpc_protocol_test.go b/protocol/jsonrpc/jsonrpc_protocol_test.go
index feb070a..6189189 100644
--- a/protocol/jsonrpc/jsonrpc_protocol_test.go
+++ b/protocol/jsonrpc/jsonrpc_protocol_test.go
@@ -48,11 +48,11 @@
eq := exporter.GetInvoker().GetURL().URLEqual(url)
assert.True(t, eq)
- // make sure exporterMap after 'Unexport'
+ // make sure exporterMap after 'UnExport'
fmt.Println(url.Path)
_, ok := proto.(*JsonrpcProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/"))
assert.True(t, ok)
- exporter.Unexport()
+ exporter.UnExport()
_, ok = proto.(*JsonrpcProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/"))
assert.False(t, ok)
diff --git a/protocol/protocol.go b/protocol/protocol.go
index abd83c9..a71fc85 100644
--- a/protocol/protocol.go
+++ b/protocol/protocol.go
@@ -29,7 +29,7 @@
"dubbo.apache.org/dubbo-go/v3/common"
)
-// Protocol is the interface that wraps the basic Export、 Refer and Destroy method.
+// Protocol is the interface that wraps the basic Export, Refer and Destroy method.
//
// Export method is to export service for remote invocation
//
@@ -42,14 +42,14 @@
Destroy()
}
-// Exporter is the interface that wraps the basic GetInvoker method and Destroy Unexport.
+// Exporter is the interface that wraps the basic GetInvoker method and Destroy UnExport.
//
// GetInvoker method is to get invoker.
//
-// Unexport method is to unexport a exported service
+// UnExport is to un export an exported service
type Exporter interface {
GetInvoker() Invoker
- Unexport()
+ UnExport()
}
// BaseProtocol is default protocol implement.
@@ -105,10 +105,10 @@
}
bp.invokers = []Invoker{}
- // unexport exporters
+ // un export exporters
bp.exporterMap.Range(func(key, exporter interface{}) bool {
if exporter != nil {
- exporter.(Exporter).Unexport()
+ exporter.(Exporter).UnExport()
} else {
bp.exporterMap.Delete(key)
}
@@ -137,8 +137,8 @@
return de.invoker
}
-// Unexport exported service.
-func (de *BaseExporter) Unexport() {
+// UnExport un export service.
+func (de *BaseExporter) UnExport() {
logger.Infof("Exporter unexport.")
de.invoker.Destroy()
de.exporterMap.Delete(de.key)
diff --git a/protocol/rest/rest_exporter.go b/protocol/rest/rest_exporter.go
index 9bc27da..91cf15d 100644
--- a/protocol/rest/rest_exporter.go
+++ b/protocol/rest/rest_exporter.go
@@ -44,11 +44,11 @@
}
// Unexport unexport the RestExporter
-func (re *RestExporter) Unexport() {
+func (re *RestExporter) UnExport() {
interfaceName := re.GetInvoker().GetURL().GetParam(constant.InterfaceKey, "")
- re.BaseExporter.Unexport()
+ re.BaseExporter.UnExport()
err := common.ServiceMap.UnRegister(interfaceName, REST, re.GetInvoker().GetURL().ServiceKey())
if err != nil {
- logger.Errorf("[RestExporter.Unexport] error: %v", err)
+ logger.Errorf("[RestExporter.UnExport] error: %v", err)
}
}
diff --git a/protocol/rest/rest_protocol_test.go b/protocol/rest/rest_protocol_test.go
index d216217..1bd6762 100644
--- a/protocol/rest/rest_protocol_test.go
+++ b/protocol/rest/rest_protocol_test.go
@@ -107,11 +107,11 @@
// // make sure url
// eq := exporter.GetInvoker().GetURL().URLEqual(url)
// assert.True(t, eq)
-// // make sure exporterMap after 'Unexport'
+// // make sure exporterMap after 'UnExport'
// fmt.Println(url.Path)
// _, ok := proto.(*RestProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/"))
// assert.True(t, ok)
-// exporter.Unexport()
+// exporter.UnExport()
// _, ok = proto.(*RestProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/"))
// assert.False(t, ok)
//
diff --git a/registry/polaris/core.go b/registry/polaris/core.go
index 8c82b7b..e87fac7 100644
--- a/registry/polaris/core.go
+++ b/registry/polaris/core.go
@@ -57,9 +57,8 @@
// AddSubscriber add subscriber into watcher's subscribers
func (watcher *PolarisServiceWatcher) AddSubscriber(subscriber func(remoting.EventType, []model.Instance)) {
- watcher.lazyRun()
-
watcher.lock.Lock()
+ watcher.lazyRun()
defer watcher.lock.Unlock()
watcher.subscribers = append(watcher.subscribers, subscriber)
@@ -74,48 +73,50 @@
// startWatch start run work to watch target service by polaris
func (watcher *PolarisServiceWatcher) startWatch() {
-
for {
resp, err := watcher.consumer.WatchService(watcher.subscribeParam)
if err != nil {
time.Sleep(time.Duration(500 * time.Millisecond))
continue
}
-
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: resp.GetAllInstancesResp.Instances,
ConfigType: remoting.EventTypeAdd,
})
- select {
- case event := <-resp.EventChannel:
- eType := event.GetSubScribeEventType()
- if eType == api.EventInstance {
- insEvent := event.(*model.InstanceEvent)
- if insEvent.AddEvent != nil {
- watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
- Value: insEvent.AddEvent.Instances,
- ConfigType: remoting.EventTypeAdd,
- })
- }
- if insEvent.UpdateEvent != nil {
- instances := make([]model.Instance, len(insEvent.UpdateEvent.UpdateList))
- for i := range insEvent.UpdateEvent.UpdateList {
- instances[i] = insEvent.UpdateEvent.UpdateList[i].After
+ for {
+ select {
+ case event := <-resp.EventChannel:
+ eType := event.GetSubScribeEventType()
+ if eType == api.EventInstance {
+ insEvent := event.(*model.InstanceEvent)
+
+ if insEvent.AddEvent != nil {
+ watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
+ Value: insEvent.AddEvent.Instances,
+ ConfigType: remoting.EventTypeAdd,
+ })
}
- watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
- Value: instances,
- ConfigType: remoting.EventTypeUpdate,
- })
- }
- if insEvent.DeleteEvent != nil {
- watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
- Value: insEvent.DeleteEvent.Instances,
- ConfigType: remoting.EventTypeDel,
- })
+ if insEvent.UpdateEvent != nil {
+ instances := make([]model.Instance, len(insEvent.UpdateEvent.UpdateList))
+ for i := range insEvent.UpdateEvent.UpdateList {
+ instances[i] = insEvent.UpdateEvent.UpdateList[i].After
+ }
+ watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
+ Value: instances,
+ ConfigType: remoting.EventTypeUpdate,
+ })
+ }
+ if insEvent.DeleteEvent != nil {
+ watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
+ Value: insEvent.DeleteEvent.Instances,
+ ConfigType: remoting.EventTypeDel,
+ })
+ }
}
}
}
+
}
}
diff --git a/registry/polaris/core_test.go b/registry/polaris/core_test.go
index ba71c2f..0e806a3 100644
--- a/registry/polaris/core_test.go
+++ b/registry/polaris/core_test.go
@@ -1,14 +1,21 @@
package polaris
import (
- "dubbo.apache.org/dubbo-go/v3/remoting"
- "github.com/polarismesh/polaris-go/api"
- "github.com/polarismesh/polaris-go/pkg/model"
- "github.com/stretchr/testify/assert"
"sync"
"testing"
)
+import (
+ "github.com/polarismesh/polaris-go/api"
+ "github.com/polarismesh/polaris-go/pkg/model"
+
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
func TestPolarisServiceWatcher_AddSubscriber(t *testing.T) {
type fields struct {
consumer api.ConsumerAPI
diff --git a/registry/polaris/registry_test.go b/registry/polaris/registry_test.go
index 17a5988..45068b0 100644
--- a/registry/polaris/registry_test.go
+++ b/registry/polaris/registry_test.go
@@ -1,13 +1,19 @@
package polaris
import (
- "dubbo.apache.org/dubbo-go/v3/common"
- "github.com/polarismesh/polaris-go/api"
"reflect"
"sync"
"testing"
)
+import (
+ "github.com/polarismesh/polaris-go/api"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+)
+
func Test_createDeregisterParam(t *testing.T) {
type args struct {
url *common.URL
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index 50f60e3..1b0d390 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -241,9 +241,9 @@
key := getCacheKey(invoker)
if oldExporter, loaded := proto.bounds.Load(key); loaded {
wrappedNewInvoker := newInvokerDelegate(invoker, newUrl)
- oldExporter.(protocol.Exporter).Unexport()
+ oldExporter.(protocol.Exporter).UnExport()
proto.bounds.Delete(key)
- // oldExporter Unexport function unRegister rpcService from the serviceMap, so need register it again as far as possible
+ // oldExporter UnExport function unRegister rpcService from the serviceMap, so need register it again as far as possible
if err := registerServiceMap(invoker); err != nil {
logger.Error(err.Error())
}
@@ -402,7 +402,7 @@
func (proto *registryProtocol) Destroy() {
proto.bounds.Range(func(key, value interface{}) bool {
// protocol holds the exporters actually, instead, registry holds them in order to avoid export repeatedly, so
- // the work for unexport should be finished in protocol.Unexport(), see also config.destroyProviderProtocols().
+ // the work for unexport should be finished in protocol.UnExport(), see also config.destroyProviderProtocols().
exporter := value.(*exporterChangeableWrapper)
reg := proto.getRegistry(getRegistryUrl(exporter.originInvoker))
if err := reg.UnRegister(exporter.registerUrl); err != nil {
@@ -415,7 +415,7 @@
go func() {
select {
case <-time.After(config.GetShutDown().GetStepTimeout() + config.GetShutDown().GetConsumerUpdateWaitTime()):
- exporter.Unexport()
+ exporter.UnExport()
proto.bounds.Delete(key)
}
}()
@@ -481,8 +481,8 @@
subscribeUrl *common.URL
}
-func (e *exporterChangeableWrapper) Unexport() {
- e.exporter.Unexport()
+func (e *exporterChangeableWrapper) UnExport() {
+ e.exporter.UnExport()
}
func (e *exporterChangeableWrapper) SetRegisterUrl(registerUrl *common.URL) {
diff --git a/registry/servicediscovery/instance/random/random_service_instance_selector.go b/registry/servicediscovery/instance/random/random_service_instance_selector.go
index 599fb4d..82dbe85 100644
--- a/registry/servicediscovery/instance/random/random_service_instance_selector.go
+++ b/registry/servicediscovery/instance/random/random_service_instance_selector.go
@@ -30,6 +30,7 @@
)
func init() {
+ rand.Seed(time.Now().UnixNano())
extension.SetServiceInstanceSelector("random", NewRandomServiceInstanceSelector)
}
@@ -47,7 +48,6 @@
if len(serviceInstances) == 1 {
return serviceInstances[0]
}
- rand.Seed(time.Now().UnixNano())
index := rand.Intn(len(serviceInstances))
return serviceInstances[index]
}
diff --git a/registry/zookeeper/service_discovery_test.go b/registry/zookeeper/service_discovery_test.go
index ebd4807..20e2e46 100644
--- a/registry/zookeeper/service_discovery_test.go
+++ b/registry/zookeeper/service_discovery_test.go
@@ -19,31 +19,24 @@
import (
"context"
- "dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/protocol"
- "dubbo.apache.org/dubbo-go/v3/registry"
- "dubbo.apache.org/dubbo-go/v3/registry/event"
- "dubbo.apache.org/dubbo-go/v3/remoting/nacos"
- "fmt"
- gxset "github.com/dubbogo/gost/container/set"
- "github.com/nacos-group/nacos-sdk-go/model"
- "github.com/nacos-group/nacos-sdk-go/vo"
- perrors "github.com/pkg/errors"
"sync"
"testing"
)
import (
+ "github.com/nacos-group/nacos-sdk-go/model"
+ "github.com/nacos-group/nacos-sdk-go/vo"
+
"github.com/stretchr/testify/assert"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
-const testName = "test"
-
func Test_newZookeeperServiceDiscovery(t *testing.T) {
url, _ := common.NewURL("dubbo://127.0.0.1:2181",
common.WithParamsValue(constant.ClientNameKey, "zk-client"))
@@ -53,92 +46,9 @@
assert.Nil(t, err)
}
-
-func TestFunction(t *testing.T) {
-
- extension.SetProtocol("mock", func() protocol.Protocol {
- return &mockProtocol{}
- })
-
- url, _ := common.NewURL("dubbo://127.0.0.1:8848")
- sd, _ := newMockNacosServiceDiscovery(url)
- defer func() {
- _ = sd.Destroy()
- }()
-
- ins := ®istry.DefaultServiceInstance{
- ID: "testID",
- ServiceName: testName,
- Host: "127.0.0.1",
- Port: 2233,
- Enable: true,
- Healthy: true,
- Metadata: nil,
- }
- ins.Metadata = map[string]string{"t1": "test12", constant.MetadataServiceURLParamsPropertyName: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
- err := sd.Register(ins)
- assert.Nil(t, err)
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
- tn := &testNotify{
- wg: wg,
- t: t,
- }
- hs := gxset.NewSet()
- hs.Add(testName)
-
- sicl := event.NewServiceInstancesChangedListener(hs)
- sicl.AddListenerAndNotify(testName, tn)
- err = sd.AddListener(sicl)
- assert.NoError(t, err)
-
- ins = ®istry.DefaultServiceInstance{
- ID: "testID",
- ServiceName: testName,
- Host: "127.0.0.1",
- Port: 2233,
- Enable: true,
- Healthy: true,
- Metadata: nil,
- }
- ins.Metadata = map[string]string{"t1": "test12", constant.MetadataServiceURLParamsPropertyName: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
- err = sd.Update(ins)
- assert.NoError(t, err)
- err = sd.Unregister(ins)
- assert.Nil(t, err)
-}
-
-func newMockNacosServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
- discoveryURL := common.NewURLWithOptions(
- common.WithParams(url.GetParams()),
- common.WithParamsValue(constant.TimeoutKey, url.GetParam(constant.RegistryTimeoutKey, constant.DefaultRegTimeout)),
- common.WithParamsValue(constant.NacosGroupKey, url.GetParam(constant.RegistryGroupKey, defaultGroup)),
- common.WithParamsValue(constant.NacosUsername, url.Username),
- common.WithParamsValue(constant.NacosPassword, url.Password),
- common.WithParamsValue(constant.ClientNameKey, "nacos-client"),
- common.WithParamsValue(constant.NacosNamespaceID, url.GetParam(constant.RegistryNamespaceKey, "")))
- discoveryURL.Location = url.Location
- discoveryURL.Username = url.Username
- discoveryURL.Password = url.Password
- client, err := nacos.NewNacosClientByURL(discoveryURL)
- mc := mockClient{}
- client.SetClient(mc)
- if err != nil {
- return nil, perrors.WithMessage(err, "create nacos namingClient failed.")
- }
-
- descriptor := fmt.Sprintf("zk-service-discovery[%s]", discoveryURL.Location)
-
- group := url.GetParam(constant.RegistryGroupKey, defaultGroup)
- newInstance := &nacosServiceDiscovery{
- group: group,
- namingClient: client,
- descriptor: descriptor,
- registryInstances: []registry.ServiceInstance{},
- instanceListenerMap: make(map[string]*gxset.HashSet),
- }
- return newInstance, nil
+func Test_zookeeperServiceDiscovery_DataChange(t *testing.T) {
+ serviceDiscovery := &zookeeperServiceDiscovery{}
+ assert.Equal(t, registry.DefaultPageSize, serviceDiscovery.GetDefaultPageSize())
}
type testNotify struct {
@@ -147,7 +57,7 @@
}
func (tn *testNotify) Notify(e *registry.ServiceEvent) {
- assert.Equal(tn.t, "2233", e.Service.Port)
+ assert.Equal(tn.t, "2181", e.Service.Port)
tn.wg.Done()
}
diff --git a/remoting/zookeeper/curator_discovery/service_discovery.go b/remoting/zookeeper/curator_discovery/service_discovery.go
index 0fdc928..d6bb25f 100644
--- a/remoting/zookeeper/curator_discovery/service_discovery.go
+++ b/remoting/zookeeper/curator_discovery/service_discovery.go
@@ -239,7 +239,7 @@
sd.listener.ListenServiceEvent(nil, sd.pathForName(name), listener)
}
-// ListenServiceInstanceEvent add a listener in a instance
+// ListenServiceInstanceEvent add a listener in an instance
func (sd *ServiceDiscovery) ListenServiceInstanceEvent(name, id string, listener remoting.DataListener) {
sd.listener.ListenServiceNodeEvent(sd.pathForInstance(name, id), listener)
}
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 2680473..d3257f9 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -272,7 +272,7 @@
}
logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", zkRootPath, err)
- // May be the provider does not ready yet, sleep failTimes * ConnDelay senconds to wait
+ // Maybe the provider does not ready yet, sleep failTimes * ConnDelay senconds to wait
after := time.After(timeSecondDuration(failTimes * ConnDelay))
select {
case <-after: