refactor sync stream and client
diff --git a/pkg/dubbo/client/stream.go b/pkg/dubbo/client/stream.go
index 96d4117..a5a212a 100644
--- a/pkg/dubbo/client/stream.go
+++ b/pkg/dubbo/client/stream.go
@@ -25,78 +25,104 @@
 	"github.com/google/uuid"
 
 	"github.com/pkg/errors"
+
+	"google.golang.org/grpc"
+
+	"google.golang.org/protobuf/proto"
 )
 
 import (
 	mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
 	core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+	core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
 )
 
-var _ MappingSyncStream = &stream{}
+var _ DubboSyncStream = &stream{}
 
 type stream struct {
-	streamClient mesh_proto.ServiceNameMappingService_MappingSyncServer
+	streamClient grpc.ServerStream
 
-	// subscribedInterfaceNames records request's interfaceName in Mapping Request from data plane.
+	// subscribedInterfaceNames records request's interfaceName in MappingSync Request from data plane.
 	subscribedInterfaceNames map[string]struct{}
-	lastNonce                string
-	mu                       sync.RWMutex
+	// subscribedApplicationNames records request's applicationName in MetaDataSync Request from data plane.
+	subscribedApplicationNames map[string]struct{}
+
+	mappingLastNonce  string
+	metadataLastNonce string
+	mu                sync.RWMutex
 }
 
-func NewMappingSyncStream(streamClient mesh_proto.ServiceNameMappingService_MappingSyncServer) MappingSyncStream {
+func NewDubboSyncStream(streamClient grpc.ServerStream) DubboSyncStream {
 	return &stream{
 		streamClient: streamClient,
 
-		subscribedInterfaceNames: make(map[string]struct{}),
+		subscribedInterfaceNames:   make(map[string]struct{}),
+		subscribedApplicationNames: make(map[string]struct{}),
 	}
 }
 
-type MappingSyncStream interface {
-	Recv() (*mesh_proto.MappingSyncRequest, error)
-	Send(mappingList *core_mesh.MappingResourceList, revision int64) error
+type DubboSyncStream interface {
+	Recv() (proto.Message, error)
+	Send(resourceList core_model.ResourceList, revision int64) error
 	SubscribedInterfaceNames() []string
 }
 
-func (s *stream) Recv() (*mesh_proto.MappingSyncRequest, error) {
-	request, err := s.streamClient.Recv()
-	if err != nil {
-		return nil, err
+func (s *stream) Recv() (proto.Message, error) {
+	switch s.streamClient.(type) {
+	case mesh_proto.ServiceNameMappingService_MappingSyncServer:
+		request := &mesh_proto.MappingSyncRequest{}
+		err := s.streamClient.RecvMsg(request)
+		if err != nil {
+			return nil, err
+		}
+		if s.mappingLastNonce != "" && s.mappingLastNonce != request.GetNonce() {
+			return nil, errors.New("request's nonce is different to last nonce")
+		}
+
+		// subscribe Mapping
+		s.mu.Lock()
+		interfaceName := request.GetInterfaceName()
+		s.subscribedInterfaceNames[interfaceName] = struct{}{}
+		s.mu.Lock()
+
+		return request, nil
+	case mesh_proto.MetadataService_MetadataSyncServer:
+		return nil, nil
+	default:
+		return nil, errors.New("unknown type request")
 	}
-
-	if s.lastNonce != "" && s.lastNonce != request.GetNonce() {
-		return nil, errors.New("request's nonce is different to last nonce")
-	}
-
-	// subscribe Mapping
-	s.mu.Lock()
-	interfaceName := request.GetInterfaceName()
-	s.subscribedInterfaceNames[interfaceName] = struct{}{}
-	s.mu.Lock()
-
-	return request, nil
 }
 
-func (s *stream) Send(mappingList *core_mesh.MappingResourceList, revision int64) error {
+func (s *stream) Send(resourceList core_model.ResourceList, revision int64) error {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
 
 	nonce := uuid.NewString()
-	mappings := make([]*mesh_proto.Mapping, 0, len(mappingList.Items))
-	for _, item := range mappingList.Items {
-		mappings = append(mappings, &mesh_proto.Mapping{
-			Zone:             item.Spec.Zone,
-			InterfaceName:    item.Spec.InterfaceName,
-			ApplicationNames: item.Spec.ApplicationNames,
-		})
-	}
 
-	s.lastNonce = nonce
-	response := &mesh_proto.MappingSyncResponse{
-		Nonce:    nonce,
-		Revision: revision,
-		Mappings: mappings,
+	switch resourceList.(type) {
+	case *core_mesh.MappingResourceList:
+		mappingList := resourceList.(*core_mesh.MappingResourceList)
+		mappings := make([]*mesh_proto.Mapping, 0, len(mappingList.Items))
+		for _, item := range mappingList.Items {
+			mappings = append(mappings, &mesh_proto.Mapping{
+				Zone:             item.Spec.Zone,
+				InterfaceName:    item.Spec.InterfaceName,
+				ApplicationNames: item.Spec.ApplicationNames,
+			})
+		}
+
+		s.mappingLastNonce = nonce
+		response := &mesh_proto.MappingSyncResponse{
+			Nonce:    nonce,
+			Revision: revision,
+			Mappings: mappings,
+		}
+		return s.streamClient.SendMsg(response)
+	case *core_mesh.MetaDataResourceList:
+		return nil
+	default:
+		return errors.New("unknown type request")
 	}
-	return s.streamClient.Send(response)
 }
 
 func (s *stream) SubscribedInterfaceNames() []string {
diff --git a/pkg/dubbo/client/sync_client.go b/pkg/dubbo/client/sync_client.go
index 1641912..bad8d72 100644
--- a/pkg/dubbo/client/sync_client.go
+++ b/pkg/dubbo/client/sync_client.go
@@ -29,29 +29,30 @@
 
 import (
 	mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
-	core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+	core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
 )
 
 type Callbacks struct {
-	OnRequestReceived func(request *mesh_proto.MappingSyncRequest) error
+	OnMappingSyncRequestReceived  func(request *mesh_proto.MappingSyncRequest) error
+	OnMetadataSyncRequestReceived func(request *mesh_proto.MetadataSyncRequest) error
 }
 
-// MappingSyncClient Handle MappingSyncRequest from client
-type MappingSyncClient interface {
+// DubboSyncClient Handle Dubbo Sync Request from client
+type DubboSyncClient interface {
 	ClientID() string
 	HandleReceive() error
-	Send(mappingList *core_mesh.MappingResourceList, revision int64) error
+	Send(resourceList core_model.ResourceList, revision int64) error
 }
 
-type mappingSyncClient struct {
+type dubboSyncClient struct {
 	log        logr.Logger
 	id         string
-	syncStream MappingSyncStream
+	syncStream DubboSyncStream
 	callbacks  *Callbacks
 }
 
-func NewMappingSyncClient(log logr.Logger, id string, syncStream MappingSyncStream, cb *Callbacks) MappingSyncClient {
-	return &mappingSyncClient{
+func NewDubboSyncClient(log logr.Logger, id string, syncStream DubboSyncStream, cb *Callbacks) DubboSyncClient {
+	return &dubboSyncClient{
 		log:        log,
 		id:         id,
 		syncStream: syncStream,
@@ -59,11 +60,11 @@
 	}
 }
 
-func (s *mappingSyncClient) ClientID() string {
+func (s *dubboSyncClient) ClientID() string {
 	return s.id
 }
 
-func (s *mappingSyncClient) HandleReceive() error {
+func (s *dubboSyncClient) HandleReceive() error {
 	for {
 		received, err := s.syncStream.Recv()
 		if err != nil {
@@ -80,16 +81,22 @@
 		}
 
 		// callbacks
-		err = s.callbacks.OnRequestReceived(received)
-		if err != nil {
-			s.log.Error(err, "error in OnRequestReceived")
-		} else {
-			s.log.Info("OnRequestReceived successed")
+		switch received.(type) {
+		case *mesh_proto.MappingSyncRequest:
+			err = s.callbacks.OnMappingSyncRequestReceived(received.(*mesh_proto.MappingSyncRequest))
+			if err != nil {
+				s.log.Error(err, "error in OnMappingSyncRequestReceived")
+			} else {
+				s.log.Info("OnMappingSyncRequestReceived successed")
+			}
+		case *mesh_proto.MetadataSyncRequest:
+			panic("unimplemented")
+		default:
+			return errors.New("unknown type request")
 		}
-
 	}
 }
 
-func (s *mappingSyncClient) Send(mappingList *core_mesh.MappingResourceList, revision int64) error {
-	return s.syncStream.Send(mappingList, revision)
+func (s *dubboSyncClient) Send(resourceList core_model.ResourceList, revision int64) error {
+	return s.syncStream.Send(resourceList, revision)
 }
diff --git a/pkg/dubbo/servicemapping/server.go b/pkg/dubbo/servicemapping/server.go
index addd354..f983db2 100644
--- a/pkg/dubbo/servicemapping/server.go
+++ b/pkg/dubbo/servicemapping/server.go
@@ -126,14 +126,14 @@
 	errChan := make(chan error)
 
 	clientID := uuid.NewString()
-	mappingSyncStream := client.NewMappingSyncStream(stream)
-	// MappingSyncClient is to handle MappingSyncRequest from data plane
-	mappingSyncClient := client.NewMappingSyncClient(
+	mappingSyncStream := client.NewDubboSyncStream(stream)
+	// DubboSyncClient is to handle MappingSyncRequest from data plane
+	mappingSyncClient := client.NewDubboSyncClient(
 		log.WithName("client"),
 		clientID,
 		mappingSyncStream,
 		&client.Callbacks{
-			OnRequestReceived: func(request *mesh_proto.MappingSyncRequest) error {
+			OnMappingSyncRequestReceived: func(request *mesh_proto.MappingSyncRequest) error {
 				// when received request, invoke callback
 				s.pusher.InvokeCallback(core_mesh.MappingType, clientID)
 				return nil
@@ -143,13 +143,13 @@
 		// Handle requests from client
 		err := mappingSyncClient.HandleReceive()
 		if errors.Is(err, io.EOF) {
-			log.Info("MappingSyncClient finished gracefully")
+			log.Info("DubboSyncClient finished gracefully")
 			errChan <- nil
 			return
 		}
 
-		log.Error(err, "MappingSyncClient finished with an error")
-		errChan <- errors.Wrap(err, "MappingSyncClient finished with an error")
+		log.Error(err, "DubboSyncClient finished with an error")
+		errChan <- errors.Wrap(err, "DubboSyncClient finished with an error")
 	}()
 
 	s.pusher.AddCallback(
@@ -166,13 +166,13 @@
 			err := mappingSyncClient.Send(mappingList, revision)
 			if err != nil {
 				if errors.Is(err, io.EOF) {
-					log.Info("MappingSyncClient finished gracefully")
+					log.Info("DubboSyncClient finished gracefully")
 					errChan <- nil
 					return
 				}
 
 				log.Error(err, "send mapping sync response failed", "mappingList", mappingList, "revision", revision)
-				errChan <- errors.Wrap(err, "MappingSyncClient send with an error")
+				errChan <- errors.Wrap(err, "DubboSyncClient send with an error")
 			}
 		},
 		func(resourceList core_model.ResourceList) core_model.ResourceList {