dataplane revision
diff --git a/api/mesh/v1alpha1/traffic_helper.go b/api/mesh/v1alpha1/traffic_helper.go
index 5454afa..2a6a238 100644
--- a/api/mesh/v1alpha1/traffic_helper.go
+++ b/api/mesh/v1alpha1/traffic_helper.go
@@ -31,12 +31,12 @@
// Application 流量管控相关的基础label
const (
- Application = "application"
- Service = "service"
- ID = "id"
- ServiceVersion = "serviceVersion"
- ServiceGroup = "serviceGroup"
- Revision = "revision"
+ Application = "dubbo.io/application"
+ Service = "dubbo.io/service"
+ ID = "dubbo.io/id"
+ ServiceVersion = "dubbo.io/serviceVersion"
+ ServiceGroup = "dubbo.io/serviceGroup"
+ Revision = "dubbo.io/revision"
)
type Base struct {
diff --git a/pkg/core/registry/notify.go b/pkg/core/registry/notify.go
index 184c264..c3bc49b 100644
--- a/pkg/core/registry/notify.go
+++ b/pkg/core/registry/notify.go
@@ -125,6 +125,7 @@
dataplaneResource.Spec.Networking = &mesh_proto.Dataplane_Networking{}
dataplaneResource.Spec.Extensions = map[string]string{}
dataplaneResource.Spec.Extensions[mesh_proto.ApplicationName] = app
+ dataplaneResource.Spec.Extensions[mesh_proto.Revision] = revision
dataplaneResource.SetMeta(&resourceMetaObject{
Name: key,
})
diff --git a/pkg/core/resources/model/resource.go b/pkg/core/resources/model/resource.go
index 01b425c..0e7358b 100644
--- a/pkg/core/resources/model/resource.go
+++ b/pkg/core/resources/model/resource.go
@@ -60,6 +60,13 @@
Name string
}
+type ResourceReq struct {
+ Mesh string
+ Name string
+ PodName string
+ Namespace string
+}
+
type ResourceScope string
const (
diff --git a/pkg/dubbo/client/stream.go b/pkg/dubbo/client/stream.go
index f200b47..a54840c 100644
--- a/pkg/dubbo/client/stream.go
+++ b/pkg/dubbo/client/stream.go
@@ -168,6 +168,7 @@
return result
}
+
func (s *stream) SubscribedApplicationNames() []string {
s.mu.RLock()
defer s.mu.RUnlock()
diff --git a/pkg/dubbo/client/sync_client.go b/pkg/dubbo/client/sync_client.go
index bad8d72..702545a 100644
--- a/pkg/dubbo/client/sync_client.go
+++ b/pkg/dubbo/client/sync_client.go
@@ -90,7 +90,12 @@
s.log.Info("OnMappingSyncRequestReceived successed")
}
case *mesh_proto.MetadataSyncRequest:
- panic("unimplemented")
+ err = s.callbacks.OnMetadataSyncRequestReceived(received.(*mesh_proto.MetadataSyncRequest))
+ if err != nil {
+ s.log.Error(err, "error in OnMetadataSyncRequestReceived")
+ } else {
+ s.log.Info("OnMetadataSyncRequestReceived successed")
+ }
default:
return errors.New("unknown type request")
}
diff --git a/pkg/dubbo/metadata/register_request.go b/pkg/dubbo/metadata/register_request.go
index cbb73ad..3119159 100644
--- a/pkg/dubbo/metadata/register_request.go
+++ b/pkg/dubbo/metadata/register_request.go
@@ -27,7 +27,7 @@
)
type RegisterRequest struct {
- ConfigsUpdated map[core_model.ResourceKey]*mesh_proto.MetaData
+ ConfigsUpdated map[core_model.ResourceReq]*mesh_proto.MetaData
}
func (q *RegisterRequest) merge(req *RegisterRequest) *RegisterRequest {
diff --git a/pkg/dubbo/metadata/server.go b/pkg/dubbo/metadata/server.go
index 81e6f48..46d617f 100644
--- a/pkg/dubbo/metadata/server.go
+++ b/pkg/dubbo/metadata/server.go
@@ -19,6 +19,7 @@
import (
"context"
+ "github.com/apache/dubbo-kubernetes/pkg/util/rmkey"
"io"
"strings"
"time"
@@ -94,6 +95,7 @@
mesh := core_model.DefaultMesh // todo: mesh
podName := req.GetPodName()
metadata := req.GetMetadata()
+ namespace := req.GetNamespace()
if metadata == nil {
return &mesh_proto.MetaDataRegisterResponse{
Success: false,
@@ -101,12 +103,13 @@
}, nil
}
- // MetaData name = podName.revision
- name := podName + "-" + metadata.Revision
- registerReq := &RegisterRequest{ConfigsUpdated: map[core_model.ResourceKey]*mesh_proto.MetaData{}}
- key := core_model.ResourceKey{
- Mesh: mesh,
- Name: name,
+ name := rmkey.GenerateMetadataResourceKey(metadata.App, metadata.Revision, req.GetNamespace())
+ registerReq := &RegisterRequest{ConfigsUpdated: map[core_model.ResourceReq]*mesh_proto.MetaData{}}
+ key := core_model.ResourceReq{
+ Mesh: mesh,
+ Name: name,
+ PodName: podName,
+ Namespace: namespace,
}
registerReq.ConfigsUpdated[key] = metadata
@@ -328,14 +331,17 @@
}
}
-func (m *MetadataServer) tryRegister(key core_model.ResourceKey, newMetadata *mesh_proto.MetaData) error {
+func (m *MetadataServer) tryRegister(key core_model.ResourceReq, newMetadata *mesh_proto.MetaData) error {
err := core_store.InTx(m.ctx, m.transactions, func(ctx context.Context) error {
// get Metadata Resource first,
// if Metadata is not found, create it,
// else update it.
metadata := core_mesh.NewMetaDataResource()
- err := m.resourceManager.Get(m.ctx, metadata, core_store.GetBy(key))
+ err := m.resourceManager.Get(m.ctx, metadata, core_store.GetBy(core_model.ResourceKey{
+ Mesh: key.Mesh,
+ Name: key.Name,
+ }))
if err != nil && !core_store.IsResourceNotFound(err) {
log.Error(err, "get Metadata Resource")
return err
@@ -344,14 +350,16 @@
if core_store.IsResourceNotFound(err) {
// create if not found
metadata.Spec = newMetadata
- err = m.resourceManager.Create(m.ctx, metadata, core_store.CreateBy(key), core_store.CreatedAt(time.Now()))
+ err = m.resourceManager.Create(m.ctx, metadata, core_store.CreateBy(core_model.ResourceKey{
+ Mesh: key.Mesh,
+ Name: key.Name,
+ }), core_store.CreatedAt(time.Now()))
if err != nil {
log.Error(err, "create Metadata Resource failed")
return err
}
log.Info("create Metadata Resource success", "key", key, "metadata", newMetadata)
- return nil
} else {
// if found, update it
metadata.Spec = newMetadata
@@ -363,8 +371,31 @@
}
log.Info("update Metadata Resource success", "key", key, "metadata", newMetadata)
- return nil
}
+
+ // 更新dataplane资源
+ // 根据podName Get到dataplane资源
+ dataplane := core_mesh.NewDataplaneResource()
+ err = m.resourceManager.Get(m.ctx, dataplane, core_store.GetBy(core_model.ResourceKey{
+ Mesh: core_model.DefaultMesh,
+ Name: rmkey.GenerateNamespacedName(key.PodName, key.Namespace),
+ }))
+ if err != nil {
+ return err
+ }
+ if dataplane.Spec.Extensions == nil {
+ dataplane.Spec.Extensions = make(map[string]string)
+ }
+ // 拿到dataplane, 添加extensions, 设置revision
+ dataplane.Spec.Extensions[mesh_proto.Revision] = metadata.Spec.Revision
+
+ // 更新dataplane
+ err = m.resourceManager.Update(m.ctx, dataplane)
+ if err != nil {
+ return err
+ }
+
+ return nil
})
if err != nil {
log.Error(err, "transactions failed")