blob: f3ebf3b8ec3735844d2b6c3ac3451ebda3321ddc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpc
import (
"context"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/conf"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"testing"
"time"
)
func Test_newEventMeshGRPCClient(t *testing.T) {
type args struct {
cfg *conf.GRPCConfig
}
tests := []struct {
name string
args args
want *eventMeshGRPCClient
wantErr bool
}{
{
name: "host is empty",
args: args{cfg: &conf.GRPCConfig{
Host: "",
}},
wantErr: true,
want: nil,
},
{
name: "producer wrong",
args: args{cfg: &conf.GRPCConfig{
Host: "1.1.1.1",
ProducerConfig: conf.ProducerConfig{},
}},
wantErr: true,
want: nil,
},
{
name: "client with send msg",
args: args{cfg: &conf.GRPCConfig{
Host: "101.43.84.47",
Port: 10205,
ENV: "sendmsgenv",
Region: "sh",
IDC: "idc01",
SYS: "test-system",
ProtocolType: "grpc",
ProducerConfig: conf.ProducerConfig{
ProducerGroup: "test-producer-group",
},
Username: "user",
Password: "passwd",
}},
want: nil,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli, err := newEventMeshGRPCClient(tt.args.cfg)
if (err != nil) != tt.wantErr {
t.Errorf("newEventMeshGRPCClient() error = %v, wantErr %v", err, tt.wantErr)
return
}
if err == nil {
assert.NoError(t, cli.Close())
}
})
}
}
func Test_multiple_set_context(t *testing.T) {
root := context.Background()
onec, cancel := context.WithTimeout(root, time.Second*5)
defer cancel()
valc := context.WithValue(onec, "test", "got")
select {
case <-valc.Done():
val := valc.Value("test")
t.Logf("5 s reached, value in context:%v", val)
case <-time.After(time.Second * 10):
t.Logf("ooor, 10s timeout")
}
}
func Test_eventMeshGRPCClient_Publish(t *testing.T) {
// run fake server
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go runFakeServer(ctx)
cli, err := New(&conf.GRPCConfig{
Host: "127.0.0.1",
Port: 8086,
ProducerConfig: conf.ProducerConfig{
ProducerGroup: "test-publish-group",
},
ConsumerConfig: conf.ConsumerConfig{
Enabled: false,
},
})
assert.NoError(t, err, "create grpc client")
type args struct {
ctx context.Context
msg *proto.SimpleMessage
opts []grpc.CallOption
}
tests := []struct {
name string
args args
want *proto.Response
wantErr assert.ErrorAssertionFunc
}{
{
name: "publish msg",
args: args{
ctx: context.TODO(),
msg: &proto.SimpleMessage{
Header: &proto.RequestHeader{},
Topic: "test-publish-topic",
},
},
wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
return true
},
},
{
name: "publish with timeout",
args: args{
ctx: func() context.Context {
ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
return ctx
}(),
msg: &proto.SimpleMessage{
Header: &proto.RequestHeader{},
Topic: "test-timeout-topic",
},
},
wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
return true
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := cli.Publish(tt.args.ctx, tt.args.msg, tt.args.opts...)
assert.NoError(t, err)
t.Logf("receive publish response:%v", got.String())
assert.NoError(t, cli.Close())
})
}
}
func Test_eventMeshGRPCClient_RequestReply(t *testing.T) {
// run fake server
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go runFakeServer(ctx)
cli, err := New(&conf.GRPCConfig{
Host: "127.0.0.1",
Port: 8086,
ProducerConfig: conf.ProducerConfig{
ProducerGroup: "test-publish-group",
},
ConsumerConfig: conf.ConsumerConfig{
Enabled: false,
},
})
assert.NoError(t, err, "create grpc client")
type args struct {
ctx context.Context
msg *proto.SimpleMessage
opts []grpc.CallOption
}
tests := []struct {
name string
args args
want *proto.SimpleMessage
wantErr assert.ErrorAssertionFunc
}{
{
name: "test-request-reply",
args: args{
ctx: context.TODO(),
msg: &proto.SimpleMessage{
Header: &proto.RequestHeader{},
Topic: "test-request-reply-topic",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := cli.RequestReply(tt.args.ctx, tt.args.msg, tt.args.opts...)
assert.NoError(t, err)
t.Logf("receive request reply response:%v", got.String())
assert.NoError(t, cli.Close())
})
}
}
func Test_eventMeshGRPCClient_BatchPublish(t *testing.T) {
// run fake server
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go runFakeServer(ctx)
cli, err := New(&conf.GRPCConfig{
Host: "127.0.0.1",
Port: 8086,
ProducerConfig: conf.ProducerConfig{
ProducerGroup: "test-publish-group",
},
ConsumerConfig: conf.ConsumerConfig{
Enabled: false,
},
})
assert.NoError(t, err, "create grpc client")
type args struct {
ctx context.Context
msg *proto.BatchMessage
opts []grpc.CallOption
}
tests := []struct {
name string
args args
want *proto.Response
wantErr assert.ErrorAssertionFunc
}{
{
name: "test batch publish",
args: args{
ctx: context.TODO(),
msg: &proto.BatchMessage{
Header: &proto.RequestHeader{},
ProducerGroup: "fake-batch-group",
Topic: "fake-batch-topic",
MessageItem: []*proto.BatchMessage_MessageItem{
{
Content: "batch-1",
Ttl: "1",
UniqueId: "batch-id",
SeqNum: "1",
Tag: "tag",
Properties: map[string]string{
"from": "test",
"type": "batch-msg",
},
},
{
Content: "batch-2",
Ttl: "2",
UniqueId: "batch-id",
SeqNum: "2",
Tag: "tag",
Properties: map[string]string{
"from": "test",
"type": "batch-msg",
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := cli.BatchPublish(tt.args.ctx, tt.args.msg, tt.args.opts...)
assert.NoError(t, err)
t.Logf("receive request reply response:%v", got.String())
assert.NoError(t, cli.Close())
})
}
}
func Test_eventMeshGRPCClient_webhook_subscribe(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
go runWebhookServer(ctx)
go runFakeServer(ctx)
cli, err := New(&conf.GRPCConfig{
Host: "127.0.0.1",
Port: 8086,
ProducerConfig: conf.ProducerConfig{
ProducerGroup: "test-publish-group",
},
ConsumerConfig: conf.ConsumerConfig{
Enabled: true,
ConsumerGroup: "test-consumer-group-subscribe",
PoolSize: 5,
},
HeartbeatConfig: conf.HeartbeatConfig{
Period: time.Second * 5,
Timeout: time.Second * 3,
},
})
assert.NoError(t, err, "create grpc client")
assert.NoError(t, cli.SubscribeWebhook(conf.SubscribeItem{
SubscribeMode: 1,
SubscribeType: 1,
Topic: "topic-1",
}, "http://localhost:8080/onmessage"))
time.Sleep(time.Second * 5)
}
func Test_eventMeshGRPCClient_Subscribe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go runFakeServer(ctx)
cli, err := New(&conf.GRPCConfig{
Host: "127.0.0.1",
Port: 8086,
ProducerConfig: conf.ProducerConfig{
ProducerGroup: "test-publish-group",
},
ConsumerConfig: conf.ConsumerConfig{
Enabled: true,
ConsumerGroup: "test-consumer-group-subscribe",
PoolSize: 5,
},
HeartbeatConfig: conf.HeartbeatConfig{
Period: time.Second * 5,
Timeout: time.Second * 3,
},
})
assert.NoError(t, err, "create grpc client")
type args struct {
item conf.SubscribeItem
handler OnMessage
}
tests := []struct {
name string
args args
wantErr assert.ErrorAssertionFunc
}{
{
name: "subcribe one",
args: args{
item: conf.SubscribeItem{
SubscribeMode: 1,
SubscribeType: 1,
Topic: "topic-1",
},
handler: func(message *proto.SimpleMessage) interface{} {
t.Logf("receive subscribe response:%s", message.String())
return nil
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := cli.SubscribeStream(tt.args.item, tt.args.handler)
assert.NoError(t, err)
assert.NoError(t, cli.Close())
})
}
}
func Test_eventMeshGRPCClient_UnSubscribe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go runFakeServer(ctx)
cli, err := New(&conf.GRPCConfig{
Host: "127.0.0.1",
Port: 8086,
ProducerConfig: conf.ProducerConfig{
ProducerGroup: "test-publish-group",
},
ConsumerConfig: conf.ConsumerConfig{
Enabled: true,
ConsumerGroup: "test-consumer-group-subscribe",
PoolSize: 5,
},
HeartbeatConfig: conf.HeartbeatConfig{
Period: time.Second * 5,
Timeout: time.Second * 3,
},
})
assert.NoError(t, err, "create grpc client")
err = cli.SubscribeStream(conf.SubscribeItem{
SubscribeMode: 1,
SubscribeType: 1,
Topic: "topic-1",
}, func(message *proto.SimpleMessage) interface{} {
t.Logf("receive subscribe response:%s", message.String())
return nil
})
assert.NoError(t, err, "subscribe err")
tests := []struct {
name string
wantErr assert.ErrorAssertionFunc
}{
{
name: "unsubcribe",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.NoError(t, cli.UnSubscribe())
assert.NoError(t, cli.Close())
})
}
}
type fakeidg struct {
}
func (f *fakeidg) Next() string {
return "fake"
}
func Test_eventMeshGRPCClient_setupContext(t *testing.T) {
type args struct {
ctx context.Context
}
cli := &eventMeshGRPCClient{
idg: &fakeidg{},
}
tests := []struct {
name string
args args
want string
}{
{
name: "setup with uid",
args: args{
ctx: context.WithValue(context.Background(), GRPC_ID_KEY, "value"),
},
want: "value",
},
{
name: "setup without uid",
args: args{
ctx: context.TODO(),
},
want: "fake",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := cli.setupContext(tt.args.ctx)
assert.Equal(t, ctx.Value(GRPC_ID_KEY).(string), tt.want)
})
}
}