| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package dubbo |
| |
| import ( |
| "context" |
| "strconv" |
| "strings" |
| "sync" |
| "time" |
| ) |
| |
| import ( |
| "github.com/dubbogo/gost/log/logger" |
| |
| "github.com/opentracing/opentracing-go" |
| ) |
| |
| import ( |
| "dubbo.apache.org/dubbo-go/v3/common" |
| "dubbo.apache.org/dubbo-go/v3/common/constant" |
| "dubbo.apache.org/dubbo-go/v3/config" |
| "dubbo.apache.org/dubbo-go/v3/protocol" |
| "dubbo.apache.org/dubbo-go/v3/protocol/invocation" |
| "dubbo.apache.org/dubbo-go/v3/remoting" |
| ) |
| |
| var attachmentKey = []string{ |
| constant.InterfaceKey, constant.GroupKey, constant.TokenKey, constant.TimeoutKey, |
| constant.VersionKey, |
| } |
| |
| // DubboInvoker is implement of protocol.Invoker. A dubboInvoker refers to one service and ip. |
| type DubboInvoker struct { |
| protocol.BaseInvoker |
| clientGuard *sync.RWMutex // the exchange layer, it is focus on network communication. |
| client *remoting.ExchangeClient |
| quitOnce sync.Once |
| timeout time.Duration // timeout for service(interface) level. |
| } |
| |
| // NewDubboInvoker constructor |
| func NewDubboInvoker(url *common.URL, client *remoting.ExchangeClient) *DubboInvoker { |
| rt := config.GetConsumerConfig().RequestTimeout |
| |
| timeout := url.GetParamDuration(constant.TimeoutKey, rt) |
| di := &DubboInvoker{ |
| BaseInvoker: *protocol.NewBaseInvoker(url), |
| clientGuard: &sync.RWMutex{}, |
| client: client, |
| timeout: timeout, |
| } |
| |
| return di |
| } |
| |
| func (di *DubboInvoker) setClient(client *remoting.ExchangeClient) { |
| di.clientGuard.Lock() |
| defer di.clientGuard.Unlock() |
| |
| di.client = client |
| } |
| |
| func (di *DubboInvoker) getClient() *remoting.ExchangeClient { |
| di.clientGuard.RLock() |
| defer di.clientGuard.RUnlock() |
| |
| return di.client |
| } |
| |
| // Invoke call remoting. |
| func (di *DubboInvoker) Invoke(ctx context.Context, ivc protocol.Invocation) protocol.Result { |
| var ( |
| err error |
| result protocol.RPCResult |
| ) |
| if !di.BaseInvoker.IsAvailable() { |
| // Generally, the case will not happen, because the invoker has been removed |
| // from the invoker list before destroy,so no new request will enter the destroyed invoker |
| logger.Warnf("this dubboInvoker is destroyed") |
| result.Err = protocol.ErrDestroyedInvoker |
| return &result |
| } |
| |
| di.clientGuard.RLock() |
| defer di.clientGuard.RUnlock() |
| |
| if di.client == nil { |
| result.Err = protocol.ErrClientClosed |
| logger.Debugf("result.Err: %v", result.Err) |
| return &result |
| } |
| |
| if !di.BaseInvoker.IsAvailable() { |
| // Generally, the case will not happen, because the invoker has been removed |
| // from the invoker list before destroy,so no new request will enter the destroyed invoker |
| logger.Warnf("this dubboInvoker is destroying") |
| result.Err = protocol.ErrDestroyedInvoker |
| return &result |
| } |
| |
| inv := ivc.(*invocation.RPCInvocation) |
| // init param |
| inv.SetAttachment(constant.PathKey, di.GetURL().GetParam(constant.InterfaceKey, "")) |
| for _, k := range attachmentKey { |
| if v := di.GetURL().GetParam(k, ""); len(v) > 0 { |
| inv.SetAttachment(k, v) |
| } |
| } |
| |
| // put the ctx into attachment |
| di.appendCtx(ctx, inv) |
| |
| url := di.GetURL() |
| // default hessian2 serialization, compatible |
| if url.GetParam(constant.SerializationKey, "") == "" { |
| url.SetParam(constant.SerializationKey, constant.Hessian2Serialization) |
| } |
| // async |
| async, err := strconv.ParseBool(inv.GetAttachmentWithDefaultValue(constant.AsyncKey, "false")) |
| if err != nil { |
| logger.Errorf("ParseBool - error: %v", err) |
| async = false |
| } |
| // response := NewResponse(inv.Reply(), nil) |
| rest := &protocol.RPCResult{} |
| timeout := di.getTimeout(inv) |
| if async { |
| if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok { |
| result.Err = di.client.AsyncRequest(&ivc, url, timeout, callBack, rest) |
| } else { |
| result.Err = di.client.Send(&ivc, url, timeout) |
| } |
| } else { |
| if inv.Reply() == nil { |
| result.Err = protocol.ErrNoReply |
| } else { |
| result.Err = di.client.Request(&ivc, url, timeout, rest) |
| } |
| } |
| if result.Err == nil { |
| result.Rest = inv.Reply() |
| result.Attrs = rest.Attrs |
| } |
| |
| return &result |
| } |
| |
| // get timeout including methodConfig |
| func (di *DubboInvoker) getTimeout(ivc *invocation.RPCInvocation) time.Duration { |
| timeout := di.timeout //default timeout |
| if attachTimeout, ok := ivc.GetAttachment(constant.TimeoutKey); ok { //check invocation timeout |
| timeout, _ = time.ParseDuration(attachTimeout) |
| } else { // check method timeout |
| methodName := ivc.MethodName() |
| if di.GetURL().GetParamBool(constant.GenericKey, false) { |
| methodName = ivc.Arguments()[0].(string) |
| } |
| mTimeout := di.GetURL().GetParam(strings.Join([]string{constant.MethodKeys, methodName, constant.TimeoutKey}, "."), "") |
| if len(mTimeout) != 0 { |
| timeout, _ = time.ParseDuration(mTimeout) |
| } |
| } |
| // set timeout into invocation |
| ivc.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(timeout.Nanoseconds()))) |
| return timeout |
| } |
| |
| func (di *DubboInvoker) IsAvailable() bool { |
| client := di.getClient() |
| if client != nil { |
| return client.IsAvailable() |
| } |
| |
| return false |
| } |
| |
| // Destroy destroy dubbo client invoker. |
| func (di *DubboInvoker) Destroy() { |
| di.quitOnce.Do(func() { |
| di.BaseInvoker.Destroy() |
| client := di.getClient() |
| if client != nil { |
| activeNumber := client.DecreaseActiveNumber() |
| di.setClient(nil) |
| if activeNumber == 0 { |
| exchangeClientMap.Delete(di.GetURL().Location) |
| client.Close() |
| } |
| } |
| }) |
| } |
| |
| // Finally, I made the decision that I don't provide a general way to transfer the whole context |
| // because it could be misused. If the context contains to many key-value pairs, the performance will be much lower. |
| func (di *DubboInvoker) appendCtx(ctx context.Context, ivc *invocation.RPCInvocation) { |
| // inject opentracing ctx |
| currentSpan := opentracing.SpanFromContext(ctx) |
| if currentSpan != nil { |
| err := injectTraceCtx(currentSpan, ivc) |
| if err != nil { |
| logger.Errorf("Could not inject the span context into attachments: %v", err) |
| } |
| } |
| } |