Merge pull request #234 from dawnzzz/master
feat: Dubbo Pusher
diff --git a/api/mesh/v1alpha1/metadata.pb.go b/api/mesh/v1alpha1/metadata.pb.go
index 5f34659..6a074f7 100644
--- a/api/mesh/v1alpha1/metadata.pb.go
+++ b/api/mesh/v1alpha1/metadata.pb.go
@@ -1,26 +1,19 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
-// protoc v3.20.0
+// protoc v4.22.2
// source: api/mesh/v1alpha1/metadata.proto
package v1alpha1
import (
+ _ "github.com/apache/dubbo-kubernetes/api/mesh"
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
-import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
-
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
-)
-
-import (
- _ "github.com/apache/dubbo-kubernetes/api/mesh"
-)
-
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
@@ -144,8 +137,9 @@
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- ApplicationName string `protobuf:"bytes,1,opt,name=applicationName,proto3" json:"applicationName,omitempty"`
- Revision string `protobuf:"bytes,2,opt,name=revision,proto3" json:"revision,omitempty"`
+ Nonce string `protobuf:"bytes,1,opt,name=nonce,proto3" json:"nonce,omitempty"`
+ ApplicationName string `protobuf:"bytes,2,opt,name=applicationName,proto3" json:"applicationName,omitempty"`
+ Revision string `protobuf:"bytes,3,opt,name=revision,proto3" json:"revision,omitempty"`
}
func (x *MetadataSyncRequest) Reset() {
@@ -180,6 +174,13 @@
return file_api_mesh_v1alpha1_metadata_proto_rawDescGZIP(), []int{2}
}
+func (x *MetadataSyncRequest) GetNonce() string {
+ if x != nil {
+ return x.Nonce
+ }
+ return ""
+}
+
func (x *MetadataSyncRequest) GetApplicationName() string {
if x != nil {
return x.ApplicationName
@@ -199,9 +200,9 @@
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- Nonce string `protobuf:"bytes,1,opt,name=nonce,proto3" json:"nonce,omitempty"`
- Revision int64 `protobuf:"varint,2,opt,name=revision,proto3" json:"revision,omitempty"`
- Mappings []*MetaData `protobuf:"bytes,3,rep,name=mappings,proto3" json:"mappings,omitempty"`
+ Nonce string `protobuf:"bytes,1,opt,name=nonce,proto3" json:"nonce,omitempty"`
+ Revision int64 `protobuf:"varint,2,opt,name=revision,proto3" json:"revision,omitempty"`
+ MetaDatum []*MetaData `protobuf:"bytes,3,rep,name=metaDatum,proto3" json:"metaDatum,omitempty"`
}
func (x *MetadataSyncResponse) Reset() {
@@ -250,9 +251,9 @@
return 0
}
-func (x *MetadataSyncResponse) GetMappings() []*MetaData {
+func (x *MetadataSyncResponse) GetMetaDatum() []*MetaData {
if x != nil {
- return x.Mappings
+ return x.MetaDatum
}
return nil
}
@@ -436,78 +437,80 @@
0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75,
0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22,
- 0x5b, 0x0a, 0x13, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x79, 0x6e, 0x63, 0x52,
- 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x28, 0x0a, 0x0f, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63,
- 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
- 0x0f, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4e, 0x61, 0x6d, 0x65,
- 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01,
- 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x83, 0x01, 0x0a,
- 0x14, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73,
- 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x01,
- 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72,
- 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x72,
- 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x39, 0x0a, 0x08, 0x6d, 0x61, 0x70, 0x70, 0x69,
- 0x6e, 0x67, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x64, 0x75, 0x62, 0x62,
- 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e,
- 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x61, 0x70, 0x70, 0x69, 0x6e,
- 0x67, 0x73, 0x22, 0xcd, 0x02, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x12,
- 0x10, 0x0a, 0x03, 0x61, 0x70, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x61, 0x70,
- 0x70, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20,
- 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x47, 0x0a,
- 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32,
- 0x2b, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61,
- 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x2e, 0x53,
- 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x73, 0x65,
- 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x1a, 0x5d, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63,
- 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01,
- 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x36, 0x0a, 0x05, 0x76, 0x61, 0x6c,
- 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f,
- 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x53,
- 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75,
- 0x65, 0x3a, 0x02, 0x38, 0x01, 0x3a, 0x6b, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x12, 0x0a, 0x10, 0x4d,
- 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0xaa,
- 0x8c, 0x89, 0xa6, 0x01, 0x0a, 0x12, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0xaa,
- 0x8c, 0x89, 0xa6, 0x01, 0x06, 0x22, 0x04, 0x6d, 0x65, 0x73, 0x68, 0xaa, 0x8c, 0x89, 0xa6, 0x01,
- 0x04, 0x52, 0x02, 0x10, 0x01, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x0c, 0x3a, 0x0a, 0x0a, 0x08, 0x6d,
- 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x0d, 0x3a, 0x0b, 0x12,
- 0x09, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x73, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x02,
- 0x68, 0x01, 0x22, 0x96, 0x02, 0x0a, 0x0b, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x6e,
- 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
- 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18,
- 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x18, 0x0a, 0x07,
- 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76,
- 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63,
- 0x6f, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63,
- 0x6f, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03,
- 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x06,
- 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x44, 0x0a, 0x06, 0x70, 0x61,
- 0x72, 0x61, 0x6d, 0x73, 0x18, 0x07, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x64, 0x75, 0x62,
- 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31,
- 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x50, 0x61, 0x72,
- 0x61, 0x6d, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x73,
- 0x1a, 0x39, 0x0a, 0x0b, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12,
+ 0x71, 0x0a, 0x13, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x79, 0x6e, 0x63, 0x52,
+ 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x18,
+ 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x28, 0x0a, 0x0f,
+ 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x18,
+ 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69,
+ 0x6f, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69,
+ 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69,
+ 0x6f, 0x6e, 0x22, 0x85, 0x01, 0x0a, 0x14, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53,
+ 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6e,
+ 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6e, 0x6f, 0x6e, 0x63,
+ 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20,
+ 0x01, 0x28, 0x03, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x3b, 0x0a,
+ 0x09, 0x6d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b,
+ 0x32, 0x1d, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31,
+ 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x52,
+ 0x09, 0x6d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x75, 0x6d, 0x22, 0xcd, 0x02, 0x0a, 0x08, 0x4d,
+ 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x70, 0x70, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x61, 0x70, 0x70, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76,
+ 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76,
+ 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x47, 0x0a, 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
+ 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e,
+ 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d, 0x65,
+ 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x45,
+ 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x1a, 0x5d,
+ 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12,
0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65,
- 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
- 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xeb, 0x01, 0x0a, 0x0f,
- 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12,
- 0x6f, 0x0a, 0x10, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x67, 0x69, 0x73,
- 0x74, 0x65, 0x72, 0x12, 0x2c, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68,
- 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61,
- 0x74, 0x61, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
- 0x74, 0x1a, 0x2d, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76,
- 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61,
- 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
- 0x12, 0x67, 0x0a, 0x0c, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x79, 0x6e, 0x63,
- 0x12, 0x28, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31,
- 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53,
- 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x64, 0x75, 0x62,
- 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31,
- 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73,
- 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74,
- 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x64,
- 0x75, 0x62, 0x62, 0x6f, 0x2d, 0x6b, 0x75, 0x62, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x65, 0x73, 0x2f,
- 0x61, 0x70, 0x69, 0x2f, 0x6d, 0x65, 0x73, 0x68, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61,
- 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x79, 0x12, 0x36, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
+ 0x32, 0x20, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31,
+ 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x6e,
+ 0x66, 0x6f, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x3a, 0x6b, 0xaa,
+ 0x8c, 0x89, 0xa6, 0x01, 0x12, 0x0a, 0x10, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x52,
+ 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x0a, 0x12, 0x08, 0x4d,
+ 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x06, 0x22, 0x04, 0x6d,
+ 0x65, 0x73, 0x68, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x04, 0x52, 0x02, 0x10, 0x01, 0xaa, 0x8c, 0x89,
+ 0xa6, 0x01, 0x0c, 0x3a, 0x0a, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xaa,
+ 0x8c, 0x89, 0xa6, 0x01, 0x0d, 0x3a, 0x0b, 0x12, 0x09, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74,
+ 0x61, 0x73, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x02, 0x68, 0x01, 0x22, 0x96, 0x02, 0x0a, 0x0b, 0x53,
+ 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61,
+ 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14,
+ 0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x67,
+ 0x72, 0x6f, 0x75, 0x70, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18,
+ 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1a,
+ 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09,
+ 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f,
+ 0x72, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x12,
+ 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61,
+ 0x74, 0x68, 0x12, 0x44, 0x0a, 0x06, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x18, 0x07, 0x20, 0x03,
+ 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e,
+ 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
+ 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79,
+ 0x52, 0x06, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x1a, 0x39, 0x0a, 0x0b, 0x50, 0x61, 0x72, 0x61,
+ 0x6d, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c,
+ 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a,
+ 0x02, 0x38, 0x01, 0x32, 0xeb, 0x01, 0x0a, 0x0f, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61,
+ 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x6f, 0x0a, 0x10, 0x4d, 0x65, 0x74, 0x61, 0x64,
+ 0x61, 0x74, 0x61, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x2c, 0x2e, 0x64, 0x75,
+ 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61,
+ 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74,
+ 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x64, 0x75, 0x62, 0x62,
+ 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e,
+ 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72,
+ 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x67, 0x0a, 0x0c, 0x4d, 0x65, 0x74, 0x61,
+ 0x64, 0x61, 0x74, 0x61, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x28, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f,
+ 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d,
+ 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65,
+ 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e,
+ 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74,
+ 0x61, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30,
+ 0x01, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
+ 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2d, 0x6b, 0x75, 0x62,
+ 0x65, 0x72, 0x6e, 0x65, 0x74, 0x65, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6d, 0x65, 0x73, 0x68,
+ 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+ 0x33,
}
var (
@@ -535,7 +538,7 @@
}
var file_api_mesh_v1alpha1_metadata_proto_depIdxs = []int32{
4, // 0: dubbo.mesh.v1alpha1.MetaDataRegisterRequest.metadata:type_name -> dubbo.mesh.v1alpha1.MetaData
- 4, // 1: dubbo.mesh.v1alpha1.MetadataSyncResponse.mappings:type_name -> dubbo.mesh.v1alpha1.MetaData
+ 4, // 1: dubbo.mesh.v1alpha1.MetadataSyncResponse.metaDatum:type_name -> dubbo.mesh.v1alpha1.MetaData
6, // 2: dubbo.mesh.v1alpha1.MetaData.services:type_name -> dubbo.mesh.v1alpha1.MetaData.ServicesEntry
7, // 3: dubbo.mesh.v1alpha1.ServiceInfo.params:type_name -> dubbo.mesh.v1alpha1.ServiceInfo.ParamsEntry
5, // 4: dubbo.mesh.v1alpha1.MetaData.ServicesEntry.value:type_name -> dubbo.mesh.v1alpha1.ServiceInfo
diff --git a/api/mesh/v1alpha1/metadata.proto b/api/mesh/v1alpha1/metadata.proto
index d00d36f..ad6fd43 100644
--- a/api/mesh/v1alpha1/metadata.proto
+++ b/api/mesh/v1alpha1/metadata.proto
@@ -25,14 +25,15 @@
// 可以根据应用名和版本号进行获取
message MetadataSyncRequest {
- string applicationName = 1;
- string revision = 2;
+ string nonce = 1;
+ string applicationName = 2;
+ string revision = 3;
}
message MetadataSyncResponse {
string nonce = 1;
int64 revision = 2;
- repeated MetaData mappings = 3;
+ repeated MetaData metaDatum = 3;
}
message MetaData {
diff --git a/api/mesh/v1alpha1/metadata_grpc.pb.go b/api/mesh/v1alpha1/metadata_grpc.pb.go
index a1303f6..0a216e9 100644
--- a/api/mesh/v1alpha1/metadata_grpc.pb.go
+++ b/api/mesh/v1alpha1/metadata_grpc.pb.go
@@ -1,12 +1,13 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
+// versions:
+// - protoc-gen-go-grpc v1.2.0
+// - protoc v4.22.2
+// source: api/mesh/v1alpha1/metadata.proto
package v1alpha1
import (
context "context"
-)
-
-import (
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
diff --git a/pkg/dubbo/client/stream.go b/pkg/dubbo/client/stream.go
index 96d4117..f200b47 100644
--- a/pkg/dubbo/client/stream.go
+++ b/pkg/dubbo/client/stream.go
@@ -25,78 +25,136 @@
"github.com/google/uuid"
"github.com/pkg/errors"
+
+ "google.golang.org/grpc"
+
+ "google.golang.org/protobuf/proto"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+ core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
)
-var _ MappingSyncStream = &stream{}
+var _ DubboSyncStream = &stream{}
type stream struct {
- streamClient mesh_proto.ServiceNameMappingService_MappingSyncServer
+ streamClient grpc.ServerStream
- // subscribedInterfaceNames records request's interfaceName in Mapping Request from data plane.
+ // subscribedInterfaceNames records request's interfaceName in MappingSync Request from data plane.
subscribedInterfaceNames map[string]struct{}
- lastNonce string
- mu sync.RWMutex
+ // subscribedApplicationNames records request's applicationName in MetaDataSync Request from data plane.
+ subscribedApplicationNames map[string]struct{}
+
+ mappingLastNonce string
+ metadataLastNonce string
+ mu sync.RWMutex
}
-func NewMappingSyncStream(streamClient mesh_proto.ServiceNameMappingService_MappingSyncServer) MappingSyncStream {
+func NewDubboSyncStream(streamClient grpc.ServerStream) DubboSyncStream {
return &stream{
streamClient: streamClient,
- subscribedInterfaceNames: make(map[string]struct{}),
+ subscribedInterfaceNames: make(map[string]struct{}),
+ subscribedApplicationNames: make(map[string]struct{}),
}
}
-type MappingSyncStream interface {
- Recv() (*mesh_proto.MappingSyncRequest, error)
- Send(mappingList *core_mesh.MappingResourceList, revision int64) error
+type DubboSyncStream interface {
+ Recv() (proto.Message, error)
+ Send(resourceList core_model.ResourceList, revision int64) error
SubscribedInterfaceNames() []string
+ SubscribedApplicationNames() []string
}
-func (s *stream) Recv() (*mesh_proto.MappingSyncRequest, error) {
- request, err := s.streamClient.Recv()
- if err != nil {
- return nil, err
+func (s *stream) Recv() (proto.Message, error) {
+ switch s.streamClient.(type) {
+ case mesh_proto.ServiceNameMappingService_MappingSyncServer:
+ request := &mesh_proto.MappingSyncRequest{}
+ err := s.streamClient.RecvMsg(request)
+ if err != nil {
+ return nil, err
+ }
+ if s.mappingLastNonce != "" && s.mappingLastNonce != request.GetNonce() {
+ return nil, errors.New("mapping sync request's nonce is different to last nonce")
+ }
+
+ // subscribe Mapping
+ s.mu.Lock()
+ interfaceName := request.GetInterfaceName()
+ s.subscribedInterfaceNames[interfaceName] = struct{}{}
+ s.mu.Lock()
+
+ return request, nil
+ case mesh_proto.MetadataService_MetadataSyncServer:
+ request := &mesh_proto.MetadataSyncRequest{}
+ err := s.streamClient.RecvMsg(request)
+ if err != nil {
+ return nil, err
+ }
+ if s.metadataLastNonce != "" && s.metadataLastNonce != request.GetNonce() {
+ return nil, errors.New("metadata sync request's nonce is different to last nonce")
+ }
+
+ // subscribe MetaData
+ s.mu.Lock()
+ appName := request.GetApplicationName()
+ s.subscribedApplicationNames[appName] = struct{}{}
+ s.mu.Lock()
+
+ return request, nil
+ default:
+ return nil, errors.New("unknown type request")
}
-
- if s.lastNonce != "" && s.lastNonce != request.GetNonce() {
- return nil, errors.New("request's nonce is different to last nonce")
- }
-
- // subscribe Mapping
- s.mu.Lock()
- interfaceName := request.GetInterfaceName()
- s.subscribedInterfaceNames[interfaceName] = struct{}{}
- s.mu.Lock()
-
- return request, nil
}
-func (s *stream) Send(mappingList *core_mesh.MappingResourceList, revision int64) error {
+func (s *stream) Send(resourceList core_model.ResourceList, revision int64) error {
s.mu.RLock()
defer s.mu.RUnlock()
nonce := uuid.NewString()
- mappings := make([]*mesh_proto.Mapping, 0, len(mappingList.Items))
- for _, item := range mappingList.Items {
- mappings = append(mappings, &mesh_proto.Mapping{
- Zone: item.Spec.Zone,
- InterfaceName: item.Spec.InterfaceName,
- ApplicationNames: item.Spec.ApplicationNames,
- })
- }
- s.lastNonce = nonce
- response := &mesh_proto.MappingSyncResponse{
- Nonce: nonce,
- Revision: revision,
- Mappings: mappings,
+ switch resourceList.(type) {
+ case *core_mesh.MappingResourceList:
+ mappingList := resourceList.(*core_mesh.MappingResourceList)
+ mappings := make([]*mesh_proto.Mapping, 0, len(mappingList.Items))
+ for _, item := range mappingList.Items {
+ mappings = append(mappings, &mesh_proto.Mapping{
+ Zone: item.Spec.Zone,
+ InterfaceName: item.Spec.InterfaceName,
+ ApplicationNames: item.Spec.ApplicationNames,
+ })
+ }
+
+ s.mappingLastNonce = nonce
+ response := &mesh_proto.MappingSyncResponse{
+ Nonce: nonce,
+ Revision: revision,
+ Mappings: mappings,
+ }
+ return s.streamClient.SendMsg(response)
+ case *core_mesh.MetaDataResourceList:
+ metadataList := resourceList.(*core_mesh.MetaDataResourceList)
+ metaDatum := make([]*mesh_proto.MetaData, 0, len(metadataList.Items))
+ for _, item := range metadataList.Items {
+ metaDatum = append(metaDatum, &mesh_proto.MetaData{
+ App: item.Spec.GetApp(),
+ Revision: item.Spec.Revision,
+ Services: item.Spec.GetServices(),
+ })
+ }
+
+ s.metadataLastNonce = nonce
+ response := &mesh_proto.MetadataSyncResponse{
+ Nonce: nonce,
+ Revision: revision,
+ MetaDatum: metaDatum,
+ }
+ return s.streamClient.SendMsg(response)
+ default:
+ return errors.New("unknown type request")
}
- return s.streamClient.Send(response)
}
func (s *stream) SubscribedInterfaceNames() []string {
@@ -110,3 +168,14 @@
return result
}
+func (s *stream) SubscribedApplicationNames() []string {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ result := make([]string, 0, len(s.subscribedApplicationNames))
+ for appName := range s.subscribedApplicationNames {
+ result = append(result, appName)
+ }
+
+ return result
+}
diff --git a/pkg/dubbo/client/sync_client.go b/pkg/dubbo/client/sync_client.go
index 1641912..bad8d72 100644
--- a/pkg/dubbo/client/sync_client.go
+++ b/pkg/dubbo/client/sync_client.go
@@ -29,29 +29,30 @@
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
- core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+ core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
)
type Callbacks struct {
- OnRequestReceived func(request *mesh_proto.MappingSyncRequest) error
+ OnMappingSyncRequestReceived func(request *mesh_proto.MappingSyncRequest) error
+ OnMetadataSyncRequestReceived func(request *mesh_proto.MetadataSyncRequest) error
}
-// MappingSyncClient Handle MappingSyncRequest from client
-type MappingSyncClient interface {
+// DubboSyncClient Handle Dubbo Sync Request from client
+type DubboSyncClient interface {
ClientID() string
HandleReceive() error
- Send(mappingList *core_mesh.MappingResourceList, revision int64) error
+ Send(resourceList core_model.ResourceList, revision int64) error
}
-type mappingSyncClient struct {
+type dubboSyncClient struct {
log logr.Logger
id string
- syncStream MappingSyncStream
+ syncStream DubboSyncStream
callbacks *Callbacks
}
-func NewMappingSyncClient(log logr.Logger, id string, syncStream MappingSyncStream, cb *Callbacks) MappingSyncClient {
- return &mappingSyncClient{
+func NewDubboSyncClient(log logr.Logger, id string, syncStream DubboSyncStream, cb *Callbacks) DubboSyncClient {
+ return &dubboSyncClient{
log: log,
id: id,
syncStream: syncStream,
@@ -59,11 +60,11 @@
}
}
-func (s *mappingSyncClient) ClientID() string {
+func (s *dubboSyncClient) ClientID() string {
return s.id
}
-func (s *mappingSyncClient) HandleReceive() error {
+func (s *dubboSyncClient) HandleReceive() error {
for {
received, err := s.syncStream.Recv()
if err != nil {
@@ -80,16 +81,22 @@
}
// callbacks
- err = s.callbacks.OnRequestReceived(received)
- if err != nil {
- s.log.Error(err, "error in OnRequestReceived")
- } else {
- s.log.Info("OnRequestReceived successed")
+ switch received.(type) {
+ case *mesh_proto.MappingSyncRequest:
+ err = s.callbacks.OnMappingSyncRequestReceived(received.(*mesh_proto.MappingSyncRequest))
+ if err != nil {
+ s.log.Error(err, "error in OnMappingSyncRequestReceived")
+ } else {
+ s.log.Info("OnMappingSyncRequestReceived successed")
+ }
+ case *mesh_proto.MetadataSyncRequest:
+ panic("unimplemented")
+ default:
+ return errors.New("unknown type request")
}
-
}
}
-func (s *mappingSyncClient) Send(mappingList *core_mesh.MappingResourceList, revision int64) error {
- return s.syncStream.Send(mappingList, revision)
+func (s *dubboSyncClient) Send(resourceList core_model.ResourceList, revision int64) error {
+ return s.syncStream.Send(resourceList, revision)
}
diff --git a/pkg/dubbo/components.go b/pkg/dubbo/components.go
index c61d61f..0fc70f9 100644
--- a/pkg/dubbo/components.go
+++ b/pkg/dubbo/components.go
@@ -63,6 +63,7 @@
// register MetadataService
metadata := dubbo_metadata.NewMetadataServe(
rt.AppContext(),
+ cfg,
dubboPusher,
rt.ResourceManager(),
rt.Transactions(),
diff --git a/pkg/dubbo/metadata/register_request.go b/pkg/dubbo/metadata/register_request.go
index 5dff680..df12e06 100644
--- a/pkg/dubbo/metadata/register_request.go
+++ b/pkg/dubbo/metadata/register_request.go
@@ -17,4 +17,37 @@
package metadata
-type RegisterRequest struct{}
+import "fmt"
+
+import (
+ mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
+ core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
+)
+
+type RegisterRequest struct {
+ ConfigsUpdated map[core_model.ResourceKey]*mesh_proto.MetaData
+}
+
+func (q *RegisterRequest) merge(req *RegisterRequest) *RegisterRequest {
+ if q == nil {
+ return req
+ }
+ for key, metaData := range req.ConfigsUpdated {
+ q.ConfigsUpdated[key] = metaData
+ }
+
+ return q
+}
+
+func configsUpdated(req *RegisterRequest) string {
+ configs := ""
+ for key := range req.ConfigsUpdated {
+ configs += key.Name + "." + key.Mesh
+ break
+ }
+ if len(req.ConfigsUpdated) > 1 {
+ more := fmt.Sprintf(" and %d more configs", len(req.ConfigsUpdated)-1)
+ configs += more
+ }
+ return configs
+}
diff --git a/pkg/dubbo/metadata/server.go b/pkg/dubbo/metadata/server.go
index 94d47d6..762a56d 100644
--- a/pkg/dubbo/metadata/server.go
+++ b/pkg/dubbo/metadata/server.go
@@ -19,13 +19,30 @@
import (
"context"
+ "io"
+ "strings"
+ "time"
+)
+
+import (
+ "github.com/google/uuid"
+
+ "github.com/pkg/errors"
+
+ "google.golang.org/grpc/codes"
+
+ "google.golang.org/grpc/status"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
+ "github.com/apache/dubbo-kubernetes/pkg/config/dubbo"
"github.com/apache/dubbo-kubernetes/pkg/core"
+ core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
+ core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
+ "github.com/apache/dubbo-kubernetes/pkg/dubbo/client"
"github.com/apache/dubbo-kubernetes/pkg/dubbo/pusher"
)
@@ -36,6 +53,7 @@
type MetadataServer struct {
mesh_proto.MetadataServiceServer
+ config dubbo.DubboConfig
queue chan *RegisterRequest
pusher pusher.Pusher
@@ -44,22 +62,28 @@
transactions core_store.Transactions
}
-func (s *MetadataServer) Start(stop <-chan struct{}) error {
+func (m *MetadataServer) Start(stop <-chan struct{}) error {
+ // we start debounce to prevent too many MetadataRegisterRequests, we aggregate metadata register information
+ go m.debounce(stop, m.register)
+
return nil
}
-func (s *MetadataServer) NeedLeaderElection() bool {
+func (m *MetadataServer) NeedLeaderElection() bool {
return false
}
func NewMetadataServe(
ctx context.Context,
+ config dubbo.DubboConfig,
pusher pusher.Pusher,
resourceManager manager.ResourceManager,
transactions core_store.Transactions,
) *MetadataServer {
return &MetadataServer{
+ config: config,
pusher: pusher,
+ queue: make(chan *RegisterRequest, queueSize),
ctx: ctx,
resourceManager: resourceManager,
transactions: transactions,
@@ -67,14 +91,285 @@
}
func (m *MetadataServer) MetadataRegister(ctx context.Context, req *mesh_proto.MetaDataRegisterRequest) (*mesh_proto.MetaDataRegisterResponse, error) {
+ mesh := core_model.DefaultMesh // todo: mesh
+ podName := req.GetPodName()
+ metadata := req.GetMetadata()
+ if metadata == nil {
+ return &mesh_proto.MetaDataRegisterResponse{
+ Success: false,
+ Message: "Metadata is nil",
+ }, nil
+ }
+
+ // MetaData name = podName.revision
+ name := podName + "." + metadata.Revision
+ registerReq := &RegisterRequest{ConfigsUpdated: map[core_model.ResourceKey]*mesh_proto.MetaData{}}
+ key := core_model.ResourceKey{
+ Mesh: mesh,
+ Name: name,
+ }
+ registerReq.ConfigsUpdated[key] = metadata
+
+ // push into queue to debounce, register Metadata Resource
+ m.queue <- registerReq
+
return &mesh_proto.MetaDataRegisterResponse{
- Success: false,
+ Success: true,
Message: "success",
}, nil
}
-func (m MetadataServer) MetadataSync(stream mesh_proto.MetadataService_MetadataSyncServer) error {
- return nil
+func (m *MetadataServer) MetadataSync(stream mesh_proto.MetadataService_MetadataSyncServer) error {
+ mesh := core_model.DefaultMesh // todo: mesh
+ errChan := make(chan error)
+
+ clientID := uuid.NewString()
+ metadataSyncStream := client.NewDubboSyncStream(stream)
+ // DubboSyncClient is to handle MetaSyncRequest from data plane
+ metadataSyncClient := client.NewDubboSyncClient(
+ log.WithName("client"),
+ clientID,
+ metadataSyncStream,
+ &client.Callbacks{
+ OnMetadataSyncRequestReceived: func(request *mesh_proto.MetadataSyncRequest) error {
+ // when received request, invoke callback
+ m.pusher.InvokeCallback(
+ core_mesh.MetaDataType,
+ clientID,
+ request,
+ func(rawRequest interface{}, resourceList core_model.ResourceList) core_model.ResourceList {
+ req := rawRequest.(*mesh_proto.MetadataSyncRequest)
+ metadataList := resourceList.(*core_mesh.MetaDataResourceList)
+
+ // only response the target MetaData Resource by application name or revision
+ respMetadataList := &core_mesh.MetaDataResourceList{}
+ for _, item := range metadataList.Items {
+ // MetaData.Name = AppName.Revision, so we need to check MedaData.Name has prefix of AppName
+ if item.Spec != nil && strings.HasPrefix(item.Spec.App, req.ApplicationName) {
+ if req.Revision != "" {
+ // revision is not empty, response the Metadata with application name and target revision
+ if req.Revision == item.Spec.Revision {
+ _ = respMetadataList.AddItem(item)
+ }
+ } else {
+ // revision is empty, response the Metadata with target application name
+ _ = respMetadataList.AddItem(item)
+ }
+ }
+ }
+
+ return respMetadataList
+ },
+ )
+ return nil
+ },
+ })
+ go func() {
+ // Handle requests from client
+ err := metadataSyncClient.HandleReceive()
+ if errors.Is(err, io.EOF) {
+ log.Info("DubboSyncClient finished gracefully")
+ errChan <- nil
+ return
+ }
+
+ log.Error(err, "DubboSyncClient finished with an error")
+ errChan <- errors.Wrap(err, "DubboSyncClient finished with an error")
+ }()
+
+ m.pusher.AddCallback(
+ core_mesh.MetaDataType,
+ metadataSyncClient.ClientID(),
+ func(items pusher.PushedItems) {
+ resourceList := items.ResourceList()
+ revision := items.Revision()
+ metadataList, ok := resourceList.(*core_mesh.MetaDataResourceList)
+ if !ok {
+ return
+ }
+
+ err := metadataSyncClient.Send(metadataList, revision)
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ log.Info("DubboSyncClient finished gracefully")
+ errChan <- nil
+ return
+ }
+
+ log.Error(err, "send metadata sync response failed", "metadataList", metadataList, "revision", revision)
+ errChan <- errors.Wrap(err, "DubboSyncClient send with an error")
+ }
+ },
+ func(resourceList core_model.ResourceList) core_model.ResourceList {
+ if resourceList.GetItemType() != core_mesh.MetaDataType {
+ return nil
+ }
+
+ // only send Metadata which client subscribed
+ newResourceList := &core_mesh.MeshResourceList{}
+ for _, resource := range resourceList.GetItems() {
+ expected := false
+ metaData := resource.(*core_mesh.MetaDataResource)
+ for _, applicationName := range metadataSyncStream.SubscribedApplicationNames() {
+ // MetaData.Name = AppName.Revision, so we need to check MedaData.Name has prefix of AppName
+ if strings.HasPrefix(metaData.Spec.GetApp(), applicationName) && mesh == resource.GetMeta().GetMesh() {
+ expected = true
+ break
+ }
+ }
+
+ if expected {
+ // find
+ _ = newResourceList.AddItem(resource)
+ }
+ }
+
+ return newResourceList
+ },
+ )
+
+ // in the end, remove callback of this client
+ defer m.pusher.RemoveCallback(core_mesh.MetaDataType, metadataSyncClient.ClientID())
+
+ for {
+ select {
+ case err := <-errChan:
+ if err == nil {
+ log.Info("MetadataSync finished gracefully")
+ return nil
+ }
+
+ log.Error(err, "MetadataSync finished with an error")
+ return status.Error(codes.Internal, err.Error())
+ }
+ }
}
-func (s *MetadataServer) debounce(stopCh <-chan struct{}, pushFn func(m *RegisterRequest)) {}
+func (m *MetadataServer) debounce(stopCh <-chan struct{}, pushFn func(m *RegisterRequest)) {
+ ch := m.queue
+ var timeChan <-chan time.Time
+ var startDebounce time.Time
+ var lastConfigUpdateTime time.Time
+
+ pushCounter := 0
+ debouncedEvents := 0
+
+ var req *RegisterRequest
+
+ free := true
+ freeCh := make(chan struct{}, 1)
+
+ push := func(req *RegisterRequest) {
+ pushFn(req)
+ freeCh <- struct{}{}
+ }
+
+ pushWorker := func() {
+ eventDelay := time.Since(startDebounce)
+ quietTime := time.Since(lastConfigUpdateTime)
+ if eventDelay >= m.config.Debounce.Max || quietTime >= m.config.Debounce.After {
+ if req != nil {
+ pushCounter++
+
+ if req.ConfigsUpdated != nil {
+ log.Info("debounce stable[%d] %d for config %s: %v since last change, %v since last push",
+ pushCounter, debouncedEvents, configsUpdated(req),
+ quietTime, eventDelay)
+ }
+ free = false
+ go push(req)
+ req = nil
+ debouncedEvents = 0
+ }
+ } else {
+ timeChan = time.After(m.config.Debounce.After - quietTime)
+ }
+ }
+
+ for {
+ select {
+ case <-freeCh:
+ free = true
+ pushWorker()
+ case r := <-ch:
+ if !m.config.Debounce.Enable {
+ go push(r)
+ req = nil
+ continue
+ }
+
+ lastConfigUpdateTime = time.Now()
+ if debouncedEvents == 0 {
+ timeChan = time.After(200 * time.Millisecond)
+ startDebounce = lastConfigUpdateTime
+ }
+ debouncedEvents++
+
+ req = req.merge(r)
+ case <-timeChan:
+ if free {
+ pushWorker()
+ }
+ case <-stopCh:
+ return
+ }
+ }
+}
+
+func (m *MetadataServer) register(req *RegisterRequest) {
+ for key, metadata := range req.ConfigsUpdated {
+ for i := 0; i < 3; i++ {
+ if err := m.tryRegister(key, metadata); err != nil {
+ log.Error(err, "register failed", "key", key)
+ } else {
+ break
+ }
+ }
+ }
+}
+
+func (m *MetadataServer) tryRegister(key core_model.ResourceKey, newMetadata *mesh_proto.MetaData) error {
+ err := core_store.InTx(m.ctx, m.transactions, func(ctx context.Context) error {
+
+ // get Metadata Resource first,
+ // if Metadata is not found, create it,
+ // else update it.
+ metadata := core_mesh.NewMetaDataResource()
+ err := m.resourceManager.Get(m.ctx, metadata, core_store.GetBy(key))
+ if err != nil && !core_store.IsResourceNotFound(err) {
+ log.Error(err, "get Metadata Resource")
+ return err
+ }
+
+ if core_store.IsResourceNotFound(err) {
+ // create if not found
+ metadata.Spec = newMetadata
+ err = m.resourceManager.Create(m.ctx, metadata, core_store.CreateBy(key), core_store.CreatedAt(time.Now()))
+ if err != nil {
+ log.Error(err, "create Metadata Resource failed")
+ return err
+ }
+
+ log.Info("create Metadata Resource success", "key", key, "metadata", newMetadata)
+ return nil
+ } else {
+ // if found, update it
+ metadata.Spec = newMetadata
+
+ err = m.resourceManager.Update(m.ctx, metadata, core_store.ModifiedAt(time.Now()))
+ if err != nil {
+ log.Error(err, "update Metadata Resource failed")
+ return err
+ }
+
+ log.Info("update Metadata Resource success", "key", key, "metadata", newMetadata)
+ return nil
+ }
+ })
+ if err != nil {
+ log.Error(err, "transactions failed")
+ return err
+ }
+
+ return nil
+}
diff --git a/pkg/dubbo/pusher/interface.go b/pkg/dubbo/pusher/interface.go
index 1b1a6b2..01427b2 100644
--- a/pkg/dubbo/pusher/interface.go
+++ b/pkg/dubbo/pusher/interface.go
@@ -32,5 +32,5 @@
RemoveCallback(resourceType core_model.ResourceType, id string)
// InvokeCallback invoke a target callback
// for example, for a push request from client, invoke this function to push resource.
- InvokeCallback(resourceType core_model.ResourceType, id string)
+ InvokeCallback(resourceType core_model.ResourceType, id string, request interface{}, requestFilter ResourceRequestFilter)
}
diff --git a/pkg/dubbo/pusher/pusher.go b/pkg/dubbo/pusher/pusher.go
index e8427e9..9604dc2 100644
--- a/pkg/dubbo/pusher/pusher.go
+++ b/pkg/dubbo/pusher/pusher.go
@@ -19,7 +19,6 @@
import (
"context"
- "github.com/apache/dubbo-kubernetes/pkg/core/resources/registry"
"reflect"
"time"
)
@@ -55,8 +54,10 @@
resourceChangedEventListeners map[core_model.ResourceType]events.Listener
eventsChannel chan *changedEvent
requestChannel chan struct {
- resourceType core_model.ResourceType
- id string
+ request interface{}
+ requestFilter ResourceRequestFilter
+ resourceType core_model.ResourceType
+ id string
}
resourceChangedCallbacks *ResourceChangedCallbacks
@@ -73,13 +74,14 @@
eventBus: eventBus,
newFullResyncTicker: newFullResyncTicker,
resourceTypes: make(map[core_model.ResourceType]struct{}),
- resourceLastPushed: make(map[core_model.ResourceType]core_model.ResourceList),
resourceRevisions: make(map[core_model.ResourceType]revision),
resourceChangedEventListeners: make(map[core_model.ResourceType]events.Listener),
eventsChannel: make(chan *changedEvent, eventsChannelSize),
requestChannel: make(chan struct {
- resourceType core_model.ResourceType
- id string
+ request interface{}
+ requestFilter ResourceRequestFilter
+ resourceType core_model.ResourceType
+ id string
}, requestChannelSize),
resourceChangedCallbacks: NewResourceChangedCallbacks(),
@@ -154,12 +156,8 @@
return nil
case ce := <-p.eventsChannel:
log.Info("event received", "ResourceType", ce.resourceType)
- resourceList, err := registry.Global().NewList(ce.resourceType)
- if err != nil {
- log.Info("can not get resourceList")
- continue
- }
- err = p.resourceManager.List(ctx, resourceList)
+ var resourceList core_model.ResourceList
+ err := p.resourceManager.List(ctx, resourceList)
if err != nil {
log.Error(err, "list resource failed", "ResourceType", ce.resourceType)
continue
@@ -196,8 +194,13 @@
continue
}
+ resourceList := lastedPushed
+ if req.requestFilter != nil {
+ resourceList = req.requestFilter(req.request, lastedPushed)
+ }
+
cb.Invoke(PushedItems{
- resourceList: lastedPushed,
+ resourceList: resourceList,
revision: revision,
})
case <-fullResyncTicker.C:
@@ -232,9 +235,16 @@
p.resourceChangedCallbacks.RemoveCallBack(resourceType, id)
}
-func (p *pusher) InvokeCallback(resourceType core_model.ResourceType, id string) {
+func (p *pusher) InvokeCallback(resourceType core_model.ResourceType, id string, request interface{}, requestFilter ResourceRequestFilter) {
p.requestChannel <- struct {
- resourceType core_model.ResourceType
- id string
- }{resourceType: resourceType, id: id}
+ request interface{}
+ requestFilter ResourceRequestFilter
+ resourceType core_model.ResourceType
+ id string
+ }{
+ request: request,
+ requestFilter: requestFilter,
+ resourceType: resourceType,
+ id: id,
+ }
}
diff --git a/pkg/dubbo/pusher/resource_changed_callbacks.go b/pkg/dubbo/pusher/resource_changed_callbacks.go
index a185ebd..caf0675 100644
--- a/pkg/dubbo/pusher/resource_changed_callbacks.go
+++ b/pkg/dubbo/pusher/resource_changed_callbacks.go
@@ -28,6 +28,7 @@
type (
ResourceChangedCallbackFn func(items PushedItems)
ResourceChangedEventFilter func(resourceList core_model.ResourceList) core_model.ResourceList
+ ResourceRequestFilter func(request interface{}, resourceList core_model.ResourceList) core_model.ResourceList
)
type ResourceChangedCallback struct {
diff --git a/pkg/dubbo/servicemapping/server.go b/pkg/dubbo/servicemapping/server.go
index addd354..bc802f3 100644
--- a/pkg/dubbo/servicemapping/server.go
+++ b/pkg/dubbo/servicemapping/server.go
@@ -126,16 +126,34 @@
errChan := make(chan error)
clientID := uuid.NewString()
- mappingSyncStream := client.NewMappingSyncStream(stream)
- // MappingSyncClient is to handle MappingSyncRequest from data plane
- mappingSyncClient := client.NewMappingSyncClient(
+ mappingSyncStream := client.NewDubboSyncStream(stream)
+ // DubboSyncClient is to handle MappingSyncRequest from data plane
+ mappingSyncClient := client.NewDubboSyncClient(
log.WithName("client"),
clientID,
mappingSyncStream,
&client.Callbacks{
- OnRequestReceived: func(request *mesh_proto.MappingSyncRequest) error {
+ OnMappingSyncRequestReceived: func(request *mesh_proto.MappingSyncRequest) error {
// when received request, invoke callback
- s.pusher.InvokeCallback(core_mesh.MappingType, clientID)
+ s.pusher.InvokeCallback(
+ core_mesh.MappingType,
+ clientID,
+ request,
+ func(rawRequest interface{}, resourceList core_model.ResourceList) core_model.ResourceList {
+ req := rawRequest.(*mesh_proto.MappingSyncRequest)
+ mappingList := resourceList.(*core_mesh.MappingResourceList)
+
+ // only response the target Mapping Resource by interface name
+ respMappingList := &core_mesh.MappingResourceList{}
+ for _, item := range mappingList.Items {
+ if item.Spec != nil && req.InterfaceName == item.Spec.InterfaceName {
+ _ = respMappingList.AddItem(item)
+ }
+ }
+
+ return respMappingList
+ },
+ )
return nil
},
})
@@ -143,13 +161,13 @@
// Handle requests from client
err := mappingSyncClient.HandleReceive()
if errors.Is(err, io.EOF) {
- log.Info("MappingSyncClient finished gracefully")
+ log.Info("DubboSyncClient finished gracefully")
errChan <- nil
return
}
- log.Error(err, "MappingSyncClient finished with an error")
- errChan <- errors.Wrap(err, "MappingSyncClient finished with an error")
+ log.Error(err, "DubboSyncClient finished with an error")
+ errChan <- errors.Wrap(err, "DubboSyncClient finished with an error")
}()
s.pusher.AddCallback(
@@ -166,13 +184,13 @@
err := mappingSyncClient.Send(mappingList, revision)
if err != nil {
if errors.Is(err, io.EOF) {
- log.Info("MappingSyncClient finished gracefully")
+ log.Info("DubboSyncClient finished gracefully")
errChan <- nil
return
}
log.Error(err, "send mapping sync response failed", "mappingList", mappingList, "revision", revision)
- errChan <- errors.Wrap(err, "MappingSyncClient send with an error")
+ errChan <- errors.Wrap(err, "DubboSyncClient send with an error")
}
},
func(resourceList core_model.ResourceList) core_model.ResourceList {