add Metadata Sync Service
diff --git a/pkg/dubbo/components.go b/pkg/dubbo/components.go
index c61d61f..0fc70f9 100644
--- a/pkg/dubbo/components.go
+++ b/pkg/dubbo/components.go
@@ -63,6 +63,7 @@
// register MetadataService
metadata := dubbo_metadata.NewMetadataServe(
rt.AppContext(),
+ cfg,
dubboPusher,
rt.ResourceManager(),
rt.Transactions(),
diff --git a/pkg/dubbo/metadata/server.go b/pkg/dubbo/metadata/server.go
index 00e1835..2b0e0c1 100644
--- a/pkg/dubbo/metadata/server.go
+++ b/pkg/dubbo/metadata/server.go
@@ -19,10 +19,21 @@
import (
"context"
+ "io"
"time"
)
import (
+ "github.com/google/uuid"
+
+ "github.com/pkg/errors"
+
+ "google.golang.org/grpc/codes"
+
+ "google.golang.org/grpc/status"
+)
+
+import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/config/dubbo"
"github.com/apache/dubbo-kubernetes/pkg/core"
@@ -30,6 +41,7 @@
"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
+ "github.com/apache/dubbo-kubernetes/pkg/dubbo/client"
"github.com/apache/dubbo-kubernetes/pkg/dubbo/pusher"
)
@@ -107,7 +119,101 @@
}
func (m *MetadataServer) MetadataSync(stream mesh_proto.MetadataService_MetadataSyncServer) error {
- return nil
+ mesh := core_model.DefaultMesh // todo: mesh
+ errChan := make(chan error)
+
+ clientID := uuid.NewString()
+ metadataSyncStream := client.NewDubboSyncStream(stream)
+ // DubboSyncClient is to handle MetaSyncRequest from data plane
+ metadataSyncClient := client.NewDubboSyncClient(
+ log.WithName("client"),
+ clientID,
+ metadataSyncStream,
+ &client.Callbacks{
+ OnMetadataSyncRequestReceived: func(request *mesh_proto.MetadataSyncRequest) error {
+ // when received request, invoke callback
+ m.pusher.InvokeCallback(core_mesh.MetaDataType, clientID)
+ return nil
+ },
+ })
+ go func() {
+ // Handle requests from client
+ err := metadataSyncClient.HandleReceive()
+ if errors.Is(err, io.EOF) {
+ log.Info("DubboSyncClient finished gracefully")
+ errChan <- nil
+ return
+ }
+
+ log.Error(err, "DubboSyncClient finished with an error")
+ errChan <- errors.Wrap(err, "DubboSyncClient finished with an error")
+ }()
+
+ m.pusher.AddCallback(
+ core_mesh.MetaDataType,
+ metadataSyncClient.ClientID(),
+ func(items pusher.PushedItems) {
+ resourceList := items.ResourceList()
+ revision := items.Revision()
+ metadataList, ok := resourceList.(*core_mesh.MetaDataResourceList)
+ if !ok {
+ return
+ }
+
+ err := metadataSyncClient.Send(metadataList, revision)
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ log.Info("DubboSyncClient finished gracefully")
+ errChan <- nil
+ return
+ }
+
+ log.Error(err, "send mapping sync response failed", "metadataList", metadataList, "revision", revision)
+ errChan <- errors.Wrap(err, "DubboSyncClient send with an error")
+ }
+ },
+ func(resourceList core_model.ResourceList) core_model.ResourceList {
+ if resourceList.GetItemType() != core_mesh.MetaDataType {
+ return nil
+ }
+
+ // only send Metadata which client subscribed
+ newResourceList := &core_mesh.MeshResourceList{}
+ for _, resource := range resourceList.GetItems() {
+ expected := false
+ metaData := resource.(*core_mesh.MetaDataResource)
+ for _, applicationName := range metadataSyncStream.SubscribedApplicationNames() {
+ if applicationName == metaData.Spec.GetApp() && mesh == resource.GetMeta().GetMesh() {
+ expected = true
+ break
+ }
+ }
+
+ if expected {
+ // find
+ _ = newResourceList.AddItem(resource)
+ }
+ }
+
+ return newResourceList
+ },
+ )
+
+ // in the end, remove callback of this client
+ defer m.pusher.RemoveCallback(core_mesh.MetaDataType, metadataSyncClient.ClientID())
+
+ for {
+ select {
+ case err := <-errChan:
+ if err == nil {
+ log.Info("MetadataSync finished gracefully")
+ return nil
+ }
+
+ log.Error(err, "MetadataSync finished with an error")
+ return status.Error(codes.Internal, err.Error())
+ }
+ }
}
func (m *MetadataServer) debounce(stopCh <-chan struct{}, pushFn func(m *RegisterRequest)) {