blob: 550de44c7cbc62fb413e59397ab540bab3658a6d [file] [log] [blame]
package stream
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/url"
)
import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/pkg/errors"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/structpb"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/model/rest"
util_proto "github.com/apache/dubbo-kubernetes/pkg/util/proto"
)
// Client bundles a gRPC connection with the Aggregated Discovery Service
// (ADS) client created on top of it.
type Client struct {
	// conn is the underlying gRPC connection; Close closes it.
	conn *grpc.ClientConn
	// client is the ADS client that StartStream uses to open streams.
	client envoy_discovery.AggregatedDiscoveryServiceClient
}
// Stream wraps one ADS stream and keeps track of discovery responses seen on
// it, indexed by type URL.
type Stream struct {
	// stream is the ADS stream that requests are sent on and responses are
	// received from.
	stream envoy_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient
	// latestACKed is written by ACK after a successful send and read by NACK
	// to obtain the version to report for a type URL.
	latestACKed map[string]*envoy_discovery.DiscoveryResponse
	// latestReceived maps a type URL to the most recent response returned by
	// WaitForResources for that type.
	latestReceived map[string]*envoy_discovery.DiscoveryResponse
}
// New parses serverURL, dials its host over gRPC and returns a Client that
// wraps the resulting connection.
//
// The URL scheme selects the transport credentials:
//   - "grpc"  uses insecure (non-TLS) credentials;
//   - "grpcs" uses TLS with certificate verification disabled.
//
// An error is returned when serverURL cannot be parsed, when the scheme is
// neither of the above, or when dialing fails.
func New(serverURL string) (*Client, error) {
	target, err := url.Parse(serverURL)
	if err != nil {
		return nil, err
	}

	// Pick the transport credentials first; the dial option is built once below.
	var creds credentials.TransportCredentials
	switch target.Scheme {
	case "grpc":
		creds = insecure.NewCredentials()
	case "grpcs":
		// #nosec G402 -- it's acceptable as this is only to be used in testing
		creds = credentials.NewTLS(&tls.Config{
			InsecureSkipVerify: true,
		})
	default:
		return nil, errors.Errorf("unsupported scheme %q. Use one of %s", target.Scheme, []string{"grpc", "grpcs"})
	}

	conn, err := grpc.Dial(target.Host, grpc.WithTransportCredentials(creds))
	if err != nil {
		return nil, err
	}

	return &Client{
		conn:   conn,
		client: envoy_discovery.NewAggregatedDiscoveryServiceClient(conn),
	}, nil
}
// StartStream opens a new aggregated-resources stream on the client's
// connection and returns it wrapped in a Stream with empty response tracking.
//
// The stream is created with a background context that carries empty outgoing
// metadata. Any error from opening the stream is returned unchanged.
func (c *Client) StartStream() (*Stream, error) {
	ctx := metadata.NewOutgoingContext(context.Background(), metadata.MD{})

	ads, err := c.client.StreamAggregatedResources(ctx)
	if err != nil {
		return nil, err
	}

	s := &Stream{
		stream:         ads,
		latestACKed:    map[string]*envoy_discovery.DiscoveryResponse{},
		latestReceived: map[string]*envoy_discovery.DiscoveryResponse{},
	}
	return s, nil
}
// Close closes the underlying gRPC connection and returns the error, if any,
// reported by it.
func (c *Client) Close() error {
	err := c.conn.Close()
	return err
}
// Request sends an initial discovery request for resources of type typ.
//
// The request carries a node whose Id is clientId and whose metadata holds
// two fields: "dataplane.resource", the JSON encoding of dp as a string, and
// "version", an empty mesh_proto.Version converted to a struct. Version info,
// response nonce and resource names are all left empty.
//
// It returns the JSON marshalling error for dp, or the error from sending on
// the stream.
func (s *Stream) Request(clientId string, typ string, dp rest.Resource) error {
	encoded, err := json.Marshal(dp)
	if err != nil {
		return err
	}

	versionStruct := util_proto.MustToStruct(&mesh_proto.Version{})
	node := &envoy_core.Node{
		Id: clientId,
		Metadata: &structpb.Struct{
			Fields: map[string]*structpb.Value{
				"dataplane.resource": structpb.NewStringValue(string(encoded)),
				"version":            structpb.NewStructValue(versionStruct),
			},
		},
	}

	req := &envoy_discovery.DiscoveryRequest{
		VersionInfo:   "",
		ResponseNonce: "",
		Node:          node,
		ResourceNames: []string{},
		TypeUrl:       typ,
	}
	return s.stream.Send(req)
}
// ACK acknowledges the most recently received response for type URL typ.
//
// It sends a discovery request that echoes that response's version and nonce.
// If nothing has been received for typ yet, it sends nothing and returns nil.
// On a successful send the response is recorded as the latest ACKed one for
// typ only; on failure the send error is returned and nothing is recorded.
func (s *Stream) ACK(typ string) error {
	latestReceived := s.latestReceived[typ]
	if latestReceived == nil {
		return nil
	}
	err := s.stream.Send(&envoy_discovery.DiscoveryRequest{
		VersionInfo:   latestReceived.VersionInfo,
		ResponseNonce: latestReceived.Nonce,
		ResourceNames: []string{},
		TypeUrl:       typ,
	})
	if err != nil {
		return err
	}
	// Record the ACK for this type only. Assigning the whole latestReceived
	// map here would make both fields alias one map, so every later received
	// response (of any type) would immediately count as ACKed and NACK would
	// report a version that was never accepted.
	if s.latestACKed == nil {
		s.latestACKed = make(map[string]*envoy_discovery.DiscoveryResponse)
	}
	s.latestACKed[typ] = latestReceived
	return nil
}
// NACK rejects the most recently received response for type URL typ.
//
// It sends a discovery request carrying the nonce of the latest received
// response, the version of the latest ACKed response for typ (empty when
// there is none), and an error detail whose message is err formatted with
// "%s". If nothing has been received for typ yet, it sends nothing and
// returns nil. Otherwise the result of the send is returned.
func (s *Stream) NACK(typ string, err error) error {
	received := s.latestReceived[typ]
	if received == nil {
		return nil
	}

	// GetVersionInfo is called on a possibly-nil response: no ACK may have
	// happened for this type yet.
	acked := s.latestACKed[typ]
	detail := &status.Status{
		Message: fmt.Sprintf("%s", err),
	}

	req := &envoy_discovery.DiscoveryRequest{
		VersionInfo:   acked.GetVersionInfo(),
		ResponseNonce: received.Nonce,
		ResourceNames: []string{},
		TypeUrl:       typ,
		ErrorDetail:   detail,
	}
	return s.stream.Send(req)
}
// WaitForResources receives the next discovery response from the stream,
// stores it as the latest received response for its type URL, and returns it.
//
// If receiving fails, the error is returned and no state is updated.
func (s *Stream) WaitForResources() (*envoy_discovery.DiscoveryResponse, error) {
	received, recvErr := s.stream.Recv()
	if recvErr != nil {
		return nil, recvErr
	}

	typeURL := received.TypeUrl
	s.latestReceived[typeURL] = received
	return received, nil
}
// Close calls CloseSend on the underlying stream and returns its error, if
// any.
func (s *Stream) Close() error {
	err := s.stream.CloseSend()
	return err
}