refactor sync stream and client
diff --git a/pkg/dubbo/client/stream.go b/pkg/dubbo/client/stream.go
index 96d4117..a5a212a 100644
--- a/pkg/dubbo/client/stream.go
+++ b/pkg/dubbo/client/stream.go
@@ -25,78 +25,104 @@
"github.com/google/uuid"
"github.com/pkg/errors"
+
+ "google.golang.org/grpc"
+
+ "google.golang.org/protobuf/proto"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+ core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
)
-var _ MappingSyncStream = &stream{}
+var _ DubboSyncStream = &stream{}
type stream struct {
- streamClient mesh_proto.ServiceNameMappingService_MappingSyncServer
+ streamClient grpc.ServerStream
- // subscribedInterfaceNames records request's interfaceName in Mapping Request from data plane.
+ // subscribedInterfaceNames records request's interfaceName in MappingSync Request from data plane.
subscribedInterfaceNames map[string]struct{}
- lastNonce string
- mu sync.RWMutex
+ // subscribedApplicationNames records request's applicationName in MetaDataSync Request from data plane.
+ subscribedApplicationNames map[string]struct{}
+
+ mappingLastNonce string
+ metadataLastNonce string
+ mu sync.RWMutex
}
-func NewMappingSyncStream(streamClient mesh_proto.ServiceNameMappingService_MappingSyncServer) MappingSyncStream {
+func NewDubboSyncStream(streamClient grpc.ServerStream) DubboSyncStream {
return &stream{
streamClient: streamClient,
- subscribedInterfaceNames: make(map[string]struct{}),
+ subscribedInterfaceNames: make(map[string]struct{}),
+ subscribedApplicationNames: make(map[string]struct{}),
}
}
-type MappingSyncStream interface {
- Recv() (*mesh_proto.MappingSyncRequest, error)
- Send(mappingList *core_mesh.MappingResourceList, revision int64) error
+type DubboSyncStream interface {
+ Recv() (proto.Message, error)
+ Send(resourceList core_model.ResourceList, revision int64) error
SubscribedInterfaceNames() []string
}
-func (s *stream) Recv() (*mesh_proto.MappingSyncRequest, error) {
- request, err := s.streamClient.Recv()
- if err != nil {
- return nil, err
+func (s *stream) Recv() (proto.Message, error) {
+ switch s.streamClient.(type) {
+ case mesh_proto.ServiceNameMappingService_MappingSyncServer:
+ request := &mesh_proto.MappingSyncRequest{}
+ err := s.streamClient.RecvMsg(request)
+ if err != nil {
+ return nil, err
+ }
+ if s.mappingLastNonce != "" && s.mappingLastNonce != request.GetNonce() {
+ return nil, errors.New("request's nonce is different to last nonce")
+ }
+
+ // subscribe Mapping
+ s.mu.Lock()
+ interfaceName := request.GetInterfaceName()
+ s.subscribedInterfaceNames[interfaceName] = struct{}{}
+ s.mu.Lock()
+
+ return request, nil
+ case mesh_proto.MetadataService_MetadataSyncServer:
+ return nil, nil
+ default:
+ return nil, errors.New("unknown type request")
}
-
- if s.lastNonce != "" && s.lastNonce != request.GetNonce() {
- return nil, errors.New("request's nonce is different to last nonce")
- }
-
- // subscribe Mapping
- s.mu.Lock()
- interfaceName := request.GetInterfaceName()
- s.subscribedInterfaceNames[interfaceName] = struct{}{}
- s.mu.Lock()
-
- return request, nil
}
-func (s *stream) Send(mappingList *core_mesh.MappingResourceList, revision int64) error {
+func (s *stream) Send(resourceList core_model.ResourceList, revision int64) error {
s.mu.RLock()
defer s.mu.RUnlock()
nonce := uuid.NewString()
- mappings := make([]*mesh_proto.Mapping, 0, len(mappingList.Items))
- for _, item := range mappingList.Items {
- mappings = append(mappings, &mesh_proto.Mapping{
- Zone: item.Spec.Zone,
- InterfaceName: item.Spec.InterfaceName,
- ApplicationNames: item.Spec.ApplicationNames,
- })
- }
- s.lastNonce = nonce
- response := &mesh_proto.MappingSyncResponse{
- Nonce: nonce,
- Revision: revision,
- Mappings: mappings,
+ switch resourceList.(type) {
+ case *core_mesh.MappingResourceList:
+ mappingList := resourceList.(*core_mesh.MappingResourceList)
+ mappings := make([]*mesh_proto.Mapping, 0, len(mappingList.Items))
+ for _, item := range mappingList.Items {
+ mappings = append(mappings, &mesh_proto.Mapping{
+ Zone: item.Spec.Zone,
+ InterfaceName: item.Spec.InterfaceName,
+ ApplicationNames: item.Spec.ApplicationNames,
+ })
+ }
+
+ s.mappingLastNonce = nonce
+ response := &mesh_proto.MappingSyncResponse{
+ Nonce: nonce,
+ Revision: revision,
+ Mappings: mappings,
+ }
+ return s.streamClient.SendMsg(response)
+ case *core_mesh.MetaDataResourceList:
+ return nil
+ default:
+ return errors.New("unknown type request")
}
- return s.streamClient.Send(response)
}
func (s *stream) SubscribedInterfaceNames() []string {
diff --git a/pkg/dubbo/client/sync_client.go b/pkg/dubbo/client/sync_client.go
index 1641912..bad8d72 100644
--- a/pkg/dubbo/client/sync_client.go
+++ b/pkg/dubbo/client/sync_client.go
@@ -29,29 +29,30 @@
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
- core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+ core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
)
type Callbacks struct {
- OnRequestReceived func(request *mesh_proto.MappingSyncRequest) error
+ OnMappingSyncRequestReceived func(request *mesh_proto.MappingSyncRequest) error
+ OnMetadataSyncRequestReceived func(request *mesh_proto.MetadataSyncRequest) error
}
-// MappingSyncClient Handle MappingSyncRequest from client
-type MappingSyncClient interface {
+// DubboSyncClient Handle Dubbo Sync Request from client
+type DubboSyncClient interface {
ClientID() string
HandleReceive() error
- Send(mappingList *core_mesh.MappingResourceList, revision int64) error
+ Send(resourceList core_model.ResourceList, revision int64) error
}
-type mappingSyncClient struct {
+type dubboSyncClient struct {
log logr.Logger
id string
- syncStream MappingSyncStream
+ syncStream DubboSyncStream
callbacks *Callbacks
}
-func NewMappingSyncClient(log logr.Logger, id string, syncStream MappingSyncStream, cb *Callbacks) MappingSyncClient {
- return &mappingSyncClient{
+func NewDubboSyncClient(log logr.Logger, id string, syncStream DubboSyncStream, cb *Callbacks) DubboSyncClient {
+ return &dubboSyncClient{
log: log,
id: id,
syncStream: syncStream,
@@ -59,11 +60,11 @@
}
}
-func (s *mappingSyncClient) ClientID() string {
+func (s *dubboSyncClient) ClientID() string {
return s.id
}
-func (s *mappingSyncClient) HandleReceive() error {
+func (s *dubboSyncClient) HandleReceive() error {
for {
received, err := s.syncStream.Recv()
if err != nil {
@@ -80,16 +81,22 @@
}
// callbacks
- err = s.callbacks.OnRequestReceived(received)
- if err != nil {
- s.log.Error(err, "error in OnRequestReceived")
- } else {
- s.log.Info("OnRequestReceived successed")
+ switch received.(type) {
+ case *mesh_proto.MappingSyncRequest:
+ err = s.callbacks.OnMappingSyncRequestReceived(received.(*mesh_proto.MappingSyncRequest))
+ if err != nil {
+ s.log.Error(err, "error in OnMappingSyncRequestReceived")
+ } else {
+ s.log.Info("OnMappingSyncRequestReceived successed")
+ }
+ case *mesh_proto.MetadataSyncRequest:
+ panic("unimplemented")
+ default:
+ return errors.New("unknown type request")
}
-
}
}
-func (s *mappingSyncClient) Send(mappingList *core_mesh.MappingResourceList, revision int64) error {
- return s.syncStream.Send(mappingList, revision)
+func (s *dubboSyncClient) Send(resourceList core_model.ResourceList, revision int64) error {
+ return s.syncStream.Send(resourceList, revision)
}
diff --git a/pkg/dubbo/servicemapping/server.go b/pkg/dubbo/servicemapping/server.go
index addd354..f983db2 100644
--- a/pkg/dubbo/servicemapping/server.go
+++ b/pkg/dubbo/servicemapping/server.go
@@ -126,14 +126,14 @@
errChan := make(chan error)
clientID := uuid.NewString()
- mappingSyncStream := client.NewMappingSyncStream(stream)
- // MappingSyncClient is to handle MappingSyncRequest from data plane
- mappingSyncClient := client.NewMappingSyncClient(
+ mappingSyncStream := client.NewDubboSyncStream(stream)
+ // DubboSyncClient is to handle MappingSyncRequest from data plane
+ mappingSyncClient := client.NewDubboSyncClient(
log.WithName("client"),
clientID,
mappingSyncStream,
&client.Callbacks{
- OnRequestReceived: func(request *mesh_proto.MappingSyncRequest) error {
+ OnMappingSyncRequestReceived: func(request *mesh_proto.MappingSyncRequest) error {
// when received request, invoke callback
s.pusher.InvokeCallback(core_mesh.MappingType, clientID)
return nil
@@ -143,13 +143,13 @@
// Handle requests from client
err := mappingSyncClient.HandleReceive()
if errors.Is(err, io.EOF) {
- log.Info("MappingSyncClient finished gracefully")
+ log.Info("DubboSyncClient finished gracefully")
errChan <- nil
return
}
- log.Error(err, "MappingSyncClient finished with an error")
- errChan <- errors.Wrap(err, "MappingSyncClient finished with an error")
+ log.Error(err, "DubboSyncClient finished with an error")
+ errChan <- errors.Wrap(err, "DubboSyncClient finished with an error")
}()
s.pusher.AddCallback(
@@ -166,13 +166,13 @@
err := mappingSyncClient.Send(mappingList, revision)
if err != nil {
if errors.Is(err, io.EOF) {
- log.Info("MappingSyncClient finished gracefully")
+ log.Info("DubboSyncClient finished gracefully")
errChan <- nil
return
}
log.Error(err, "send mapping sync response failed", "mappingList", mappingList, "revision", revision)
- errChan <- errors.Wrap(err, "MappingSyncClient send with an error")
+ errChan <- errors.Wrap(err, "DubboSyncClient send with an error")
}
},
func(resourceList core_model.ResourceList) core_model.ResourceList {