blob: f96caf34dd117732f5ea6e130d1bc663dce0fb57 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package triple
import (
"context"
"errors"
"fmt"
"io"
"os"
"strings"
"testing"
"time"
)
import (
grpc_go "github.com/dubbogo/grpc-go"
"github.com/stretchr/testify/assert"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/protocol"
invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
dubbo3_api "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/dubbo3_server/api"
greet "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto"
dubbo3_greet "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/dubbo3_gen"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/triple_gen/greettriple"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/server/api"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
_ "dubbo.apache.org/dubbo-go/v3/proxy/proxy_factory"
"dubbo.apache.org/dubbo-go/v3/server"
)
const (
triplePort = "20000"
dubbo3Port = "20001"
listenAddr = "0.0.0.0"
name = "triple"
)
type tripleInvoker struct {
url *common.URL
info *server.ServiceInfo
base *protocol.BaseInvoker
handler interface{}
}
func (t *tripleInvoker) GetURL() *common.URL {
return t.url
}
func (t *tripleInvoker) IsAvailable() bool {
return t.base.IsAvailable()
}
func (t *tripleInvoker) Destroy() {
t.base.Destroy()
}
func (t *tripleInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
name := invocation.MethodName()
args := invocation.Arguments()
// todo(DMwangnima): user map to represent Methods
for _, method := range t.info.Methods {
if method.Name == name {
res, err := method.MethodFunc(ctx, args, t.handler)
result := new(protocol.RPCResult)
result.SetResult(res)
result.SetError(err)
return result
}
}
panic(fmt.Sprintf("no match method for %s", name))
}
func runTripleServer(interfaceName string, addr string, info *server.ServiceInfo, handler interface{}) {
url := common.NewURLWithOptions(
common.WithPath(interfaceName),
common.WithLocation(addr),
common.WithPort(triplePort),
)
var invoker protocol.Invoker
if info != nil {
invoker = &tripleInvoker{
url: url,
info: info,
base: protocol.NewBaseInvoker(url),
handler: handler,
}
}
GetProtocol().(*TripleProtocol).exportForTest(invoker, info)
}
func runOldTripleServer(addr string, desc *grpc_go.ServiceDesc) {
url := common.NewURLWithOptions(
common.WithPath(desc.ServiceName),
common.WithLocation(addr),
common.WithPort(dubbo3Port),
common.WithProtocol(TRIPLE),
)
srv := new(dubbo3_api.GreetDubbo3Server)
// todo(DMwangnima): add protocol config
config.SetRootConfig(
*config.NewRootConfigBuilder().
SetProvider(
config.NewProviderConfigBuilder().
AddService(common.GetReference(srv), config.NewServiceConfigBuilder().
SetInterface(desc.ServiceName).
Build()).
SetProxyFactory("default").
Build()).
Build())
config.SetProviderService(srv)
common.ServiceMap.Register(desc.ServiceName, TRIPLE, "", "", srv)
invoker := extension.GetProxyFactory("default").GetInvoker(url)
GetProtocol().(*TripleProtocol).exportForTest(invoker, nil)
}
func TestMain(m *testing.M) {
runTripleServer(
greettriple.GreetServiceName,
listenAddr,
&greettriple.GreetService_ServiceInfo,
new(api.GreetTripleServer),
)
runOldTripleServer(
listenAddr,
&dubbo3_greet.GreetService_ServiceDesc,
)
time.Sleep(3 * time.Second)
m.Run()
os.Exit(0)
}
func TestInvoke(t *testing.T) {
invokeFunc := func(t *testing.T, port string, interfaceName string, methods []string) {
url := common.NewURLWithOptions(
common.WithInterface(interfaceName),
common.WithLocation("127.0.0.1"),
common.WithPort(port),
common.WithMethods(methods),
common.WithProtocol(TRIPLE),
)
invoker, err := NewTripleInvoker(url)
if err != nil {
t.Fatal(err)
}
tests := []struct {
desc string
methodName string
params []interface{}
invoke func(t *testing.T, params []interface{}, res protocol.Result)
callType string
}{
{
desc: "Unary",
methodName: "Greet",
params: []interface{}{
&greet.GreetRequest{
Name: name,
},
&greet.GreetResponse{},
},
invoke: func(t *testing.T, params []interface{}, res protocol.Result) {
assert.Nil(t, res.Result())
assert.Nil(t, res.Error())
req := params[0].(*greet.GreetRequest)
resp := params[1].(*greet.GreetResponse)
assert.Equal(t, req.Name, resp.Greeting)
},
callType: constant.CallUnary,
},
{
desc: "ClientStream",
methodName: "GreetClientStream",
invoke: func(t *testing.T, params []interface{}, res protocol.Result) {
assert.Nil(t, res.Error())
streamRaw, ok := res.Result().(*triple_protocol.ClientStreamForClient)
assert.True(t, ok)
stream := &greettriple.GreetServiceGreetClientStreamClient{ClientStreamForClient: streamRaw}
var expectRes []string
times := 5
for i := 1; i <= times; i++ {
expectRes = append(expectRes, name)
err := stream.Send(&greet.GreetClientStreamRequest{Name: name})
assert.Nil(t, err)
}
expectStr := strings.Join(expectRes, ",")
resp, err := stream.CloseAndRecv()
assert.Nil(t, err)
assert.Equal(t, expectStr, resp.Greeting)
},
callType: constant.CallClientStream,
},
{
desc: "ServerStream",
methodName: "GreetServerStream",
params: []interface{}{
&greet.GreetServerStreamRequest{
Name: "dubbo",
},
},
invoke: func(t *testing.T, params []interface{}, res protocol.Result) {
assert.Nil(t, res.Error())
req := params[0].(*greet.GreetServerStreamRequest)
streamRaw, ok := res.Result().(*triple_protocol.ServerStreamForClient)
stream := &greettriple.GreetServiceGreetServerStreamClient{ServerStreamForClient: streamRaw}
assert.True(t, ok)
times := 5
for i := 1; i <= times; i++ {
for stream.Recv() {
assert.Nil(t, stream.Err())
assert.Equal(t, req.Name, stream.Msg().Greeting)
}
assert.True(t, true, errors.Is(stream.Err(), io.EOF))
}
},
callType: constant.CallServerStream,
},
{
desc: "BidiStream",
methodName: "GreetStream",
invoke: func(t *testing.T, params []interface{}, res protocol.Result) {
assert.Nil(t, res.Error())
streamRaw, ok := res.Result().(*triple_protocol.BidiStreamForClient)
assert.True(t, ok)
stream := &greettriple.GreetServiceGreetStreamClient{BidiStreamForClient: streamRaw}
for i := 1; i <= 5; i++ {
err := stream.Send(&greet.GreetStreamRequest{Name: name})
assert.Nil(t, err)
resp, err := stream.Recv()
assert.Nil(t, err)
assert.Equal(t, name, resp.Greeting)
}
assert.Nil(t, stream.CloseRequest())
assert.Nil(t, stream.CloseResponse())
},
callType: constant.CallBidiStream,
},
}
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
inv := invocation_impl.NewRPCInvocationWithOptions(
invocation_impl.WithMethodName(test.methodName),
// todo: process opts
invocation_impl.WithParameterRawValues(test.params),
)
inv.SetAttribute(constant.CallTypeKey, test.callType)
res := invoker.Invoke(context.Background(), inv)
test.invoke(t, test.params, res)
})
}
}
t.Run("invoke server code generated by triple", func(t *testing.T) {
invokeFunc(t, triplePort, greettriple.GreetService_ClientInfo.InterfaceName, greettriple.GreetService_ClientInfo.MethodNames)
})
t.Run("invoke server code generated by dubbo3", func(t *testing.T) {
desc := dubbo3_greet.GreetService_ServiceDesc
var methods []string
for _, method := range desc.Methods {
methods = append(methods, method.MethodName)
}
for _, stream := range desc.Streams {
methods = append(methods, stream.StreamName)
}
invokeFunc(t, dubbo3Port, desc.ServiceName, methods)
})
}