blob: 8d75746122e6a4a3b8a85d08b576c86df111a13c [file] [log] [blame]
package stream
import (
"context"
"crypto/tls"
url2 "net/url"
)
import (
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
)
// Client bundles a gRPC connection with the MDSSyncService client built on
// top of it. New constructs one; Close closes the connection.
type Client struct {
// conn is the underlying gRPC connection; it is closed by Close.
conn *grpc.ClientConn
// client is the MDSSyncService client created from conn in New.
client mesh_proto.MDSSyncServiceClient
}
// mappingStream wraps a MappingSync client stream and tracks the most recent
// response received on it and the most recent response acknowledged.
type mappingStream struct {
// mappingStream is the underlying gRPC client stream used for Send/Recv.
mappingStream mesh_proto.MDSSyncService_MappingSyncClient
// latestACKed is set by MappingACK to the response it acknowledged.
// StartMappingStream initializes it to an empty response.
latestACKed *mesh_proto.MappingSyncResponse
// latestReceived is set by WaitForMappingResource to the response it
// returned. StartMappingStream initializes it to an empty response.
latestReceived *mesh_proto.MappingSyncResponse
}
// metadataStream wraps a MetadataSync client stream and tracks the most
// recent response received on it and the most recent response acknowledged.
type metadataStream struct {
// metadataStream is the underlying gRPC client stream used for Send/Recv.
metadataStream mesh_proto.MDSSyncService_MetadataSyncClient
// latestACKed is set by MetadataACK to the response it acknowledged.
// StartMetadataSteam initializes it to an empty response.
latestACKed *mesh_proto.MetadataSyncResponse
// latestReceived is set by WaitForMetadataResource to the response it
// returned. StartMetadataSteam initializes it to an empty response.
latestReceived *mesh_proto.MetadataSyncResponse
}
// New parses serverURL and dials its host over gRPC, returning a Client that
// wraps the connection together with an MDSSyncService client.
//
// The URL scheme selects the transport credentials:
//   - "grpc": insecure (plaintext) credentials
//   - "grpcs": TLS with certificate verification disabled
//
// Any other scheme is rejected with an error. Errors from URL parsing and
// from grpc.Dial are returned unchanged.
func New(serverURL string) (*Client, error) {
	target, err := url2.Parse(serverURL)
	if err != nil {
		return nil, err
	}

	// Exactly one transport-credentials option is needed, so resolve the
	// credentials first and build the dial option afterwards.
	var creds credentials.TransportCredentials
	switch scheme := target.Scheme; scheme {
	case "grpc":
		creds = insecure.NewCredentials()
	case "grpcs":
		// #nosec G402 -- it's acceptable as this is only to be used in testing
		creds = credentials.NewTLS(&tls.Config{
			InsecureSkipVerify: true,
		})
	default:
		return nil, errors.Errorf("unsupported scheme %q. Use one of %s", scheme, []string{"grpc", "grpcs"})
	}

	conn, err := grpc.Dial(target.Host, grpc.WithTransportCredentials(creds))
	if err != nil {
		return nil, err
	}

	return &Client{
		conn:   conn,
		client: mesh_proto.NewMDSSyncServiceClient(conn),
	}, nil
}
// MappingRegister invokes the MappingRegister RPC with req on the wrapped
// MDSSyncService client. The RPC response is discarded; only the error, if
// any, is returned.
func (c *Client) MappingRegister(ctx context.Context, req *mesh_proto.MappingRegisterRequest) error {
	_, rpcErr := c.client.MappingRegister(ctx, req)
	return rpcErr
}
// MetadataRegister invokes the MetadataRegister RPC with req on the wrapped
// MDSSyncService client. The RPC response is discarded; only the error, if
// any, is returned.
func (c *Client) MetadataRegister(ctx context.Context, req *mesh_proto.MetaDataRegisterRequest) error {
	_, rpcErr := c.client.MetadataRegister(ctx, req)
	return rpcErr
}
// StartMappingStream opens the MappingSync streaming RPC and returns a
// mappingStream wrapping it.
//
// The stream is created on a background context carrying empty outgoing
// metadata, so it is not bound to any caller-supplied context. Both tracking
// fields of the returned value start out as empty (non-nil) responses.
func (c *Client) StartMappingStream() (*mappingStream, error) {
	syncStream, err := c.client.MappingSync(
		metadata.NewOutgoingContext(context.Background(), metadata.MD{}),
	)
	if err != nil {
		return nil, err
	}

	ms := new(mappingStream)
	ms.mappingStream = syncStream
	ms.latestACKed = new(mesh_proto.MappingSyncResponse)
	ms.latestReceived = new(mesh_proto.MappingSyncResponse)
	return ms, nil
}
// StartMetadataStream opens the MetadataSync streaming RPC and returns a
// metadataStream wrapping it.
//
// The stream is created on a background context carrying empty outgoing
// metadata, so it is not bound to any caller-supplied context. Both tracking
// fields of the returned value start out as empty (non-nil) responses.
func (c *Client) StartMetadataStream() (*metadataStream, error) {
	ctx := metadata.NewOutgoingContext(context.Background(), metadata.MD{})
	stream, err := c.client.MetadataSync(ctx)
	if err != nil {
		return nil, err
	}
	return &metadataStream{
		metadataStream: stream,
		latestACKed:    &mesh_proto.MetadataSyncResponse{},
		latestReceived: &mesh_proto.MetadataSyncResponse{},
	}, nil
}

// StartMetadataSteam is the original, misspelled name of StartMetadataStream.
// It is kept so existing callers continue to compile and behaves identically;
// new code should call StartMetadataStream.
func (c *Client) StartMetadataSteam() (*metadataStream, error) {
	return c.StartMetadataStream()
}
// Close closes the underlying gRPC connection and returns the error, if any,
// reported by that close.
func (c *Client) Close() error {
return c.conn.Close()
}
// MetadataSyncRequest sends req on the underlying MetadataSync stream and
// returns the error, if any, reported by Send.
func (s *metadataStream) MetadataSyncRequest(req *mesh_proto.MetadataSyncRequest) error {
	return s.metadataStream.Send(req)
}
// WaitForMetadataResource receives the next response from the MetadataSync
// stream. On success the response is remembered as latestReceived (the one a
// subsequent MetadataACK will acknowledge) and returned. On failure it
// returns nil and the Recv error, leaving latestReceived untouched.
func (s *metadataStream) WaitForMetadataResource() (*mesh_proto.MetadataSyncResponse, error) {
	msg, recvErr := s.metadataStream.Recv()
	if recvErr != nil {
		return nil, recvErr
	}

	s.latestReceived = msg
	return msg, nil
}
// MappingSyncRequest sends req on the underlying MappingSync stream and
// returns the error, if any, reported by Send.
func (s *mappingStream) MappingSyncRequest(req *mesh_proto.MappingSyncRequest) error {
	return s.mappingStream.Send(req)
}
// WaitForMappingResource receives the next response from the MappingSync
// stream. On success the response is remembered as latestReceived (the one a
// subsequent MappingACK will acknowledge) and returned. On failure it returns
// nil and the Recv error, leaving latestReceived untouched.
func (s *mappingStream) WaitForMappingResource() (*mesh_proto.MappingSyncResponse, error) {
	msg, recvErr := s.mappingStream.Recv()
	if recvErr != nil {
		return nil, recvErr
	}

	s.latestReceived = msg
	return msg, nil
}
// Close calls CloseSend on the underlying MappingSync stream and returns its
// error. It does not touch the gRPC connection; that is closed by
// Client.Close.
// NOTE(review): CloseSend presumably only ends the sending direction of the
// stream — confirm against grpc-go's ClientStream documentation.
func (s *mappingStream) Close() error {
return s.mappingStream.CloseSend()
}
// Close calls CloseSend on the underlying MetadataSync stream and returns its
// error. It does not touch the gRPC connection; that is closed by
// Client.Close.
// NOTE(review): CloseSend presumably only ends the sending direction of the
// stream — confirm against grpc-go's ClientStream documentation.
func (s *metadataStream) Close() error {
return s.metadataStream.CloseSend()
}
// MappingACK acknowledges the most recently received mapping response by
// sending a MappingSyncRequest that carries only that response's Nonce.
//
// If no response is recorded (latestReceived is nil) nothing is sent and nil
// is returned. When the send succeeds, latestACKed is updated to the current
// latestReceived; when it fails, the Send error is returned and latestACKed
// is left as it was.
func (s *mappingStream) MappingACK() error {
	pending := s.latestReceived
	if pending == nil {
		return nil
	}

	ack := &mesh_proto.MappingSyncRequest{Nonce: pending.Nonce}
	if sendErr := s.mappingStream.Send(ack); sendErr != nil {
		return sendErr
	}

	s.latestACKed = s.latestReceived
	return nil
}
// MetadataACK acknowledges the most recently received metadata response by
// sending a MetadataSyncRequest that carries only that response's Nonce.
//
// If no response is recorded (latestReceived is nil) nothing is sent and nil
// is returned. When the send succeeds, latestACKed is updated to the current
// latestReceived; when it fails, the Send error is returned and latestACKed
// is left as it was.
func (s *metadataStream) MetadataACK() error {
	pending := s.latestReceived
	if pending == nil {
		return nil
	}

	ack := &mesh_proto.MetadataSyncRequest{Nonce: pending.Nonce}
	if sendErr := s.metadataStream.Send(ack); sendErr != nil {
		return sendErr
	}

	s.latestACKed = s.latestReceived
	return nil
}