add Metadata Sync Service
diff --git a/pkg/dubbo/components.go b/pkg/dubbo/components.go
index c61d61f..0fc70f9 100644
--- a/pkg/dubbo/components.go
+++ b/pkg/dubbo/components.go
@@ -63,6 +63,7 @@
 	// register MetadataService
 	metadata := dubbo_metadata.NewMetadataServe(
 		rt.AppContext(),
+		cfg,
 		dubboPusher,
 		rt.ResourceManager(),
 		rt.Transactions(),
diff --git a/pkg/dubbo/metadata/server.go b/pkg/dubbo/metadata/server.go
index 00e1835..2b0e0c1 100644
--- a/pkg/dubbo/metadata/server.go
+++ b/pkg/dubbo/metadata/server.go
@@ -19,10 +19,21 @@
 
 import (
 	"context"
+	"io"
 	"time"
 )
 
 import (
+	"github.com/google/uuid"
+
+	"github.com/pkg/errors"
+
+	"google.golang.org/grpc/codes"
+
+	"google.golang.org/grpc/status"
+)
+
+import (
 	mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
 	"github.com/apache/dubbo-kubernetes/pkg/config/dubbo"
 	"github.com/apache/dubbo-kubernetes/pkg/core"
@@ -30,6 +41,7 @@
 	"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
 	core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
 	core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
+	"github.com/apache/dubbo-kubernetes/pkg/dubbo/client"
 	"github.com/apache/dubbo-kubernetes/pkg/dubbo/pusher"
 )
 
@@ -107,7 +119,101 @@
 }
 
 func (m *MetadataServer) MetadataSync(stream mesh_proto.MetadataService_MetadataSyncServer) error {
-	return nil
+	mesh := core_model.DefaultMesh // todo: mesh
+	errChan := make(chan error)
+
+	clientID := uuid.NewString()
+	metadataSyncStream := client.NewDubboSyncStream(stream)
+	// DubboSyncClient is to handle MetaSyncRequest from data plane
+	metadataSyncClient := client.NewDubboSyncClient(
+		log.WithName("client"),
+		clientID,
+		metadataSyncStream,
+		&client.Callbacks{
+			OnMetadataSyncRequestReceived: func(request *mesh_proto.MetadataSyncRequest) error {
+				// when received request, invoke callback
+				m.pusher.InvokeCallback(core_mesh.MetaDataType, clientID)
+				return nil
+			},
+		})
+	go func() {
+		// Handle requests from client
+		err := metadataSyncClient.HandleReceive()
+		if errors.Is(err, io.EOF) {
+			log.Info("DubboSyncClient finished gracefully")
+			errChan <- nil
+			return
+		}
+
+		log.Error(err, "DubboSyncClient finished with an error")
+		errChan <- errors.Wrap(err, "DubboSyncClient finished with an error")
+	}()
+
+	m.pusher.AddCallback(
+		core_mesh.MetaDataType,
+		metadataSyncClient.ClientID(),
+		func(items pusher.PushedItems) {
+			resourceList := items.ResourceList()
+			revision := items.Revision()
+			metadataList, ok := resourceList.(*core_mesh.MetaDataResourceList)
+			if !ok {
+				return
+			}
+
+			err := metadataSyncClient.Send(metadataList, revision)
+			if err != nil {
+				if errors.Is(err, io.EOF) {
+					log.Info("DubboSyncClient finished gracefully")
+					errChan <- nil
+					return
+				}
+
+				log.Error(err, "send mapping sync response failed", "metadataList", metadataList, "revision", revision)
+				errChan <- errors.Wrap(err, "DubboSyncClient send with an error")
+			}
+		},
+		func(resourceList core_model.ResourceList) core_model.ResourceList {
+			if resourceList.GetItemType() != core_mesh.MetaDataType {
+				return nil
+			}
+
+			// only send Metadata which client subscribed
+			newResourceList := &core_mesh.MeshResourceList{}
+			for _, resource := range resourceList.GetItems() {
+				expected := false
+				metaData := resource.(*core_mesh.MetaDataResource)
+				for _, applicationName := range metadataSyncStream.SubscribedApplicationNames() {
+					if applicationName == metaData.Spec.GetApp() && mesh == resource.GetMeta().GetMesh() {
+						expected = true
+						break
+					}
+				}
+
+				if expected {
+					// find
+					_ = newResourceList.AddItem(resource)
+				}
+			}
+
+			return newResourceList
+		},
+	)
+
+	// in the end, remove callback of this client
+	defer m.pusher.RemoveCallback(core_mesh.MetaDataType, metadataSyncClient.ClientID())
+
+	for {
+		select {
+		case err := <-errChan:
+			if err == nil {
+				log.Info("MetadataSync finished gracefully")
+				return nil
+			}
+
+			log.Error(err, "MetadataSync finished with an error")
+			return status.Error(codes.Internal, err.Error())
+		}
+	}
 }
 
 func (m *MetadataServer) debounce(stopCh <-chan struct{}, pushFn func(m *RegisterRequest)) {