// blob: a751af81c77a902d9bb9609faf20215bc39dd0f5 [file] [log] [blame]
// Copyright 2021-2023 Buf Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package triple_protocol_test
import (
"context"
"errors"
"net/http"
"net/http/httptest"
"strings"
"testing"
triple "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol/internal/assert"
pingv1 "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol/internal/gen/proto/connect/ping/v1"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol/internal/gen/proto/connect/ping/v1/pingv1connect"
)
// TestNewClient_InitFailure builds a ping client configured with the send
// compression name "invalid" and checks that a unary Ping call returns a
// *triple.Error whose message is `unknown compression "invalid"`.
func TestNewClient_InitFailure(t *testing.T) {
	t.Parallel()

	pingClient := pingv1connect.NewPingServiceClient(
		http.DefaultClient,
		"http://127.0.0.1:8080",
		// The bogus compression name triggers an error during
		// initialization, so each call short-circuits and returns an error.
		triple.WithSendCompression("invalid"),
	)

	// requireInitErr asserts that err is non-nil, unwraps to a *triple.Error,
	// and carries the expected unknown-compression message.
	requireInitErr := func(t *testing.T, err error) {
		t.Helper()
		assert.NotNil(t, err)
		var target *triple.Error
		matched := errors.As(err, &target)
		assert.True(t, matched)
		assert.Equal(t, target.Message(), `unknown compression "invalid"`)
	}

	t.Run("unary", func(t *testing.T) {
		t.Parallel()
		req := triple.NewRequest(&pingv1.PingRequest{})
		res := triple.NewResponse(&pingv1.PingResponse{})
		requireInitErr(t, pingClient.Ping(context.Background(), req, res))
	})
}
// TestClientPeer starts an httptest TLS server with HTTP/2 enabled that
// serves the ping service, then issues unary Ping calls through a client
// that carries an assertPeerInterceptor. The calls are made once with no
// extra client options ("triple") and once with triple.WithGRPC() ("grpc").
func TestClientPeer(t *testing.T) {
	t.Parallel()

	router := http.NewServeMux()
	path, handler := pingv1connect.NewPingServiceHandler(pingServer{})
	router.Handle(path, handler)

	srv := httptest.NewUnstartedServer(router)
	srv.EnableHTTP2 = true
	srv.StartTLS()
	t.Cleanup(srv.Close)

	// exercise builds a client against srv with the given options and runs
	// the unary checks; peer assertions happen inside the interceptor.
	exercise := func(t *testing.T, opts ...triple.ClientOption) {
		t.Helper()
		ctx := context.Background()
		pingClient := pingv1connect.NewPingServiceClient(
			srv.Client(),
			srv.URL,
			triple.WithClientOptions(opts...),
			triple.WithInterceptors(&assertPeerInterceptor{t}),
		)

		// unary: a request wrapping a nil message is expected to succeed.
		err := pingClient.Ping(
			ctx,
			triple.NewRequest((*pingv1.PingRequest)(nil)),
			triple.NewResponse(&pingv1.PingResponse{}),
		)
		assert.Nil(t, err)

		// unary: the response text is compared against the request text.
		payload := strings.Repeat(".", 256)
		reply := &pingv1.PingResponse{}
		err = pingClient.Ping(
			ctx,
			triple.NewRequest(&pingv1.PingRequest{Text: payload}),
			triple.NewResponse(reply),
		)
		assert.Nil(t, err)
		assert.Equal(t, reply.Text, payload)

		// NOTE: the streaming checks below are commented out in this file.
		////client streaming
		//clientStream, err := client.Sum(ctx)
		//assert.Nil(t, err)
		//t.Cleanup(func() {
		//	closeErr := clientStream.CloseAndReceive(triple.NewResponse(&pingv1.SumResponse{}))
		//	assert.Nil(t, closeErr)
		//})
		//assert.NotZero(t, clientStream.Peer().Addr)
		//assert.NotZero(t, clientStream.Peer().Protocol)
		//err = clientStream.Send(&pingv1.SumRequest{})
		//assert.Nil(t, err)
		////server streaming
		//serverStream, err := client.CountUp(ctx, triple.NewRequest(&pingv1.CountUpRequest{}))
		//t.Cleanup(func() {
		//	assert.Nil(t, serverStream.Close())
		//})
		//assert.Nil(t, err)
		//// bidi streaming
		//bidiStream, err := client.CumSum(ctx)
		//assert.Nil(t, err)
		//t.Cleanup(func() {
		//	assert.Nil(t, bidiStream.CloseRequest())
		//	assert.Nil(t, bidiStream.CloseResponse())
		//})
		//assert.NotZero(t, bidiStream.Peer().Addr)
		//assert.NotZero(t, bidiStream.Peer().Protocol)
		//err = bidiStream.Send(&pingv1.CumSumRequest{})
		//assert.Nil(t, err)
	}

	cases := []struct {
		name string
		opts []triple.ClientOption
	}{
		{name: "triple"},
		{name: "grpc", opts: []triple.ClientOption{triple.WithGRPC()}},
	}
	for _, tc := range cases {
		tc := tc // per-iteration copy for the parallel closure (needed before Go 1.22)
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			exercise(t, tc.opts...)
		})
	}
}
// assertPeerInterceptor carries a testing.TB that its Wrap* methods use to
// assert that peer information (Addr and Protocol) is non-zero on the
// requests and connections passing through it.
type assertPeerInterceptor struct {
// tb is the test handle that the interceptor's assertions report against.
tb testing.TB
}
// WrapUnaryHandler returns a unary handler func that asserts the request's
// peer Addr and Protocol are non-zero and then delegates to next.
func (a *assertPeerInterceptor) WrapUnaryHandler(next triple.UnaryHandlerFunc) triple.UnaryHandlerFunc {
	checked := func(ctx context.Context, req triple.AnyRequest) (triple.AnyResponse, error) {
		peer := req.Peer()
		assert.NotZero(a.tb, peer.Addr)
		assert.NotZero(a.tb, peer.Protocol)
		return next(ctx, req)
	}
	return checked
}
// WrapUnary returns a unary func that asserts the request's peer Addr and
// Protocol are non-zero and then forwards the call to next.
func (a *assertPeerInterceptor) WrapUnary(next triple.UnaryFunc) triple.UnaryFunc {
	checked := func(ctx context.Context, req triple.AnyRequest, res triple.AnyResponse) error {
		peer := req.Peer()
		assert.NotZero(a.tb, peer.Addr)
		assert.NotZero(a.tb, peer.Protocol)
		return next(ctx, req, res)
	}
	return checked
}
// WrapStreamingClient returns a streaming client func that obtains the
// connection from next, asserts that its peer Addr, peer Protocol, and Spec
// are non-zero, and returns the connection unchanged.
func (a *assertPeerInterceptor) WrapStreamingClient(next triple.StreamingClientFunc) triple.StreamingClientFunc {
	checked := func(ctx context.Context, spec triple.Spec) triple.StreamingClientConn {
		stream := next(ctx, spec)
		peer := stream.Peer()
		assert.NotZero(a.tb, peer.Addr)
		assert.NotZero(a.tb, peer.Protocol)
		assert.NotZero(a.tb, stream.Spec())
		return stream
	}
	return checked
}
// WrapStreamingHandler returns a streaming handler func that asserts the
// connection's peer Addr, peer Protocol, and Spec are non-zero before
// delegating to next.
func (a *assertPeerInterceptor) WrapStreamingHandler(next triple.StreamingHandlerFunc) triple.StreamingHandlerFunc {
	checked := func(ctx context.Context, conn triple.StreamingHandlerConn) error {
		peer := conn.Peer()
		assert.NotZero(a.tb, peer.Addr)
		assert.NotZero(a.tb, peer.Protocol)
		assert.NotZero(a.tb, conn.Spec())
		return next(ctx, conn)
	}
	return checked
}