update stream for Metadata Sync
diff --git a/pkg/dubbo/client/stream.go b/pkg/dubbo/client/stream.go
index a5a212a..f200b47 100644
--- a/pkg/dubbo/client/stream.go
+++ b/pkg/dubbo/client/stream.go
@@ -65,6 +65,7 @@
Recv() (proto.Message, error)
Send(resourceList core_model.ResourceList, revision int64) error
SubscribedInterfaceNames() []string
+ SubscribedApplicationNames() []string
}
func (s *stream) Recv() (proto.Message, error) {
@@ -76,7 +77,7 @@
return nil, err
}
if s.mappingLastNonce != "" && s.mappingLastNonce != request.GetNonce() {
- return nil, errors.New("request's nonce is different to last nonce")
+ return nil, errors.New("mapping sync request's nonce is different to last nonce")
}
// subscribe Mapping
@@ -87,7 +88,22 @@
return request, nil
case mesh_proto.MetadataService_MetadataSyncServer:
- return nil, nil
+ request := &mesh_proto.MetadataSyncRequest{}
+ err := s.streamClient.RecvMsg(request)
+ if err != nil {
+ return nil, err
+ }
+ if s.metadataLastNonce != "" && s.metadataLastNonce != request.GetNonce() {
+ return nil, errors.New("metadata sync request's nonce is different to last nonce")
+ }
+
+ // subscribe MetaData
+ s.mu.Lock()
+ appName := request.GetApplicationName()
+ s.subscribedApplicationNames[appName] = struct{}{}
+ s.mu.Lock()
+
+ return request, nil
default:
return nil, errors.New("unknown type request")
}
@@ -119,7 +135,23 @@
}
return s.streamClient.SendMsg(response)
case *core_mesh.MetaDataResourceList:
- return nil
+ metadataList := resourceList.(*core_mesh.MetaDataResourceList)
+ metaDatum := make([]*mesh_proto.MetaData, 0, len(metadataList.Items))
+ for _, item := range metadataList.Items {
+ metaDatum = append(metaDatum, &mesh_proto.MetaData{
+ App: item.Spec.GetApp(),
+ Revision: item.Spec.Revision,
+ Services: item.Spec.GetServices(),
+ })
+ }
+
+ s.metadataLastNonce = nonce
+ response := &mesh_proto.MetadataSyncResponse{
+ Nonce: nonce,
+ Revision: revision,
+ MetaDatum: metaDatum,
+ }
+ return s.streamClient.SendMsg(response)
default:
return errors.New("unknown type request")
}
@@ -136,3 +168,14 @@
return result
}
+func (s *stream) SubscribedApplicationNames() []string {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ result := make([]string, 0, len(s.subscribedApplicationNames))
+ for appName := range s.subscribedApplicationNames {
+ result = append(result, appName)
+ }
+
+ return result
+}