blob: 72da5904f7dcfe94f9bab711e5e079a29dd95625 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package proxy
import (
"context"
"errors"
"reflect"
"sync"
)
import (
"github.com/apache/dubbo-go-hessian2/java_exception"
"github.com/dubbogo/gost/log/logger"
perrors "github.com/pkg/errors"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)
// nolint
type Proxy struct {
rpc common.RPCService
invoke protocol.Invoker
callback interface{}
attachments map[string]string
implement ImplementFunc
once sync.Once
}
type (
// ProxyOption a function to init Proxy with options
ProxyOption func(p *Proxy)
// ImplementFunc function for proxy impl of RPCService functions
ImplementFunc func(p *Proxy, v common.RPCService)
)
var typError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()).Type()
// NewProxy create service proxy.
func NewProxy(invoke protocol.Invoker, callback interface{}, attachments map[string]string) *Proxy {
return NewProxyWithOptions(invoke, callback, attachments,
WithProxyImplementFunc(DefaultProxyImplementFunc))
}
// NewProxyWithOptions create service proxy with options.
func NewProxyWithOptions(invoke protocol.Invoker, callback interface{}, attachments map[string]string, opts ...ProxyOption) *Proxy {
p := &Proxy{
invoke: invoke,
callback: callback,
attachments: attachments,
}
for _, opt := range opts {
opt(p)
}
return p
}
// WithProxyImplementFunc an option function to setup proxy.ImplementFunc
func WithProxyImplementFunc(f ImplementFunc) ProxyOption {
return func(p *Proxy) {
p.implement = f
}
}
// Implement
// proxy implement
// In consumer, RPCService like:
// type XxxProvider struct {
// Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error
// }
func (p *Proxy) Implement(v common.RPCService) {
p.once.Do(func() {
p.implement(p, v)
p.rpc = v
})
}
// Get gets rpc service instance.
func (p *Proxy) Get() common.RPCService {
return p.rpc
}
// GetCallback gets callback.
func (p *Proxy) GetCallback() interface{} {
return p.callback
}
// GetInvoker gets Invoker.
func (p *Proxy) GetInvoker() protocol.Invoker {
return p.invoke
}
// DefaultProxyImplementFunc the default function for proxy impl
func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
// check parameters, incoming interface must be a elem's pointer.
valueOf := reflect.ValueOf(v)
valueOfElem := valueOf.Elem()
makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {
return func(in []reflect.Value) []reflect.Value {
var (
err error
inv *invocation_impl.RPCInvocation
inIArr []interface{}
inVArr []reflect.Value
reply reflect.Value
replyEmptyFlag bool
)
if methodName == "Echo" {
methodName = "$echo"
}
if len(outs) == 2 { // return (reply, error)
if outs[0].Kind() == reflect.Ptr {
reply = reflect.New(outs[0].Elem())
} else {
reply = reflect.New(outs[0])
}
} else { // only return error
replyEmptyFlag = true
}
start := 0
end := len(in)
invCtx := context.Background()
// retrieve the context from the first argument if existed
if end > 0 {
if in[0].Type().String() == "context.Context" {
if !in[0].IsNil() {
// the user declared context as method's parameter
invCtx = in[0].Interface().(context.Context)
}
start += 1
}
}
if end-start <= 0 {
inIArr = []interface{}{}
inVArr = []reflect.Value{}
} else if v, ok := in[start].Interface().([]interface{}); ok && end-start == 1 {
inIArr = v
inVArr = []reflect.Value{in[start]}
} else {
inIArr = make([]interface{}, end-start)
inVArr = make([]reflect.Value, end-start)
index := 0
for i := start; i < end; i++ {
inIArr[index] = in[i].Interface()
inVArr[index] = in[i]
index++
}
}
inv = invocation_impl.NewRPCInvocationWithOptions(invocation_impl.WithMethodName(methodName),
invocation_impl.WithArguments(inIArr),
invocation_impl.WithCallBack(p.callback), invocation_impl.WithParameterValues(inVArr))
if !replyEmptyFlag {
inv.SetReply(reply.Interface())
}
for k, value := range p.attachments {
inv.SetAttachment(k, value)
}
// add user setAttachment. It is compatibility with previous versions.
atm := invCtx.Value(constant.AttachmentKey)
if m, ok := atm.(map[string]string); ok {
for k, value := range m {
inv.SetAttachment(k, value)
}
} else if m2, ok2 := atm.(map[string]interface{}); ok2 {
// it is support to transfer map[string]interface{}. It refers to dubbo-java 2.7.
for k, value := range m2 {
inv.SetAttachment(k, value)
}
}
result := p.invoke.Invoke(invCtx, inv)
err = result.Error()
// cause is raw user level error
cause := perrors.Cause(err)
if err != nil {
// if some error happened, it should be log some info in the separate file.
if throwabler, ok := cause.(java_exception.Throwabler); ok {
logger.Warnf("[CallProxy] invoke service throw exception: %v , stackTraceElements: %v", cause.Error(), throwabler.GetStackTrace())
} else {
// entire error is only for printing, do not return, because user would not want to deal with massive framework-level error message
logger.Warnf("[CallProxy] received rpc err: %v", err)
}
} else {
logger.Debugf("[CallProxy] received rpc result successfully: %s", result)
}
if len(outs) == 1 {
return []reflect.Value{reflect.ValueOf(&cause).Elem()}
}
if len(outs) == 2 && outs[0].Kind() != reflect.Ptr {
return []reflect.Value{reply.Elem(), reflect.ValueOf(&cause).Elem()}
}
return []reflect.Value{reply, reflect.ValueOf(&cause).Elem()}
}
}
if err := refectAndMakeObjectFunc(valueOfElem, makeDubboCallProxy); err != nil {
logger.Errorf("The type or combination type of RPCService %T must be a pointer of a struct. error is %s", v, err)
return
}
}
func refectAndMakeObjectFunc(valueOfElem reflect.Value, makeDubboCallProxy func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value) error {
typeOf := valueOfElem.Type()
// check incoming interface, incoming interface's elem must be a struct.
if typeOf.Kind() != reflect.Struct {
return errors.New("invalid type kind")
}
numField := valueOfElem.NumField()
for i := 0; i < numField; i++ {
t := typeOf.Field(i)
methodName := t.Tag.Get("dubbo")
if methodName == "" {
methodName = t.Name
}
f := valueOfElem.Field(i)
if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() {
outNum := t.Type.NumOut()
if outNum != 1 && outNum != 2 {
logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2",
t.Name, t.Type.String(), outNum)
continue
}
// The latest return type of the method must be error.
if returnType := t.Type.Out(outNum - 1); returnType != typError {
logger.Warnf("the latest return type %s of method %q is not error", returnType, t.Name)
continue
}
funcOuts := make([]reflect.Type, outNum)
for i := 0; i < outNum; i++ {
funcOuts[i] = t.Type.Out(i)
}
// do method proxy here:
f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts)))
logger.Debugf("set method [%s]", methodName)
} else if f.IsValid() && f.CanSet() {
// for struct combination
valueOfSub := reflect.New(t.Type)
valueOfElemInterface := valueOfSub.Elem()
if valueOfElemInterface.Type().Kind() == reflect.Struct {
if err := refectAndMakeObjectFunc(valueOfElemInterface, makeDubboCallProxy); err != nil {
return err
}
f.Set(valueOfElemInterface)
}
}
}
return nil
}