blob: 27c91d2485771597660983447a7056b1dc64cb86 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package v3_test
import (
"context"
"fmt"
)
import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
envoy_xds "github.com/envoyproxy/go-control-plane/pkg/server/v3"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
import (
util_xds_v3 "github.com/apache/dubbo-kubernetes/pkg/util/xds/v3"
)
var _ = Describe("CallbacksChain", func() {
var first, second CallbacksFuncs
type methodCall struct {
obj string
method string
args []interface{}
}
var calls []methodCall
BeforeEach(func() {
calls = make([]methodCall, 0)
first = CallbacksFuncs{
OnStreamOpenFunc: func(ctx context.Context, streamID int64, typ string) error {
calls = append(calls, methodCall{"1st", "OnStreamOpen()", []interface{}{ctx, streamID, typ}})
return fmt.Errorf("1st: OnStreamOpen()")
},
OnStreamClosedFunc: func(streamID int64, n *envoy_core.Node) {
calls = append(calls, methodCall{"1st", "OnStreamClosed()", []interface{}{streamID, n}})
},
OnStreamRequestFunc: func(streamID int64, req *envoy_sd.DiscoveryRequest) error {
calls = append(calls, methodCall{"1st", "OnStreamRequest()", []interface{}{streamID, req}})
return fmt.Errorf("1st: OnStreamRequest()")
},
OnStreamResponseFunc: func(ctx context.Context, streamID int64, req *envoy_sd.DiscoveryRequest, resp *envoy_sd.DiscoveryResponse) {
calls = append(calls, methodCall{"1st", "OnStreamResponse()", []interface{}{ctx, streamID, req, resp}})
},
}
second = CallbacksFuncs{
OnStreamOpenFunc: func(ctx context.Context, streamID int64, typ string) error {
calls = append(calls, methodCall{"2nd", "OnStreamOpen()", []interface{}{ctx, streamID, typ}})
return fmt.Errorf("2nd: OnStreamOpen()")
},
OnStreamClosedFunc: func(streamID int64, n *envoy_core.Node) {
calls = append(calls, methodCall{"2nd", "OnStreamClosed()", []interface{}{streamID, n}})
},
OnStreamRequestFunc: func(streamID int64, req *envoy_sd.DiscoveryRequest) error {
calls = append(calls, methodCall{"2nd", "OnStreamRequest()", []interface{}{streamID, req}})
return fmt.Errorf("2nd: OnStreamRequest()")
},
OnStreamResponseFunc: func(ctx context.Context, streamID int64, req *envoy_sd.DiscoveryRequest, resp *envoy_sd.DiscoveryResponse) {
calls = append(calls, methodCall{"2nd", "OnStreamResponse()", []interface{}{ctx, streamID, req, resp}})
},
}
})
Describe("OnStreamOpen", func() {
It("should be called sequentially and return after first error", func() {
// given
ctx := context.Background()
streamID := int64(1)
typ := "xDS"
// setup
chain := util_xds_v3.CallbacksChain{first, second}
// when
err := chain.OnStreamOpen(ctx, streamID, typ)
// then
Expect(calls).To(Equal([]methodCall{
{"1st", "OnStreamOpen()", []interface{}{ctx, streamID, typ}},
}))
// and
Expect(err).To(MatchError("1st: OnStreamOpen()"))
})
})
Describe("OnStreamClose", func() {
It("should be called in reverse order", func() {
// given
streamID := int64(1)
n := &envoy_core.Node{Id: "my-node"}
// setup
chain := util_xds_v3.CallbacksChain{first, second}
// when
chain.OnStreamClosed(streamID, n)
// then
Expect(calls).To(Equal([]methodCall{
{"2nd", "OnStreamClosed()", []interface{}{streamID, n}},
{"1st", "OnStreamClosed()", []interface{}{streamID, n}},
}))
})
})
Describe("OnStreamRequest", func() {
It("should be called sequentially and return after first error", func() {
// given
streamID := int64(1)
req := &envoy_sd.DiscoveryRequest{}
// setup
chain := util_xds_v3.CallbacksChain{first, second}
// when
err := chain.OnStreamRequest(streamID, req)
// then
Expect(calls).To(Equal([]methodCall{
{"1st", "OnStreamRequest()", []interface{}{streamID, req}},
}))
// and
Expect(err).To(MatchError("1st: OnStreamRequest()"))
})
})
Describe("OnStreamResponse", func() {
It("should be called in reverse order", func() {
// given
chain := util_xds_v3.CallbacksChain{first, second}
streamID := int64(1)
req := &envoy_sd.DiscoveryRequest{}
resp := &envoy_sd.DiscoveryResponse{}
ctx := context.TODO()
// when
chain.OnStreamResponse(ctx, streamID, req, resp)
// then
Expect(calls).To(Equal([]methodCall{
{"2nd", "OnStreamResponse()", []interface{}{ctx, streamID, req, resp}},
{"1st", "OnStreamResponse()", []interface{}{ctx, streamID, req, resp}},
}))
})
})
})
var _ envoy_xds.Callbacks = CallbacksFuncs{}
type CallbacksFuncs struct {
OnStreamOpenFunc func(context.Context, int64, string) error
OnStreamClosedFunc func(int64, *envoy_core.Node)
OnStreamRequestFunc func(int64, *envoy_sd.DiscoveryRequest) error
OnStreamResponseFunc func(context.Context, int64, *envoy_sd.DiscoveryRequest, *envoy_sd.DiscoveryResponse)
OnFetchRequestFunc func(context.Context, *envoy_sd.DiscoveryRequest) error
OnFetchResponseFunc func(*envoy_sd.DiscoveryRequest, *envoy_sd.DiscoveryResponse)
}
func (f CallbacksFuncs) OnStreamOpen(ctx context.Context, streamID int64, typ string) error {
if f.OnStreamOpenFunc != nil {
return f.OnStreamOpenFunc(ctx, streamID, typ)
}
return nil
}
func (f CallbacksFuncs) OnStreamClosed(streamID int64, n *envoy_core.Node) {
if f.OnStreamClosedFunc != nil {
f.OnStreamClosedFunc(streamID, n)
}
}
func (f CallbacksFuncs) OnStreamRequest(streamID int64, req *envoy_sd.DiscoveryRequest) error {
if f.OnStreamRequestFunc != nil {
return f.OnStreamRequestFunc(streamID, req)
}
return nil
}
func (f CallbacksFuncs) OnStreamResponse(ctx context.Context, streamID int64, req *envoy_sd.DiscoveryRequest, resp *envoy_sd.DiscoveryResponse) {
if f.OnStreamResponseFunc != nil {
f.OnStreamResponseFunc(ctx, streamID, req, resp)
}
}
func (f CallbacksFuncs) OnFetchRequest(ctx context.Context, req *envoy_sd.DiscoveryRequest) error {
if f.OnFetchRequestFunc != nil {
return f.OnFetchRequestFunc(ctx, req)
}
return nil
}
func (f CallbacksFuncs) OnFetchResponse(req *envoy_sd.DiscoveryRequest, resp *envoy_sd.DiscoveryResponse) {
if f.OnFetchResponseFunc != nil {
f.OnFetchResponseFunc(req, resp)
}
}
func (f CallbacksFuncs) OnDeltaStreamOpen(ctx context.Context, i int64, s string) error {
return nil
}
func (f CallbacksFuncs) OnDeltaStreamClosed(i int64, n *envoy_core.Node) {
}
func (f CallbacksFuncs) OnStreamDeltaRequest(i int64, request *envoy_sd.DeltaDiscoveryRequest) error {
return nil
}
func (f CallbacksFuncs) OnStreamDeltaResponse(i int64, request *envoy_sd.DeltaDiscoveryRequest, response *envoy_sd.DeltaDiscoveryResponse) {
}