blob: dd72625b64674483422276105741d076fac0e174 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package triple
import (
"context"
"errors"
"fmt"
"io"
"os"
"reflect"
"strings"
"testing"
"time"
)
import (
grpc_go "github.com/dubbogo/grpc-go"
"github.com/stretchr/testify/assert"
)
import (
"dubbo.apache.org/dubbo-go/v3/client"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/protocol"
invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
dubbo3_api "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/dubbo3_server/api"
greet "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto"
dubbo3_greet "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/dubbo3_gen"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/triple_gen/greettriple"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/server/api"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
_ "dubbo.apache.org/dubbo-go/v3/proxy/proxy_factory"
"dubbo.apache.org/dubbo-go/v3/server"
)
const (
triplePort = "21000"
dubbo3Port = "21001"
listenAddr = "0.0.0.0"
localAddr = "127.0.0.1"
name = "triple"
group = "g1"
version = "v1"
customTripleInterfaceName = "apache.dubbo.org.triple"
customDubbo3InterfaceName = "apache.dubbo.org.dubbo3"
)
type tripleInvoker struct {
url *common.URL
info *server.ServiceInfo
base *protocol.BaseInvoker
handler interface{}
}
func (t *tripleInvoker) GetURL() *common.URL {
return t.url
}
func (t *tripleInvoker) IsAvailable() bool {
return t.base.IsAvailable()
}
func (t *tripleInvoker) Destroy() {
t.base.Destroy()
}
func (t *tripleInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
name := invocation.MethodName()
args := invocation.Arguments()
// todo(DMwangnima): use map to represent Methods
for _, method := range t.info.Methods {
if method.Name == name {
res, err := method.MethodFunc(ctx, args, t.handler)
result := new(protocol.RPCResult)
result.SetResult(res)
result.SetError(err)
return result
}
}
panic(fmt.Sprintf("no match method for %s", name))
}
func runTripleServer(interfaceName string, group string, version string, addr string, info *server.ServiceInfo, handler interface{}) {
url := common.NewURLWithOptions(
common.WithPath(interfaceName),
common.WithLocation(addr),
common.WithPort(triplePort),
common.WithProtocol(TRIPLE),
common.WithInterface(interfaceName),
)
url.SetParam(constant.GroupKey, group)
url.SetParam(constant.VersionKey, version)
var invoker protocol.Invoker
if info != nil {
invoker = &tripleInvoker{
url: url,
info: info,
base: protocol.NewBaseInvoker(url),
handler: handler,
}
}
GetProtocol().(*TripleProtocol).exportForTest(invoker, info)
}
func runOldTripleServer(interfaceName string, group string, version string, addr string, desc *grpc_go.ServiceDesc, svc common.RPCService) {
url := common.NewURLWithOptions(
// todo(DMwangnima): figure this out
common.WithPath(interfaceName),
common.WithLocation(addr),
common.WithPort(dubbo3Port),
common.WithProtocol(TRIPLE),
common.WithInterface(interfaceName),
)
url.SetParam(constant.GroupKey, group)
url.SetParam(constant.VersionKey, version)
// todo(DMwangnima): add protocol config
config.SetRootConfig(
*config.NewRootConfigBuilder().
SetProvider(
config.NewProviderConfigBuilder().
AddService(common.GetReference(svc), config.NewServiceConfigBuilder().
SetInterface(interfaceName).
SetGroup(group).
SetVersion(version).
Build()).
SetProxyFactory("default").
Build()).
Build())
config.SetProviderService(svc)
common.ServiceMap.Register(interfaceName, TRIPLE, group, version, svc)
invoker := extension.GetProxyFactory("default").GetInvoker(url)
GetProtocol().(*TripleProtocol).exportForTest(invoker, nil)
}
func TestMain(m *testing.M) {
runTripleServer(
customTripleInterfaceName,
"",
"",
listenAddr,
&greettriple.GreetService_ServiceInfo,
new(api.GreetTripleServer),
)
runTripleServer(
customTripleInterfaceName,
group,
version,
listenAddr,
&greettriple.GreetService_ServiceInfo,
new(api.GreetTripleServerGroup1Version1),
)
runOldTripleServer(
customDubbo3InterfaceName,
"",
"",
listenAddr,
&dubbo3_greet.GreetService_ServiceDesc,
new(dubbo3_api.GreetDubbo3Server),
)
runOldTripleServer(
customDubbo3InterfaceName,
group,
version,
listenAddr,
&dubbo3_greet.GreetService_ServiceDesc,
new(dubbo3_api.GreetDubbo3ServerGroup1Version1),
)
time.Sleep(3 * time.Second)
m.Run()
os.Exit(0)
}
func TestInvoke(t *testing.T) {
tripleInvokerInit := func(location string, port string, interfaceName string, group string, version string, methods []string, info *client.ClientInfo) (protocol.Invoker, error) {
newURL := common.NewURLWithOptions(
common.WithInterface(interfaceName),
common.WithLocation(location),
common.WithPort(port),
common.WithMethods(methods),
common.WithAttribute(constant.ClientInfoKey, info),
)
newURL.SetParam(constant.GroupKey, group)
newURL.SetParam(constant.VersionKey, version)
return NewTripleInvoker(newURL)
}
dubbo3InvokerInit := func(location string, port string, interfaceName string, group string, version string, svc common.RPCService) (protocol.Invoker, error) {
newURL := common.NewURLWithOptions(
common.WithInterface(interfaceName),
common.WithLocation(location),
common.WithPort(port),
)
newURL.SetParam(constant.GroupKey, group)
newURL.SetParam(constant.VersionKey, version)
// dubbo3 needs to retrieve ConsumerService directly
config.SetConsumerServiceByInterfaceName(interfaceName, svc)
return NewDubbo3Invoker(newURL)
}
tripleInvocationInit := func(methodName string, rawParams []interface{}, callType string) protocol.Invocation {
newInv := invocation_impl.NewRPCInvocationWithOptions(
invocation_impl.WithMethodName(methodName),
invocation_impl.WithParameterRawValues(rawParams),
)
newInv.SetAttribute(constant.CallTypeKey, callType)
return newInv
}
dubbo3InvocationInit := func(methodName string, params []reflect.Value, reply interface{}) protocol.Invocation {
newInv := invocation_impl.NewRPCInvocationWithOptions(
invocation_impl.WithMethodName(methodName),
invocation_impl.WithParameterValues(params),
)
newInv.SetReply(reply)
return newInv
}
dubbo3ReplyInit := func(fieldType reflect.Type) interface{} {
var reply reflect.Value
replyType := fieldType.Out(0)
if replyType.Kind() == reflect.Ptr {
reply = reflect.New(replyType.Elem())
} else {
reply = reflect.New(replyType)
}
return reply.Interface()
}
invokeTripleCodeFunc := func(t *testing.T, invoker protocol.Invoker, identifier string) {
tests := []struct {
methodName string
callType string
rawParams []interface{}
validate func(t *testing.T, rawParams []interface{}, res protocol.Result)
}{
{
methodName: "Greet",
callType: constant.CallUnary,
rawParams: []interface{}{
&greet.GreetRequest{
Name: name,
},
&greet.GreetResponse{},
},
validate: func(t *testing.T, params []interface{}, res protocol.Result) {
assert.Nil(t, res.Result())
assert.Nil(t, res.Error())
req := params[0].(*greet.GreetRequest)
resp := params[1].(*greet.GreetResponse)
assert.Equal(t, identifier+req.Name, resp.Greeting)
},
},
{
methodName: "GreetClientStream",
callType: constant.CallClientStream,
validate: func(t *testing.T, params []interface{}, res protocol.Result) {
assert.Nil(t, res.Error())
streamRaw, ok := res.Result().(*triple_protocol.ClientStreamForClient)
assert.True(t, ok)
stream := &greettriple.GreetServiceGreetClientStreamClient{ClientStreamForClient: streamRaw}
var expectRes []string
times := 5
for i := 1; i <= times; i++ {
expectRes = append(expectRes, identifier+name)
err := stream.Send(&greet.GreetClientStreamRequest{Name: name})
assert.Nil(t, err)
}
expectStr := strings.Join(expectRes, ",")
resp, err := stream.CloseAndRecv()
assert.Nil(t, err)
assert.Equal(t, expectStr, resp.Greeting)
},
},
{
methodName: "GreetServerStream",
callType: constant.CallServerStream,
rawParams: []interface{}{
&greet.GreetServerStreamRequest{
Name: "dubbo",
},
},
validate: func(t *testing.T, params []interface{}, res protocol.Result) {
assert.Nil(t, res.Error())
req := params[0].(*greet.GreetServerStreamRequest)
streamRaw, ok := res.Result().(*triple_protocol.ServerStreamForClient)
stream := &greettriple.GreetServiceGreetServerStreamClient{ServerStreamForClient: streamRaw}
assert.True(t, ok)
times := 5
for i := 1; i <= times; i++ {
for stream.Recv() {
assert.Nil(t, stream.Err())
assert.Equal(t, identifier+req.Name, stream.Msg().Greeting)
}
assert.True(t, true, errors.Is(stream.Err(), io.EOF))
}
},
},
{
methodName: "GreetStream",
callType: constant.CallBidiStream,
validate: func(t *testing.T, params []interface{}, res protocol.Result) {
assert.Nil(t, res.Error())
streamRaw, ok := res.Result().(*triple_protocol.BidiStreamForClient)
assert.True(t, ok)
stream := &greettriple.GreetServiceGreetStreamClient{BidiStreamForClient: streamRaw}
for i := 1; i <= 5; i++ {
err := stream.Send(&greet.GreetStreamRequest{Name: name})
assert.Nil(t, err)
resp, err := stream.Recv()
assert.Nil(t, err)
assert.Equal(t, identifier+name, resp.Greeting)
}
assert.Nil(t, stream.CloseRequest())
assert.Nil(t, stream.CloseResponse())
},
},
}
for _, test := range tests {
t.Run(test.methodName, func(t *testing.T) {
inv := tripleInvocationInit(test.methodName, test.rawParams, test.callType)
res := invoker.Invoke(context.Background(), inv)
test.validate(t, test.rawParams, res)
})
}
}
invokeDubbo3CodeFunc := func(t *testing.T, invoker protocol.Invoker, svc common.RPCService, identifier string) {
tests := []struct {
methodName string
params []reflect.Value
validate func(t *testing.T, params []reflect.Value, res protocol.Result)
}{
{
methodName: "Greet",
params: []reflect.Value{
reflect.ValueOf(&greet.GreetRequest{
Name: name,
}),
},
validate: func(t *testing.T, Params []reflect.Value, res protocol.Result) {
assert.Nil(t, res.Error())
req := Params[0].Interface().(*greet.GreetRequest)
resp := res.Result().(*greet.GreetResponse)
assert.Equal(t, identifier+req.Name, resp.Greeting)
},
},
{
methodName: "GreetClientStream",
validate: func(t *testing.T, reflectParams []reflect.Value, res protocol.Result) {
assert.Nil(t, res.Error())
stream, ok := res.Result().(*dubbo3_greet.GreetService_GreetClientStreamClient)
assert.True(t, ok)
var expectRes []string
times := 5
for i := 1; i <= times; i++ {
expectRes = append(expectRes, identifier+name)
err := (*stream).Send(&greet.GreetClientStreamRequest{Name: name})
assert.Nil(t, err)
}
expectStr := strings.Join(expectRes, ",")
resp, err := (*stream).CloseAndRecv()
assert.Nil(t, err)
assert.Equal(t, expectStr, resp.Greeting)
},
},
{
methodName: "GreetServerStream",
params: []reflect.Value{
reflect.ValueOf(&greet.GreetServerStreamRequest{
Name: "dubbo",
}),
},
validate: func(t *testing.T, params []reflect.Value, res protocol.Result) {
assert.Nil(t, res.Error())
req := params[0].Interface().(*greet.GreetServerStreamRequest)
stream, ok := res.Result().(*dubbo3_greet.GreetService_GreetServerStreamClient)
assert.True(t, ok)
times := 5
for i := 1; i <= times; i++ {
msg, err := (*stream).Recv()
assert.Nil(t, err)
assert.Equal(t, identifier+req.Name, msg.Greeting)
}
},
},
{
methodName: "GreetStream",
validate: func(t *testing.T, params []reflect.Value, res protocol.Result) {
assert.Nil(t, res.Error())
stream, ok := res.Result().(*dubbo3_greet.GreetService_GreetStreamClient)
assert.True(t, ok)
for i := 1; i <= 5; i++ {
err := (*stream).Send(&greet.GreetStreamRequest{Name: name})
assert.Nil(t, err)
resp, err := (*stream).Recv()
assert.Nil(t, err)
assert.Equal(t, identifier+name, resp.Greeting)
}
assert.Nil(t, (*stream).CloseSend())
},
},
}
svcPtrVal := reflect.ValueOf(svc)
svcVal := svcPtrVal.Elem()
svcType := svcVal.Type()
for _, test := range tests {
t.Run(test.methodName, func(t *testing.T) {
funcField, ok := svcType.FieldByName(test.methodName)
assert.True(t, ok)
reply := dubbo3ReplyInit(funcField.Type)
inv := dubbo3InvocationInit(test.methodName, test.params, reply)
res := invoker.Invoke(context.Background(), inv)
test.validate(t, test.params, res)
})
}
}
t.Run("triple2triple", func(t *testing.T) {
invoker, err := tripleInvokerInit(localAddr, triplePort, customTripleInterfaceName, "", "", greettriple.GreetService_ClientInfo.MethodNames, &greettriple.GreetService_ClientInfo)
assert.Nil(t, err)
invokeTripleCodeFunc(t, invoker, "")
})
t.Run("triple2triple_Group1Version1", func(t *testing.T) {
invoker, err := tripleInvokerInit(localAddr, triplePort, customTripleInterfaceName, group, version, greettriple.GreetService_ClientInfo.MethodNames, &greettriple.GreetService_ClientInfo)
assert.Nil(t, err)
invokeTripleCodeFunc(t, invoker, api.GroupVersionIdentifier)
})
t.Run("triple2dubbo3", func(t *testing.T) {
invoker, err := tripleInvokerInit(localAddr, dubbo3Port, customDubbo3InterfaceName, "", "", greettriple.GreetService_ClientInfo.MethodNames, &greettriple.GreetService_ClientInfo)
assert.Nil(t, err)
invokeTripleCodeFunc(t, invoker, "")
})
t.Run("triple2dubbo3_Group1Version1", func(t *testing.T) {
invoker, err := tripleInvokerInit(localAddr, dubbo3Port, customDubbo3InterfaceName, group, version, greettriple.GreetService_ClientInfo.MethodNames, &greettriple.GreetService_ClientInfo)
assert.Nil(t, err)
invokeTripleCodeFunc(t, invoker, dubbo3_api.GroupVersionIdentifier)
})
t.Run("dubbo32triple", func(t *testing.T) {
svc := new(dubbo3_greet.GreetServiceClientImpl)
invoker, err := dubbo3InvokerInit(localAddr, triplePort, customTripleInterfaceName, "", "", svc)
assert.Nil(t, err)
invokeDubbo3CodeFunc(t, invoker, svc, "")
})
t.Run("dubbo32triple_Group1Version1", func(t *testing.T) {
svc := new(dubbo3_greet.GreetServiceClientImpl)
invoker, err := dubbo3InvokerInit(localAddr, triplePort, customTripleInterfaceName, group, version, svc)
assert.Nil(t, err)
invokeDubbo3CodeFunc(t, invoker, svc, api.GroupVersionIdentifier)
})
t.Run("dubbo32dubbo3", func(t *testing.T) {
svc := new(dubbo3_greet.GreetServiceClientImpl)
invoker, err := dubbo3InvokerInit(localAddr, dubbo3Port, customDubbo3InterfaceName, "", "", svc)
assert.Nil(t, err)
invokeDubbo3CodeFunc(t, invoker, svc, "")
})
t.Run("dubbo32dubbo3_Group1Version1", func(t *testing.T) {
svc := new(dubbo3_greet.GreetServiceClientImpl)
invoker, err := dubbo3InvokerInit(localAddr, dubbo3Port, customDubbo3InterfaceName, group, version, svc)
assert.Nil(t, err)
invokeDubbo3CodeFunc(t, invoker, svc, dubbo3_api.GroupVersionIdentifier)
})
}