[#1998]fix a bug where filter was not currently working properly in triple protocol and fix some spelling mistake (#1999)
diff --git a/config/service_config.go b/config/service_config.go
index b0f75cf..fe8ba4b 100644
--- a/config/service_config.go
+++ b/config/service_config.go
@@ -138,7 +138,7 @@
s.ProtocolIDs = rc.Provider.ProtocolIDs
}
if len(s.ProtocolIDs) <= 0 {
- for k, _ := range rc.Protocols {
+ for k := range rc.Protocols {
s.ProtocolIDs = append(s.ProtocolIDs, k)
}
}
@@ -400,7 +400,7 @@
s.exportersLock.Lock()
defer s.exportersLock.Unlock()
for _, exporter := range s.exporters {
- exporter.Unexport()
+ exporter.UnExport()
}
s.exporters = nil
}()
diff --git a/filter/token/filter.go b/filter/token/filter.go
index 2f6fa03..a84e9de 100644
--- a/filter/token/filter.go
+++ b/filter/token/filter.go
@@ -44,6 +44,10 @@
extension.SetFilter(constant.TokenFilterKey, newTokenFilter)
}
+const (
+ InValidTokenFormat = "[Token Filter]Invalid token! Forbid invoke remote service %v with method %s"
+)
+
// tokenFilter will verify if the token is valid
type tokenFilter struct{}
@@ -60,13 +64,31 @@
func (f *tokenFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
invokerTkn := invoker.GetURL().GetParam(constant.TokenKey, "")
if len(invokerTkn) > 0 {
- attachs := invocation.Attachments()
- remoteTkn, exist := attachs[constant.TokenKey]
- if exist && remoteTkn != nil && strings.EqualFold(invokerTkn, remoteTkn.(string)) {
+ attas := invocation.Attachments()
+ var remoteTkn string
+ remoteTknIface, exist := attas[constant.TokenKey]
+ if !exist || remoteTknIface == nil {
+ return &protocol.RPCResult{Err: perrors.Errorf(InValidTokenFormat, invoker, invocation.MethodName())}
+ }
+ switch remoteTknIface.(type) {
+ case string:
+ // deal with dubbo protocol
+ remoteTkn = remoteTknIface.(string)
+ case []string:
+ // deal with triple protocol
+ remoteTkns := remoteTknIface.([]string)
+ if len(remoteTkns) != 1 {
+ return &protocol.RPCResult{Err: perrors.Errorf(InValidTokenFormat, invoker, invocation.MethodName())}
+ }
+ remoteTkn = remoteTkns[0]
+ default:
+ return &protocol.RPCResult{Err: perrors.Errorf(InValidTokenFormat, invoker, invocation.MethodName())}
+ }
+
+ if strings.EqualFold(invokerTkn, remoteTkn) {
return invoker.Invoke(ctx, invocation)
}
- return &protocol.RPCResult{Err: perrors.Errorf("Invalid token! Forbid invoke remote service %v method %s ",
- invoker, invocation.MethodName())}
+ return &protocol.RPCResult{Err: perrors.Errorf(InValidTokenFormat, invoker, invocation.MethodName())}
}
return invoker.Invoke(ctx, invocation)
diff --git a/protocol/dubbo/dubbo_exporter.go b/protocol/dubbo/dubbo_exporter.go
index 1f40084..ad50f30 100644
--- a/protocol/dubbo/dubbo_exporter.go
+++ b/protocol/dubbo/dubbo_exporter.go
@@ -44,11 +44,11 @@
}
// Unexport unexport dubbo service exporter.
-func (de *DubboExporter) Unexport() {
+func (de *DubboExporter) UnExport() {
interfaceName := de.GetInvoker().GetURL().GetParam(constant.InterfaceKey, "")
- de.BaseExporter.Unexport()
+ de.BaseExporter.UnExport()
err := common.ServiceMap.UnRegister(interfaceName, DUBBO, de.GetInvoker().GetURL().ServiceKey())
if err != nil {
- logger.Errorf("[DubboExporter.Unexport] error: %v", err)
+ logger.Errorf("[DubboExporter.UnExport] error: %v", err)
}
}
diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go
index df16807..6f72753 100644
--- a/protocol/dubbo/dubbo_invoker.go
+++ b/protocol/dubbo/dubbo_invoker.go
@@ -36,7 +36,7 @@
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/protocol"
- invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+ "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
"dubbo.apache.org/dubbo-go/v3/remoting"
)
@@ -84,7 +84,7 @@
}
// Invoke call remoting.
-func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
+func (di *DubboInvoker) Invoke(ctx context.Context, ivc protocol.Invocation) protocol.Result {
var (
err error
result protocol.RPCResult
@@ -114,7 +114,7 @@
return &result
}
- inv := invocation.(*invocation_impl.RPCInvocation)
+ inv := ivc.(*invocation.RPCInvocation)
// init param
inv.SetAttachment(constant.PathKey, di.GetURL().GetParam(constant.InterfaceKey, ""))
for _, k := range attachmentKey {
@@ -142,15 +142,15 @@
timeout := di.getTimeout(inv)
if async {
if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {
- result.Err = di.client.AsyncRequest(&invocation, url, timeout, callBack, rest)
+ result.Err = di.client.AsyncRequest(&ivc, url, timeout, callBack, rest)
} else {
- result.Err = di.client.Send(&invocation, url, timeout)
+ result.Err = di.client.Send(&ivc, url, timeout)
}
} else {
if inv.Reply() == nil {
result.Err = protocol.ErrNoReply
} else {
- result.Err = di.client.Request(&invocation, url, timeout, rest)
+ result.Err = di.client.Request(&ivc, url, timeout, rest)
}
}
if result.Err == nil {
@@ -162,21 +162,21 @@
}
// get timeout including methodConfig
-func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) time.Duration {
- methodName := invocation.MethodName()
+func (di *DubboInvoker) getTimeout(ivc *invocation.RPCInvocation) time.Duration {
+ methodName := ivc.MethodName()
if di.GetURL().GetParamBool(constant.GenericKey, false) {
- methodName = invocation.Arguments()[0].(string)
+ methodName = ivc.Arguments()[0].(string)
}
timeout := di.GetURL().GetParam(strings.Join([]string{constant.MethodKeys, methodName, constant.TimeoutKey}, "."), "")
if len(timeout) != 0 {
if t, err := time.ParseDuration(timeout); err == nil {
// config timeout into attachment
- invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds())))
+ ivc.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds())))
return t
}
}
// set timeout into invocation at method level
- invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds())))
+ ivc.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds())))
return di.timeout
}
@@ -207,11 +207,11 @@
// Finally, I made the decision that I don't provide a general way to transfer the whole context
// because it could be misused. If the context contains to many key-value pairs, the performance will be much lower.
-func (di *DubboInvoker) appendCtx(ctx context.Context, inv *invocation_impl.RPCInvocation) {
+func (di *DubboInvoker) appendCtx(ctx context.Context, ivc *invocation.RPCInvocation) {
// inject opentracing ctx
currentSpan := opentracing.SpanFromContext(ctx)
if currentSpan != nil {
- err := injectTraceCtx(currentSpan, inv)
+ err := injectTraceCtx(currentSpan, ivc)
if err != nil {
logger.Errorf("Could not inject the span context into attachments: %v", err)
}
diff --git a/protocol/dubbo/dubbo_protocol_test.go b/protocol/dubbo/dubbo_protocol_test.go
index 3ccbcd1..a7988fa 100644
--- a/protocol/dubbo/dubbo_protocol_test.go
+++ b/protocol/dubbo/dubbo_protocol_test.go
@@ -100,10 +100,10 @@
eq2 := exporter2.GetInvoker().GetURL().URLEqual(url2)
assert.True(t, eq2)
- // make sure exporterMap after 'Unexport'
+ // make sure exporterMap after 'UnExport'
_, ok := proto.(*DubboProtocol).ExporterMap().Load(url2.ServiceKey())
assert.True(t, ok)
- exporter2.Unexport()
+ exporter2.UnExport()
_, ok = proto.(*DubboProtocol).ExporterMap().Load(url2.ServiceKey())
assert.False(t, ok)
diff --git a/protocol/dubbo3/dubbo3_exporter.go b/protocol/dubbo3/dubbo3_exporter.go
index 3b997b0..13214de 100644
--- a/protocol/dubbo3/dubbo3_exporter.go
+++ b/protocol/dubbo3/dubbo3_exporter.go
@@ -49,13 +49,13 @@
}
// Unexport unexport dubbo3 service exporter.
-func (de *DubboExporter) Unexport() {
+func (de *DubboExporter) UnExport() {
url := de.GetInvoker().GetURL()
interfaceName := url.GetParam(constant.InterfaceKey, "")
- de.BaseExporter.Unexport()
+ de.BaseExporter.UnExport()
err := common.ServiceMap.UnRegister(interfaceName, tripleConstant.TRIPLE, url.ServiceKey())
if err != nil {
- logger.Errorf("[DubboExporter.Unexport] error: %v", err)
+ logger.Errorf("[DubboExporter.UnExport] error: %v", err)
}
de.serviceMap.Delete(interfaceName)
}
diff --git a/protocol/dubbo3/dubbo3_invoker.go b/protocol/dubbo3/dubbo3_invoker.go
index 9e8801e..11cf801 100644
--- a/protocol/dubbo3/dubbo3_invoker.go
+++ b/protocol/dubbo3/dubbo3_invoker.go
@@ -73,8 +73,8 @@
interfaceKey := url.GetParam(constant.InterfaceKey, "")
consumerService := config.GetConsumerServiceByInterfaceName(interfaceKey)
- dubboSerializaerType := url.GetParam(constant.SerializationKey, constant.ProtobufSerialization)
- triCodecType := tripleConstant.CodecType(dubboSerializaerType)
+ dubboSerializerType := url.GetParam(constant.SerializationKey, constant.ProtobufSerialization)
+ triCodecType := tripleConstant.CodecType(dubboSerializerType)
// new triple client
opts := []triConfig.OptionFunction{
triConfig.WithClientTimeout(uint32(timeout.Seconds())),
@@ -181,6 +181,7 @@
// append interface id to ctx
gRPCMD := make(metadata.MD, 0)
+ // triple will convert attachment value to []string
for k, v := range invocation.Attachments() {
if str, ok := v.(string); ok {
gRPCMD.Set(k, str)
@@ -190,7 +191,7 @@
gRPCMD.Set(k, str...)
continue
}
- logger.Warnf("triple attachment value with key = %s is invalid, which should be string or []string", k)
+ logger.Warnf("[Triple Protocol]Triple attachment value with key = %s is invalid, which should be string or []string", k)
}
ctx = metadata.NewOutgoingContext(ctx, gRPCMD)
ctx = context.WithValue(ctx, tripleConstant.InterfaceKey, di.BaseInvoker.GetURL().GetParam(constant.InterfaceKey, ""))
diff --git a/protocol/dubbo3/dubbo3_protocol_test.go b/protocol/dubbo3/dubbo3_protocol_test.go
index b0baa35..34a5c27 100644
--- a/protocol/dubbo3/dubbo3_protocol_test.go
+++ b/protocol/dubbo3/dubbo3_protocol_test.go
@@ -57,10 +57,10 @@
eq := exporter.GetInvoker().GetURL().URLEqual(url)
assert.True(t, eq)
- // make sure exporterMap after 'Unexport'
+ // make sure exporterMap after 'UnExport'
_, ok := proto.(*DubboProtocol).ExporterMap().Load(url.ServiceKey())
assert.True(t, ok)
- exporter.Unexport()
+ exporter.UnExport()
_, ok = proto.(*DubboProtocol).ExporterMap().Load(url.ServiceKey())
assert.False(t, ok)
@@ -144,7 +144,7 @@
func subTest(t *testing.T, val, paramsInterfaces interface{}) {
list := paramsInterfaces.([]interface{})
- for k, _ := range list {
+ for k := range list {
err := hessian.ReflectResponse(val, list[k])
assert.Nil(t, err)
}
diff --git a/protocol/grpc/grpc_exporter.go b/protocol/grpc/grpc_exporter.go
index 464ce39..c145eaf 100644
--- a/protocol/grpc/grpc_exporter.go
+++ b/protocol/grpc/grpc_exporter.go
@@ -44,11 +44,11 @@
}
// Unexport and unregister gRPC service from registry and memory.
-func (gg *GrpcExporter) Unexport() {
+func (gg *GrpcExporter) UnExport() {
interfaceName := gg.GetInvoker().GetURL().GetParam(constant.InterfaceKey, "")
- gg.BaseExporter.Unexport()
+ gg.BaseExporter.UnExport()
err := common.ServiceMap.UnRegister(interfaceName, GRPC, gg.GetInvoker().GetURL().ServiceKey())
if err != nil {
- logger.Errorf("[GrpcExporter.Unexport] error: %v", err)
+ logger.Errorf("[GrpcExporter.UnExport] error: %v", err)
}
}
diff --git a/protocol/grpc/grpc_protocol_test.go b/protocol/grpc/grpc_protocol_test.go
index 9790843..4993780 100644
--- a/protocol/grpc/grpc_protocol_test.go
+++ b/protocol/grpc/grpc_protocol_test.go
@@ -83,10 +83,10 @@
eq := exporter.GetInvoker().GetURL().URLEqual(url)
assert.True(t, eq)
- // make sure exporterMap after 'Unexport'
+ // make sure exporterMap after 'UnExport'
_, ok := proto.(*GrpcProtocol).ExporterMap().Load(url.ServiceKey())
assert.True(t, ok)
- exporter.Unexport()
+ exporter.UnExport()
_, ok = proto.(*GrpcProtocol).ExporterMap().Load(url.ServiceKey())
assert.False(t, ok)
diff --git a/protocol/jsonrpc/jsonrpc_exporter.go b/protocol/jsonrpc/jsonrpc_exporter.go
index 6cdfb46..71d6b7d 100644
--- a/protocol/jsonrpc/jsonrpc_exporter.go
+++ b/protocol/jsonrpc/jsonrpc_exporter.go
@@ -44,11 +44,11 @@
}
// Unexport exported JSON RPC service.
-func (je *JsonrpcExporter) Unexport() {
+func (je *JsonrpcExporter) UnExport() {
interfaceName := je.GetInvoker().GetURL().GetParam(constant.InterfaceKey, "")
- je.BaseExporter.Unexport()
+ je.BaseExporter.UnExport()
err := common.ServiceMap.UnRegister(interfaceName, JSONRPC, je.GetInvoker().GetURL().ServiceKey())
if err != nil {
- logger.Errorf("[JsonrpcExporter.Unexport] error: %v", err)
+ logger.Errorf("[JsonrpcExporter.UnExport] error: %v", err)
}
}
diff --git a/protocol/jsonrpc/jsonrpc_protocol_test.go b/protocol/jsonrpc/jsonrpc_protocol_test.go
index feb070a..6189189 100644
--- a/protocol/jsonrpc/jsonrpc_protocol_test.go
+++ b/protocol/jsonrpc/jsonrpc_protocol_test.go
@@ -48,11 +48,11 @@
eq := exporter.GetInvoker().GetURL().URLEqual(url)
assert.True(t, eq)
- // make sure exporterMap after 'Unexport'
+ // make sure exporterMap after 'UnExport'
fmt.Println(url.Path)
_, ok := proto.(*JsonrpcProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/"))
assert.True(t, ok)
- exporter.Unexport()
+ exporter.UnExport()
_, ok = proto.(*JsonrpcProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/"))
assert.False(t, ok)
diff --git a/protocol/protocol.go b/protocol/protocol.go
index abd83c9..a71fc85 100644
--- a/protocol/protocol.go
+++ b/protocol/protocol.go
@@ -29,7 +29,7 @@
"dubbo.apache.org/dubbo-go/v3/common"
)
-// Protocol is the interface that wraps the basic Export、 Refer and Destroy method.
+// Protocol is the interface that wraps the basic Export, Refer and Destroy method.
//
// Export method is to export service for remote invocation
//
@@ -42,14 +42,14 @@
Destroy()
}
-// Exporter is the interface that wraps the basic GetInvoker method and Destroy Unexport.
+// Exporter is the interface that wraps the basic GetInvoker method and Destroy UnExport.
//
// GetInvoker method is to get invoker.
//
-// Unexport method is to unexport a exported service
+// UnExport is to un export an exported service
type Exporter interface {
GetInvoker() Invoker
- Unexport()
+ UnExport()
}
// BaseProtocol is default protocol implement.
@@ -105,10 +105,10 @@
}
bp.invokers = []Invoker{}
- // unexport exporters
+ // un export exporters
bp.exporterMap.Range(func(key, exporter interface{}) bool {
if exporter != nil {
- exporter.(Exporter).Unexport()
+ exporter.(Exporter).UnExport()
} else {
bp.exporterMap.Delete(key)
}
@@ -137,8 +137,8 @@
return de.invoker
}
-// Unexport exported service.
-func (de *BaseExporter) Unexport() {
+// UnExport un export service.
+func (de *BaseExporter) UnExport() {
logger.Infof("Exporter unexport.")
de.invoker.Destroy()
de.exporterMap.Delete(de.key)
diff --git a/protocol/rest/rest_exporter.go b/protocol/rest/rest_exporter.go
index 9bc27da..91cf15d 100644
--- a/protocol/rest/rest_exporter.go
+++ b/protocol/rest/rest_exporter.go
@@ -44,11 +44,11 @@
}
// Unexport unexport the RestExporter
-func (re *RestExporter) Unexport() {
+func (re *RestExporter) UnExport() {
interfaceName := re.GetInvoker().GetURL().GetParam(constant.InterfaceKey, "")
- re.BaseExporter.Unexport()
+ re.BaseExporter.UnExport()
err := common.ServiceMap.UnRegister(interfaceName, REST, re.GetInvoker().GetURL().ServiceKey())
if err != nil {
- logger.Errorf("[RestExporter.Unexport] error: %v", err)
+ logger.Errorf("[RestExporter.UnExport] error: %v", err)
}
}
diff --git a/protocol/rest/rest_protocol_test.go b/protocol/rest/rest_protocol_test.go
index d216217..1bd6762 100644
--- a/protocol/rest/rest_protocol_test.go
+++ b/protocol/rest/rest_protocol_test.go
@@ -107,11 +107,11 @@
// // make sure url
// eq := exporter.GetInvoker().GetURL().URLEqual(url)
// assert.True(t, eq)
-// // make sure exporterMap after 'Unexport'
+// // make sure exporterMap after 'UnExport'
// fmt.Println(url.Path)
// _, ok := proto.(*RestProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/"))
// assert.True(t, ok)
-// exporter.Unexport()
+// exporter.UnExport()
// _, ok = proto.(*RestProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/"))
// assert.False(t, ok)
//
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index 50f60e3..1b0d390 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -241,9 +241,9 @@
key := getCacheKey(invoker)
if oldExporter, loaded := proto.bounds.Load(key); loaded {
wrappedNewInvoker := newInvokerDelegate(invoker, newUrl)
- oldExporter.(protocol.Exporter).Unexport()
+ oldExporter.(protocol.Exporter).UnExport()
proto.bounds.Delete(key)
- // oldExporter Unexport function unRegister rpcService from the serviceMap, so need register it again as far as possible
+ // oldExporter UnExport function unRegister rpcService from the serviceMap, so need register it again as far as possible
if err := registerServiceMap(invoker); err != nil {
logger.Error(err.Error())
}
@@ -402,7 +402,7 @@
func (proto *registryProtocol) Destroy() {
proto.bounds.Range(func(key, value interface{}) bool {
// protocol holds the exporters actually, instead, registry holds them in order to avoid export repeatedly, so
- // the work for unexport should be finished in protocol.Unexport(), see also config.destroyProviderProtocols().
+ // the work for unexport should be finished in protocol.UnExport(), see also config.destroyProviderProtocols().
exporter := value.(*exporterChangeableWrapper)
reg := proto.getRegistry(getRegistryUrl(exporter.originInvoker))
if err := reg.UnRegister(exporter.registerUrl); err != nil {
@@ -415,7 +415,7 @@
go func() {
select {
case <-time.After(config.GetShutDown().GetStepTimeout() + config.GetShutDown().GetConsumerUpdateWaitTime()):
- exporter.Unexport()
+ exporter.UnExport()
proto.bounds.Delete(key)
}
}()
@@ -481,8 +481,8 @@
subscribeUrl *common.URL
}
-func (e *exporterChangeableWrapper) Unexport() {
- e.exporter.Unexport()
+func (e *exporterChangeableWrapper) UnExport() {
+ e.exporter.UnExport()
}
func (e *exporterChangeableWrapper) SetRegisterUrl(registerUrl *common.URL) {
diff --git a/remoting/zookeeper/curator_discovery/service_discovery.go b/remoting/zookeeper/curator_discovery/service_discovery.go
index 0fdc928..d6bb25f 100644
--- a/remoting/zookeeper/curator_discovery/service_discovery.go
+++ b/remoting/zookeeper/curator_discovery/service_discovery.go
@@ -239,7 +239,7 @@
sd.listener.ListenServiceEvent(nil, sd.pathForName(name), listener)
}
-// ListenServiceInstanceEvent add a listener in a instance
+// ListenServiceInstanceEvent add a listener in an instance
func (sd *ServiceDiscovery) ListenServiceInstanceEvent(name, id string, listener remoting.DataListener) {
sd.listener.ListenServiceNodeEvent(sd.pathForInstance(name, id), listener)
}
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 2680473..d3257f9 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -272,7 +272,7 @@
}
logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", zkRootPath, err)
- // May be the provider does not ready yet, sleep failTimes * ConnDelay senconds to wait
+ // Maybe the provider does not ready yet, sleep failTimes * ConnDelay senconds to wait
after := time.After(timeSecondDuration(failTimes * ConnDelay))
select {
case <-after: