Merge branch 'apache:master' into master
diff --git a/api/mesh/v1alpha1/metadata.pb.go b/api/mesh/v1alpha1/metadata.pb.go
index 5f34659..6a074f7 100644
--- a/api/mesh/v1alpha1/metadata.pb.go
+++ b/api/mesh/v1alpha1/metadata.pb.go
@@ -1,26 +1,19 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
-// protoc v3.20.0
+// protoc v4.22.2
// source: api/mesh/v1alpha1/metadata.proto
package v1alpha1
import (
+ _ "github.com/apache/dubbo-kubernetes/api/mesh"
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
-import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
-
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
-)
-
-import (
- _ "github.com/apache/dubbo-kubernetes/api/mesh"
-)
-
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
@@ -144,8 +137,9 @@
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- ApplicationName string `protobuf:"bytes,1,opt,name=applicationName,proto3" json:"applicationName,omitempty"`
- Revision string `protobuf:"bytes,2,opt,name=revision,proto3" json:"revision,omitempty"`
+ Nonce string `protobuf:"bytes,1,opt,name=nonce,proto3" json:"nonce,omitempty"`
+ ApplicationName string `protobuf:"bytes,2,opt,name=applicationName,proto3" json:"applicationName,omitempty"`
+ Revision string `protobuf:"bytes,3,opt,name=revision,proto3" json:"revision,omitempty"`
}
func (x *MetadataSyncRequest) Reset() {
@@ -180,6 +174,13 @@
return file_api_mesh_v1alpha1_metadata_proto_rawDescGZIP(), []int{2}
}
+func (x *MetadataSyncRequest) GetNonce() string {
+ if x != nil {
+ return x.Nonce
+ }
+ return ""
+}
+
func (x *MetadataSyncRequest) GetApplicationName() string {
if x != nil {
return x.ApplicationName
@@ -199,9 +200,9 @@
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- Nonce string `protobuf:"bytes,1,opt,name=nonce,proto3" json:"nonce,omitempty"`
- Revision int64 `protobuf:"varint,2,opt,name=revision,proto3" json:"revision,omitempty"`
- Mappings []*MetaData `protobuf:"bytes,3,rep,name=mappings,proto3" json:"mappings,omitempty"`
+ Nonce string `protobuf:"bytes,1,opt,name=nonce,proto3" json:"nonce,omitempty"`
+ Revision int64 `protobuf:"varint,2,opt,name=revision,proto3" json:"revision,omitempty"`
+ MetaDatum []*MetaData `protobuf:"bytes,3,rep,name=metaDatum,proto3" json:"metaDatum,omitempty"`
}
func (x *MetadataSyncResponse) Reset() {
@@ -250,9 +251,9 @@
return 0
}
-func (x *MetadataSyncResponse) GetMappings() []*MetaData {
+func (x *MetadataSyncResponse) GetMetaDatum() []*MetaData {
if x != nil {
- return x.Mappings
+ return x.MetaDatum
}
return nil
}
@@ -436,78 +437,80 @@
0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75,
0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22,
- 0x5b, 0x0a, 0x13, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x79, 0x6e, 0x63, 0x52,
- 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x28, 0x0a, 0x0f, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63,
- 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
- 0x0f, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4e, 0x61, 0x6d, 0x65,
- 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01,
- 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x83, 0x01, 0x0a,
- 0x14, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73,
- 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x01,
- 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72,
- 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x72,
- 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x39, 0x0a, 0x08, 0x6d, 0x61, 0x70, 0x70, 0x69,
- 0x6e, 0x67, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x64, 0x75, 0x62, 0x62,
- 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e,
- 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x61, 0x70, 0x70, 0x69, 0x6e,
- 0x67, 0x73, 0x22, 0xcd, 0x02, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x12,
- 0x10, 0x0a, 0x03, 0x61, 0x70, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x61, 0x70,
- 0x70, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20,
- 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x47, 0x0a,
- 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32,
- 0x2b, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61,
- 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x2e, 0x53,
- 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x73, 0x65,
- 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x1a, 0x5d, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63,
- 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01,
- 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x36, 0x0a, 0x05, 0x76, 0x61, 0x6c,
- 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f,
- 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x53,
- 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75,
- 0x65, 0x3a, 0x02, 0x38, 0x01, 0x3a, 0x6b, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x12, 0x0a, 0x10, 0x4d,
- 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0xaa,
- 0x8c, 0x89, 0xa6, 0x01, 0x0a, 0x12, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0xaa,
- 0x8c, 0x89, 0xa6, 0x01, 0x06, 0x22, 0x04, 0x6d, 0x65, 0x73, 0x68, 0xaa, 0x8c, 0x89, 0xa6, 0x01,
- 0x04, 0x52, 0x02, 0x10, 0x01, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x0c, 0x3a, 0x0a, 0x0a, 0x08, 0x6d,
- 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x0d, 0x3a, 0x0b, 0x12,
- 0x09, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x73, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x02,
- 0x68, 0x01, 0x22, 0x96, 0x02, 0x0a, 0x0b, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x6e,
- 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
- 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18,
- 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x18, 0x0a, 0x07,
- 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76,
- 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63,
- 0x6f, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63,
- 0x6f, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03,
- 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x06,
- 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x44, 0x0a, 0x06, 0x70, 0x61,
- 0x72, 0x61, 0x6d, 0x73, 0x18, 0x07, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x64, 0x75, 0x62,
- 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31,
- 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x50, 0x61, 0x72,
- 0x61, 0x6d, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x73,
- 0x1a, 0x39, 0x0a, 0x0b, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12,
+ 0x71, 0x0a, 0x13, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x79, 0x6e, 0x63, 0x52,
+ 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x18,
+ 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x28, 0x0a, 0x0f,
+ 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x18,
+ 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69,
+ 0x6f, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69,
+ 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69,
+ 0x6f, 0x6e, 0x22, 0x85, 0x01, 0x0a, 0x14, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53,
+ 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6e,
+ 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6e, 0x6f, 0x6e, 0x63,
+ 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20,
+ 0x01, 0x28, 0x03, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x3b, 0x0a,
+ 0x09, 0x6d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b,
+ 0x32, 0x1d, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31,
+ 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x52,
+ 0x09, 0x6d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x75, 0x6d, 0x22, 0xcd, 0x02, 0x0a, 0x08, 0x4d,
+ 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x70, 0x70, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x61, 0x70, 0x70, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76,
+ 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76,
+ 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x47, 0x0a, 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
+ 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e,
+ 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d, 0x65,
+ 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x45,
+ 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x1a, 0x5d,
+ 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12,
0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65,
- 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
- 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xeb, 0x01, 0x0a, 0x0f,
- 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12,
- 0x6f, 0x0a, 0x10, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x67, 0x69, 0x73,
- 0x74, 0x65, 0x72, 0x12, 0x2c, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68,
- 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61,
- 0x74, 0x61, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
- 0x74, 0x1a, 0x2d, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76,
- 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61,
- 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
- 0x12, 0x67, 0x0a, 0x0c, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x79, 0x6e, 0x63,
- 0x12, 0x28, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31,
- 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53,
- 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x64, 0x75, 0x62,
- 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31,
- 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73,
- 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74,
- 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x64,
- 0x75, 0x62, 0x62, 0x6f, 0x2d, 0x6b, 0x75, 0x62, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x65, 0x73, 0x2f,
- 0x61, 0x70, 0x69, 0x2f, 0x6d, 0x65, 0x73, 0x68, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61,
- 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x79, 0x12, 0x36, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
+ 0x32, 0x20, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31,
+ 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x6e,
+ 0x66, 0x6f, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x3a, 0x6b, 0xaa,
+ 0x8c, 0x89, 0xa6, 0x01, 0x12, 0x0a, 0x10, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x52,
+ 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x0a, 0x12, 0x08, 0x4d,
+ 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x06, 0x22, 0x04, 0x6d,
+ 0x65, 0x73, 0x68, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x04, 0x52, 0x02, 0x10, 0x01, 0xaa, 0x8c, 0x89,
+ 0xa6, 0x01, 0x0c, 0x3a, 0x0a, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xaa,
+ 0x8c, 0x89, 0xa6, 0x01, 0x0d, 0x3a, 0x0b, 0x12, 0x09, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74,
+ 0x61, 0x73, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x02, 0x68, 0x01, 0x22, 0x96, 0x02, 0x0a, 0x0b, 0x53,
+ 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61,
+ 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14,
+ 0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x67,
+ 0x72, 0x6f, 0x75, 0x70, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18,
+ 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1a,
+ 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09,
+ 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f,
+ 0x72, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x12,
+ 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61,
+ 0x74, 0x68, 0x12, 0x44, 0x0a, 0x06, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x18, 0x07, 0x20, 0x03,
+ 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e,
+ 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
+ 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79,
+ 0x52, 0x06, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x1a, 0x39, 0x0a, 0x0b, 0x50, 0x61, 0x72, 0x61,
+ 0x6d, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c,
+ 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a,
+ 0x02, 0x38, 0x01, 0x32, 0xeb, 0x01, 0x0a, 0x0f, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61,
+ 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x6f, 0x0a, 0x10, 0x4d, 0x65, 0x74, 0x61, 0x64,
+ 0x61, 0x74, 0x61, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x2c, 0x2e, 0x64, 0x75,
+ 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61,
+ 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74,
+ 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x64, 0x75, 0x62, 0x62,
+ 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e,
+ 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72,
+ 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x67, 0x0a, 0x0c, 0x4d, 0x65, 0x74, 0x61,
+ 0x64, 0x61, 0x74, 0x61, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x28, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f,
+ 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d,
+ 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65,
+ 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e,
+ 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74,
+ 0x61, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30,
+ 0x01, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
+ 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2d, 0x6b, 0x75, 0x62,
+ 0x65, 0x72, 0x6e, 0x65, 0x74, 0x65, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6d, 0x65, 0x73, 0x68,
+ 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+ 0x33,
}
var (
@@ -535,7 +538,7 @@
}
var file_api_mesh_v1alpha1_metadata_proto_depIdxs = []int32{
4, // 0: dubbo.mesh.v1alpha1.MetaDataRegisterRequest.metadata:type_name -> dubbo.mesh.v1alpha1.MetaData
- 4, // 1: dubbo.mesh.v1alpha1.MetadataSyncResponse.mappings:type_name -> dubbo.mesh.v1alpha1.MetaData
+ 4, // 1: dubbo.mesh.v1alpha1.MetadataSyncResponse.metaDatum:type_name -> dubbo.mesh.v1alpha1.MetaData
6, // 2: dubbo.mesh.v1alpha1.MetaData.services:type_name -> dubbo.mesh.v1alpha1.MetaData.ServicesEntry
7, // 3: dubbo.mesh.v1alpha1.ServiceInfo.params:type_name -> dubbo.mesh.v1alpha1.ServiceInfo.ParamsEntry
5, // 4: dubbo.mesh.v1alpha1.MetaData.ServicesEntry.value:type_name -> dubbo.mesh.v1alpha1.ServiceInfo
diff --git a/api/mesh/v1alpha1/metadata.proto b/api/mesh/v1alpha1/metadata.proto
index d00d36f..ad6fd43 100644
--- a/api/mesh/v1alpha1/metadata.proto
+++ b/api/mesh/v1alpha1/metadata.proto
@@ -25,14 +25,15 @@
// 可以根据应用名和版本号进行获取
message MetadataSyncRequest {
- string applicationName = 1;
- string revision = 2;
+ string nonce = 1;
+ string applicationName = 2;
+ string revision = 3;
}
message MetadataSyncResponse {
string nonce = 1;
int64 revision = 2;
- repeated MetaData mappings = 3;
+ repeated MetaData metaDatum = 3;
}
message MetaData {
diff --git a/api/mesh/v1alpha1/metadata_grpc.pb.go b/api/mesh/v1alpha1/metadata_grpc.pb.go
index a1303f6..0a216e9 100644
--- a/api/mesh/v1alpha1/metadata_grpc.pb.go
+++ b/api/mesh/v1alpha1/metadata_grpc.pb.go
@@ -1,12 +1,13 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
+// versions:
+// - protoc-gen-go-grpc v1.2.0
+// - protoc v4.22.2
+// source: api/mesh/v1alpha1/metadata.proto
package v1alpha1
import (
context "context"
-)
-
-import (
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
diff --git a/app/dubbo-cp/cmd/run.go b/app/dubbo-cp/cmd/run.go
index f168898..d4eb467 100644
--- a/app/dubbo-cp/cmd/run.go
+++ b/app/dubbo-cp/cmd/run.go
@@ -19,6 +19,8 @@
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config/core"
+ "github.com/apache/dubbo-kubernetes/pkg/test"
"time"
)
@@ -31,7 +33,6 @@
"github.com/apache/dubbo-kubernetes/pkg/bufman"
"github.com/apache/dubbo-kubernetes/pkg/config"
dubbo_cp "github.com/apache/dubbo-kubernetes/pkg/config/app/dubbo-cp"
- "github.com/apache/dubbo-kubernetes/pkg/config/core"
"github.com/apache/dubbo-kubernetes/pkg/core/bootstrap"
dubbo_cmd "github.com/apache/dubbo-kubernetes/pkg/core/cmd"
dds_global "github.com/apache/dubbo-kubernetes/pkg/dds/global"
@@ -42,7 +43,6 @@
"github.com/apache/dubbo-kubernetes/pkg/dubbo"
"github.com/apache/dubbo-kubernetes/pkg/hds"
"github.com/apache/dubbo-kubernetes/pkg/intercp"
- "github.com/apache/dubbo-kubernetes/pkg/test"
"github.com/apache/dubbo-kubernetes/pkg/util/os"
dubbo_version "github.com/apache/dubbo-kubernetes/pkg/version"
"github.com/apache/dubbo-kubernetes/pkg/xds"
@@ -100,13 +100,6 @@
"minimim-open-files", minOpenFileLimit)
}
- if rt.GetMode() == core.Test {
- if err := test.Setup(rt); err != nil {
- runLog.Error(err, "unable to set up test")
- return err
- }
- }
-
if err := admin.Setup(rt); err != nil {
runLog.Error(err, "unable to set up admin")
return err
@@ -151,6 +144,13 @@
return err
}
+ if rt.GetMode() == core.Test {
+ if err := test.Setup(rt); err != nil {
+ runLog.Error(err, "unable to set up test")
+ return err
+ }
+ }
+
runLog.Info("starting Control Plane", "version", dubbo_version.Build.Version)
if err := rt.Start(gracefulCtx.Done()); err != nil {
runLog.Error(err, "problem running Control Plane")
diff --git a/conf/dubbo-cp.yaml b/conf/dubbo-cp.yaml
index d61487c..7345448 100644
--- a/conf/dubbo-cp.yaml
+++ b/conf/dubbo-cp.yaml
@@ -15,7 +15,14 @@
# 可选 k8s half universal
deploy_mode: k8s
-mode: test
+# mode: test
+mode: zone
+multizone:
+ zone:
+ globalAddress: grpc://127.0.0.1:5685
+ global:
+ dds:
+ grpcPort: 5685
runtime:
kubernetes:
admissionServer:
diff --git a/pkg/admin/component.go b/pkg/admin/component.go
index abc9757..aefad4b 100644
--- a/pkg/admin/component.go
+++ b/pkg/admin/component.go
@@ -26,7 +26,8 @@
var adminServerLog = core.Log.WithName("admin")
func Setup(rt core_runtime.Runtime) error {
- adminServer := server.NewAdminServer(*rt.Config().Admin).InitHTTPRouter()
+ adminServer := server.NewAdminServer(*rt.Config().Admin).
+ InitHTTPRouter()
if err := rt.Add(adminServer); err != nil {
adminServerLog.Error(err, "fail to start the admin server")
return err
diff --git a/pkg/config/app/dubbo-cp/config.go b/pkg/config/app/dubbo-cp/config.go
index 840a521..f42d02f 100644
--- a/pkg/config/app/dubbo-cp/config.go
+++ b/pkg/config/app/dubbo-cp/config.go
@@ -123,6 +123,14 @@
DelayFullResync bool `json:"delayFullResync" envconfig:"DUBBO_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_DELAY_FULL_RESYNC"`
}
+func DefaultEventBasedWatchdog() DDSEventBasedWatchdog {
+ return DDSEventBasedWatchdog{
+ FlushInterval: config_types.Duration{Duration: 5 * time.Second},
+ FullResyncInterval: config_types.Duration{Duration: 1 * time.Minute},
+ DelayFullResync: false,
+ }
+}
+
func (c Config) IsFederatedZoneCP() bool {
return c.Mode == core.Zone && c.Multizone.Zone.GlobalAddress != "" && c.Multizone.Zone.Name != ""
}
@@ -151,22 +159,23 @@
var DefaultConfig = func() Config {
return Config{
- BootstrapServer: bootstrap.DefaultBootstrapServerConfig(),
- DeployMode: core.UniversalMode,
- Mode: core.Zone,
- XdsServer: xds.DefaultXdsServerConfig(),
- Store: store.DefaultStoreConfig(),
- Runtime: runtime.DefaultRuntimeConfig(),
- Bufman: bufman.DefaultBufmanConfig(),
- General: DefaultGeneralConfig(),
- Defaults: DefaultDefaultsConfig(),
- Multizone: multizone.DefaultMultizoneConfig(),
- Diagnostics: diagnostics.DefaultDiagnosticsConfig(),
- DpServer: dp_server.DefaultDpServerConfig(),
- Admin: admin.DefaultAdminConfig(),
- InterCp: intercp.DefaultInterCpConfig(),
- DubboConfig: dubbo.DefaultServiceNameMappingConfig(),
- EventBus: eventbus.Default(),
+ BootstrapServer: bootstrap.DefaultBootstrapServerConfig(),
+ DeployMode: core.UniversalMode,
+ Mode: core.Zone,
+ XdsServer: xds.DefaultXdsServerConfig(),
+ Store: store.DefaultStoreConfig(),
+ Runtime: runtime.DefaultRuntimeConfig(),
+ Bufman: bufman.DefaultBufmanConfig(),
+ General: DefaultGeneralConfig(),
+ Defaults: DefaultDefaultsConfig(),
+ Multizone: multizone.DefaultMultizoneConfig(),
+ Diagnostics: diagnostics.DefaultDiagnosticsConfig(),
+ DpServer: dp_server.DefaultDpServerConfig(),
+ Admin: admin.DefaultAdminConfig(),
+ InterCp: intercp.DefaultInterCpConfig(),
+ DubboConfig: dubbo.DefaultServiceNameMappingConfig(),
+ EventBus: eventbus.Default(),
+ DDSEventBasedWatchdog: DefaultEventBasedWatchdog(),
}
}
diff --git a/pkg/config/multizone/multicluster.go b/pkg/config/multizone/multicluster.go
index 1ed14eb..57b243a 100644
--- a/pkg/config/multizone/multicluster.go
+++ b/pkg/config/multizone/multicluster.go
@@ -53,8 +53,16 @@
func DefaultGlobalConfig() *GlobalConfig {
return &GlobalConfig{
DDS: &DdsServerConfig{
- GrpcPort: 5685,
- RefreshInterval: config_types.Duration{Duration: 1 * time.Second},
+ GrpcPort: 5685,
+ RefreshInterval: config_types.Duration{Duration: 1 * time.Second},
+ ZoneInsightFlushInterval: config_types.Duration{Duration: 10 * time.Second},
+ TlsEnabled: false,
+ MaxMsgSize: 10 * 1024 * 1024,
+ MsgSendTimeout: config_types.Duration{Duration: 60 * time.Second},
+ TlsMinVersion: "TLSv1_2",
+ TlsCipherSuites: []string{},
+ NackBackoff: config_types.Duration{Duration: 5 * time.Second},
+ DisableSOTW: false,
},
}
}
@@ -89,6 +97,12 @@
GlobalAddress: "",
Name: "default",
DisableOriginLabelValidation: false,
+ DDS: &DdsClientConfig{
+ RefreshInterval: config_types.Duration{Duration: 1 * time.Second},
+ MaxMsgSize: 10 * 1024 * 1024,
+ MsgSendTimeout: config_types.Duration{Duration: 60 * time.Second},
+ NackBackoff: config_types.Duration{Duration: 5 * time.Second},
+ },
}
}
diff --git a/pkg/core/managers/apis/dataplane/dataplane_manager.go b/pkg/core/managers/apis/dataplane/dataplane_manager.go
index 2f4a56b..f3feb71 100644
--- a/pkg/core/managers/apis/dataplane/dataplane_manager.go
+++ b/pkg/core/managers/apis/dataplane/dataplane_manager.go
@@ -31,7 +31,6 @@
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
config_core "github.com/apache/dubbo-kubernetes/pkg/config/core"
"github.com/apache/dubbo-kubernetes/pkg/core"
- "github.com/apache/dubbo-kubernetes/pkg/core/logger"
core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
core_manager "github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
@@ -69,11 +68,7 @@
if err != nil {
return err
}
- options := core_store.NewGetOptions(opts...)
- if options.Labels[mesh_proto.Application] == "" || options.Labels[mesh_proto.Revision] == "" {
- logger.Sugar().Error("需要携带application和revision才能查询")
- return nil
- }
+
if err := m.store.Get(ctx, dataplane, opts...); err != nil {
return err
}
diff --git a/pkg/dds/global/components.go b/pkg/dds/global/components.go
index 60f7d87..d568e4b 100644
--- a/pkg/dds/global/components.go
+++ b/pkg/dds/global/components.go
@@ -142,7 +142,7 @@
return err
}
}
- return rt.Add(component.NewResilientComponent(ddsDeltaGlobalLog.WithName("dds-mux-client"), mux.NewServer(
+ return rt.Add(mux.NewServer(
rt.DDSContext().GlobalServerFilters,
rt.DDSContext().ServerStreamInterceptors,
rt.DDSContext().ServerUnaryInterceptor,
@@ -166,7 +166,7 @@
rt.Extensions(),
rt.EventBus(),
),
- )))
+ ))
}
func createZoneIfAbsent(ctx context.Context, log logr.Logger, name string, resManager core_manager.ResourceManager) error {
diff --git a/pkg/dds/mux/client.go b/pkg/dds/mux/client.go
index 5e12d4c..4723ce5 100644
--- a/pkg/dds/mux/client.go
+++ b/pkg/dds/mux/client.go
@@ -143,7 +143,7 @@
func (c *client) startGlobalToZoneSync(ctx context.Context, log logr.Logger, conn *grpc.ClientConn, errorCh chan error) {
kdsClient := mesh_proto.NewDDSSyncServiceClient(conn)
log = log.WithValues("rpc", "global-to-zone")
- log.Info("initializing Kuma Discovery Service (KDS) stream for global to zone sync of resources with delta xDS")
+ log.Info("initializing Dubbo Discovery Service (DDS) stream for global to zone sync of resources with delta xDS")
stream, err := kdsClient.GlobalToZoneSync(ctx)
if err != nil {
errorCh <- err
@@ -157,7 +157,7 @@
func (c *client) startZoneToGlobalSync(ctx context.Context, log logr.Logger, conn *grpc.ClientConn, errorCh chan error) {
kdsClient := mesh_proto.NewDDSSyncServiceClient(conn)
log = log.WithValues("rpc", "zone-to-global")
- log.Info("initializing Kuma Discovery Service (KDS) stream for zone to global sync of resources with delta xDS")
+ log.Info("initializing Dubbo Discovery Service (DDS) stream for zone to global sync of resources with delta xDS")
stream, err := kdsClient.ZoneToGlobalSync(ctx)
if err != nil {
errorCh <- err
diff --git a/pkg/dubbo/client/stream.go b/pkg/dubbo/client/stream.go
index 96d4117..f200b47 100644
--- a/pkg/dubbo/client/stream.go
+++ b/pkg/dubbo/client/stream.go
@@ -25,78 +25,136 @@
"github.com/google/uuid"
"github.com/pkg/errors"
+
+ "google.golang.org/grpc"
+
+ "google.golang.org/protobuf/proto"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+ core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
)
-var _ MappingSyncStream = &stream{}
+var _ DubboSyncStream = &stream{}
type stream struct {
- streamClient mesh_proto.ServiceNameMappingService_MappingSyncServer
+ streamClient grpc.ServerStream
- // subscribedInterfaceNames records request's interfaceName in Mapping Request from data plane.
+ // subscribedInterfaceNames records request's interfaceName in MappingSync Request from data plane.
subscribedInterfaceNames map[string]struct{}
- lastNonce string
- mu sync.RWMutex
+ // subscribedApplicationNames records request's applicationName in MetaDataSync Request from data plane.
+ subscribedApplicationNames map[string]struct{}
+
+ mappingLastNonce string
+ metadataLastNonce string
+ mu sync.RWMutex
}
-func NewMappingSyncStream(streamClient mesh_proto.ServiceNameMappingService_MappingSyncServer) MappingSyncStream {
+func NewDubboSyncStream(streamClient grpc.ServerStream) DubboSyncStream {
return &stream{
streamClient: streamClient,
- subscribedInterfaceNames: make(map[string]struct{}),
+ subscribedInterfaceNames: make(map[string]struct{}),
+ subscribedApplicationNames: make(map[string]struct{}),
}
}
-type MappingSyncStream interface {
- Recv() (*mesh_proto.MappingSyncRequest, error)
- Send(mappingList *core_mesh.MappingResourceList, revision int64) error
+type DubboSyncStream interface {
+ Recv() (proto.Message, error)
+ Send(resourceList core_model.ResourceList, revision int64) error
SubscribedInterfaceNames() []string
+ SubscribedApplicationNames() []string
}
-func (s *stream) Recv() (*mesh_proto.MappingSyncRequest, error) {
- request, err := s.streamClient.Recv()
- if err != nil {
- return nil, err
+func (s *stream) Recv() (proto.Message, error) {
+ switch s.streamClient.(type) { // dispatch on the concrete gRPC server stream wrapped by this stream
+ case mesh_proto.ServiceNameMappingService_MappingSyncServer:
+ request := &mesh_proto.MappingSyncRequest{}
+ err := s.streamClient.RecvMsg(request)
+ if err != nil {
+ return nil, err
+ }
+ if s.mappingLastNonce != "" && s.mappingLastNonce != request.GetNonce() {
+ return nil, errors.New("mapping sync request's nonce is different to last nonce")
+ }
+
+ // Record the subscribed interface name; s.mu guards the subscription sets read by the Subscribed* getters.
+ s.mu.Lock()
+ interfaceName := request.GetInterfaceName()
+ s.subscribedInterfaceNames[interfaceName] = struct{}{}
+ s.mu.Unlock() // must release: a second Lock here blocks forever because sync.RWMutex is not reentrant
+
+ return request, nil
+ case mesh_proto.MetadataService_MetadataSyncServer:
+ request := &mesh_proto.MetadataSyncRequest{}
+ err := s.streamClient.RecvMsg(request)
+ if err != nil {
+ return nil, err
+ }
+ if s.metadataLastNonce != "" && s.metadataLastNonce != request.GetNonce() {
+ return nil, errors.New("metadata sync request's nonce is different to last nonce")
+ }
+
+ // Record the subscribed application name under the same lock as the mapping branch.
+ s.mu.Lock()
+ appName := request.GetApplicationName()
+ s.subscribedApplicationNames[appName] = struct{}{}
+ s.mu.Unlock() // must release: a second Lock here blocks forever because sync.RWMutex is not reentrant
+
+ return request, nil
+ default:
+ return nil, errors.New("unknown type request")
}
-
- if s.lastNonce != "" && s.lastNonce != request.GetNonce() {
- return nil, errors.New("request's nonce is different to last nonce")
- }
-
- // subscribe Mapping
- s.mu.Lock()
- interfaceName := request.GetInterfaceName()
- s.subscribedInterfaceNames[interfaceName] = struct{}{}
- s.mu.Lock()
-
- return request, nil
}
-func (s *stream) Send(mappingList *core_mesh.MappingResourceList, revision int64) error {
+func (s *stream) Send(resourceList core_model.ResourceList, revision int64) error {
- s.mu.RLock()
- defer s.mu.RUnlock()
+ s.mu.Lock() // exclusive lock: Send assigns mappingLastNonce/metadataLastNonce below, which a read lock must not cover
+ defer s.mu.Unlock()
nonce := uuid.NewString()
- mappings := make([]*mesh_proto.Mapping, 0, len(mappingList.Items))
- for _, item := range mappingList.Items {
- mappings = append(mappings, &mesh_proto.Mapping{
- Zone: item.Spec.Zone,
- InterfaceName: item.Spec.InterfaceName,
- ApplicationNames: item.Spec.ApplicationNames,
- })
- }
- s.lastNonce = nonce
- response := &mesh_proto.MappingSyncResponse{
- Nonce: nonce,
- Revision: revision,
- Mappings: mappings,
+ switch resourceList.(type) { // the resource list's concrete type selects which response message is built
+ case *core_mesh.MappingResourceList:
+ mappingList := resourceList.(*core_mesh.MappingResourceList)
+ mappings := make([]*mesh_proto.Mapping, 0, len(mappingList.Items))
+ for _, item := range mappingList.Items {
+ mappings = append(mappings, &mesh_proto.Mapping{
+ Zone: item.Spec.Zone,
+ InterfaceName: item.Spec.InterfaceName,
+ ApplicationNames: item.Spec.ApplicationNames,
+ })
+ }
+
+ s.mappingLastNonce = nonce // remembered so Recv can reject a mapping request carrying a different nonce
+ response := &mesh_proto.MappingSyncResponse{
+ Nonce: nonce,
+ Revision: revision,
+ Mappings: mappings,
+ }
+ return s.streamClient.SendMsg(response)
+ case *core_mesh.MetaDataResourceList:
+ metadataList := resourceList.(*core_mesh.MetaDataResourceList)
+ metaDatum := make([]*mesh_proto.MetaData, 0, len(metadataList.Items))
+ for _, item := range metadataList.Items {
+ metaDatum = append(metaDatum, &mesh_proto.MetaData{
+ App: item.Spec.GetApp(),
+ Revision: item.Spec.Revision,
+ Services: item.Spec.GetServices(),
+ })
+ }
+
+ s.metadataLastNonce = nonce // remembered so Recv can reject a metadata request carrying a different nonce
+ response := &mesh_proto.MetadataSyncResponse{
+ Nonce: nonce,
+ Revision: revision,
+ MetaDatum: metaDatum,
+ }
+ return s.streamClient.SendMsg(response)
+ default:
+ return errors.New("unknown type request")
}
- return s.streamClient.Send(response)
}
func (s *stream) SubscribedInterfaceNames() []string {
@@ -110,3 +168,14 @@
return result
}
+// SubscribedApplicationNames returns a snapshot of the application names
+// currently held in subscribedApplicationNames, read under the read lock.
+// The order of the returned names is unspecified (map iteration order).
+func (s *stream) SubscribedApplicationNames() []string {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	names := make([]string, len(s.subscribedApplicationNames))
+	next := 0
+	for name := range s.subscribedApplicationNames {
+		names[next] = name
+		next++
+	}
+
+	return names
+}
diff --git a/pkg/dubbo/client/sync_client.go b/pkg/dubbo/client/sync_client.go
index 1641912..bad8d72 100644
--- a/pkg/dubbo/client/sync_client.go
+++ b/pkg/dubbo/client/sync_client.go
@@ -29,29 +29,30 @@
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
- core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+ core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
)
type Callbacks struct {
- OnRequestReceived func(request *mesh_proto.MappingSyncRequest) error
+ OnMappingSyncRequestReceived func(request *mesh_proto.MappingSyncRequest) error
+ OnMetadataSyncRequestReceived func(request *mesh_proto.MetadataSyncRequest) error
}
-// MappingSyncClient Handle MappingSyncRequest from client
-type MappingSyncClient interface {
+// DubboSyncClient Handle Dubbo Sync Request from client
+type DubboSyncClient interface {
ClientID() string
HandleReceive() error
- Send(mappingList *core_mesh.MappingResourceList, revision int64) error
+ Send(resourceList core_model.ResourceList, revision int64) error
}
-type mappingSyncClient struct {
+type dubboSyncClient struct {
log logr.Logger
id string
- syncStream MappingSyncStream
+ syncStream DubboSyncStream
callbacks *Callbacks
}
-func NewMappingSyncClient(log logr.Logger, id string, syncStream MappingSyncStream, cb *Callbacks) MappingSyncClient {
- return &mappingSyncClient{
+func NewDubboSyncClient(log logr.Logger, id string, syncStream DubboSyncStream, cb *Callbacks) DubboSyncClient {
+ return &dubboSyncClient{
log: log,
id: id,
syncStream: syncStream,
@@ -59,11 +60,11 @@
}
}
-func (s *mappingSyncClient) ClientID() string {
+func (s *dubboSyncClient) ClientID() string {
return s.id
}
-func (s *mappingSyncClient) HandleReceive() error {
+func (s *dubboSyncClient) HandleReceive() error {
for {
received, err := s.syncStream.Recv()
if err != nil {
@@ -80,16 +81,22 @@
}
// callbacks
- err = s.callbacks.OnRequestReceived(received)
- if err != nil {
- s.log.Error(err, "error in OnRequestReceived")
- } else {
- s.log.Info("OnRequestReceived successed")
+		// Dispatch on the concrete request type. A request type whose
+		// callback was not registered is reported as an error rather than
+		// invoking a nil function (or panicking), so a misrouted request
+		// cannot take the process down.
+		switch request := received.(type) {
+		case *mesh_proto.MappingSyncRequest:
+			if s.callbacks.OnMappingSyncRequestReceived == nil {
+				return errors.New("no OnMappingSyncRequestReceived callback registered")
+			}
+			err = s.callbacks.OnMappingSyncRequestReceived(request)
+			if err != nil {
+				s.log.Error(err, "error in OnMappingSyncRequestReceived")
+			} else {
+				s.log.Info("OnMappingSyncRequestReceived successed")
+			}
+		case *mesh_proto.MetadataSyncRequest:
+			if s.callbacks.OnMetadataSyncRequestReceived == nil {
+				return errors.New("no OnMetadataSyncRequestReceived callback registered")
+			}
+			err = s.callbacks.OnMetadataSyncRequestReceived(request)
+			if err != nil {
+				s.log.Error(err, "error in OnMetadataSyncRequestReceived")
+			} else {
+				s.log.Info("OnMetadataSyncRequestReceived successed")
+			}
+		default:
+			return errors.New("unknown type request")
}
-
}
}
-func (s *mappingSyncClient) Send(mappingList *core_mesh.MappingResourceList, revision int64) error {
- return s.syncStream.Send(mappingList, revision)
+func (s *dubboSyncClient) Send(resourceList core_model.ResourceList, revision int64) error {
+ return s.syncStream.Send(resourceList, revision)
}
diff --git a/pkg/dubbo/components.go b/pkg/dubbo/components.go
index c61d61f..0fc70f9 100644
--- a/pkg/dubbo/components.go
+++ b/pkg/dubbo/components.go
@@ -63,6 +63,7 @@
// register MetadataService
metadata := dubbo_metadata.NewMetadataServe(
rt.AppContext(),
+ cfg,
dubboPusher,
rt.ResourceManager(),
rt.Transactions(),
diff --git a/pkg/dubbo/metadata/register_request.go b/pkg/dubbo/metadata/register_request.go
index 5dff680..df12e06 100644
--- a/pkg/dubbo/metadata/register_request.go
+++ b/pkg/dubbo/metadata/register_request.go
@@ -17,4 +17,37 @@
package metadata
-type RegisterRequest struct{}
+import "fmt"
+
+import (
+ mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
+ core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
+)
+
+type RegisterRequest struct {
+ ConfigsUpdated map[core_model.ResourceKey]*mesh_proto.MetaData
+}
+
+// merge folds the entries of req.ConfigsUpdated into q.ConfigsUpdated and
+// returns the combined request. Entries from req overwrite entries in q that
+// share the same key. A nil receiver returns req unchanged and a nil req
+// returns q unchanged, so either side may be absent.
+func (q *RegisterRequest) merge(req *RegisterRequest) *RegisterRequest {
+	if q == nil {
+		return req
+	}
+	if req == nil {
+		return q
+	}
+	// Writing to a nil map panics; allocate only when there is something to copy.
+	if q.ConfigsUpdated == nil && len(req.ConfigsUpdated) > 0 {
+		q.ConfigsUpdated = make(map[core_model.ResourceKey]*mesh_proto.MetaData, len(req.ConfigsUpdated))
+	}
+	for key, metaData := range req.ConfigsUpdated {
+		q.ConfigsUpdated[key] = metaData
+	}
+
+	return q
+}
+
+// configsUpdated renders a short summary of req.ConfigsUpdated for logging:
+// "<name>.<mesh>" of one key (whichever the map iteration yields first),
+// followed by " and N more configs" when the request holds more than one key.
+// An empty request produces an empty string.
+func configsUpdated(req *RegisterRequest) string {
+	total := len(req.ConfigsUpdated)
+
+	var summary string
+	for key := range req.ConfigsUpdated {
+		// only one key is named; the rest are counted below
+		summary = summary + key.Name + "." + key.Mesh
+		break
+	}
+
+	if total > 1 {
+		summary += fmt.Sprintf(" and %d more configs", total-1)
+	}
+	return summary
+}
diff --git a/pkg/dubbo/metadata/server.go b/pkg/dubbo/metadata/server.go
index 94d47d6..762a56d 100644
--- a/pkg/dubbo/metadata/server.go
+++ b/pkg/dubbo/metadata/server.go
@@ -19,13 +19,30 @@
import (
"context"
+ "io"
+ "strings"
+ "time"
+)
+
+import (
+ "github.com/google/uuid"
+
+ "github.com/pkg/errors"
+
+ "google.golang.org/grpc/codes"
+
+ "google.golang.org/grpc/status"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
+ "github.com/apache/dubbo-kubernetes/pkg/config/dubbo"
"github.com/apache/dubbo-kubernetes/pkg/core"
+ core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
+ core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
+ "github.com/apache/dubbo-kubernetes/pkg/dubbo/client"
"github.com/apache/dubbo-kubernetes/pkg/dubbo/pusher"
)
@@ -36,6 +53,7 @@
type MetadataServer struct {
mesh_proto.MetadataServiceServer
+ config dubbo.DubboConfig
queue chan *RegisterRequest
pusher pusher.Pusher
@@ -44,22 +62,28 @@
transactions core_store.Transactions
}
-func (s *MetadataServer) Start(stop <-chan struct{}) error {
+func (m *MetadataServer) Start(stop <-chan struct{}) error {
+ // we start debounce to prevent too many MetadataRegisterRequests, we aggregate metadata register information
+ go m.debounce(stop, m.register)
+
return nil
}
-func (s *MetadataServer) NeedLeaderElection() bool {
+func (m *MetadataServer) NeedLeaderElection() bool {
return false
}
func NewMetadataServe(
ctx context.Context,
+ config dubbo.DubboConfig,
pusher pusher.Pusher,
resourceManager manager.ResourceManager,
transactions core_store.Transactions,
) *MetadataServer {
return &MetadataServer{
+ config: config,
pusher: pusher,
+ queue: make(chan *RegisterRequest, queueSize),
ctx: ctx,
resourceManager: resourceManager,
transactions: transactions,
@@ -67,14 +91,285 @@
}
func (m *MetadataServer) MetadataRegister(ctx context.Context, req *mesh_proto.MetaDataRegisterRequest) (*mesh_proto.MetaDataRegisterResponse, error) {
+ mesh := core_model.DefaultMesh // todo: mesh
+ podName := req.GetPodName()
+ metadata := req.GetMetadata()
+ if metadata == nil {
+ return &mesh_proto.MetaDataRegisterResponse{
+ Success: false,
+ Message: "Metadata is nil",
+ }, nil
+ }
+
+ // MetaData name = podName.revision
+ name := podName + "." + metadata.Revision
+ registerReq := &RegisterRequest{ConfigsUpdated: map[core_model.ResourceKey]*mesh_proto.MetaData{}}
+ key := core_model.ResourceKey{
+ Mesh: mesh,
+ Name: name,
+ }
+ registerReq.ConfigsUpdated[key] = metadata
+
+ // push into queue to debounce, register Metadata Resource
+ m.queue <- registerReq
+
return &mesh_proto.MetaDataRegisterResponse{
- Success: false,
+ Success: true,
Message: "success",
}, nil
}
-func (m MetadataServer) MetadataSync(stream mesh_proto.MetadataService_MetadataSyncServer) error {
- return nil
+// MetadataSync serves one metadata sync stream. It wraps the stream in a
+// DubboSyncClient identified by a fresh client ID, handles incoming
+// MetadataSyncRequests on a separate goroutine, and registers a pusher
+// callback that sends MetaData resources matching the stream's subscribed
+// application names. The method blocks until either side reports completion:
+// io.EOF is treated as a graceful end and returns nil, any other failure is
+// returned as a codes.Internal status error. The pusher callback is removed
+// before returning.
+func (m *MetadataServer) MetadataSync(stream mesh_proto.MetadataService_MetadataSyncServer) error {
+	mesh := core_model.DefaultMesh // todo: mesh
+
+	// Both the receive goroutine and the push callback may report completion,
+	// but only the first report is consumed below. The one-slot buffer plus a
+	// non-blocking send keeps later reporters from blocking forever after
+	// this method has returned.
+	errChan := make(chan error, 1)
+	reportDone := func(err error) {
+		select {
+		case errChan <- err:
+		default:
+		}
+	}
+
+	clientID := uuid.NewString()
+	metadataSyncStream := client.NewDubboSyncStream(stream)
+	// DubboSyncClient is to handle MetaSyncRequest from data plane
+	metadataSyncClient := client.NewDubboSyncClient(
+		log.WithName("client"),
+		clientID,
+		metadataSyncStream,
+		&client.Callbacks{
+			OnMetadataSyncRequestReceived: func(request *mesh_proto.MetadataSyncRequest) error {
+				// when received request, invoke callback
+				m.pusher.InvokeCallback(
+					core_mesh.MetaDataType,
+					clientID,
+					request,
+					func(rawRequest interface{}, resourceList core_model.ResourceList) core_model.ResourceList {
+						// only response the target MetaData Resource by application name or revision
+						respMetadataList := &core_mesh.MetaDataResourceList{}
+
+						// An unexpected request or list type yields an empty response instead of a panic.
+						req, ok := rawRequest.(*mesh_proto.MetadataSyncRequest)
+						if !ok {
+							return respMetadataList
+						}
+						metadataList, ok := resourceList.(*core_mesh.MetaDataResourceList)
+						if !ok {
+							return respMetadataList
+						}
+
+						for _, item := range metadataList.Items {
+							// match on the application name stored in the spec (prefix match)
+							if item.Spec == nil || !strings.HasPrefix(item.Spec.App, req.ApplicationName) {
+								continue
+							}
+							// an empty revision selects every revision of the application,
+							// otherwise only the requested revision is returned
+							if req.Revision == "" || req.Revision == item.Spec.Revision {
+								_ = respMetadataList.AddItem(item)
+							}
+						}
+
+						return respMetadataList
+					},
+				)
+				return nil
+			},
+		})
+	go func() {
+		// Handle requests from client
+		err := metadataSyncClient.HandleReceive()
+		if errors.Is(err, io.EOF) {
+			log.Info("DubboSyncClient finished gracefully")
+			reportDone(nil)
+			return
+		}
+
+		log.Error(err, "DubboSyncClient finished with an error")
+		reportDone(errors.Wrap(err, "DubboSyncClient finished with an error"))
+	}()
+
+	m.pusher.AddCallback(
+		core_mesh.MetaDataType,
+		metadataSyncClient.ClientID(),
+		func(items pusher.PushedItems) {
+			resourceList := items.ResourceList()
+			revision := items.Revision()
+			metadataList, ok := resourceList.(*core_mesh.MetaDataResourceList)
+			if !ok {
+				return
+			}
+
+			err := metadataSyncClient.Send(metadataList, revision)
+			if err != nil {
+				if errors.Is(err, io.EOF) {
+					log.Info("DubboSyncClient finished gracefully")
+					reportDone(nil)
+					return
+				}
+
+				log.Error(err, "send metadata sync response failed", "metadataList", metadataList, "revision", revision)
+				reportDone(errors.Wrap(err, "DubboSyncClient send with an error"))
+			}
+		},
+		func(resourceList core_model.ResourceList) core_model.ResourceList {
+			if resourceList.GetItemType() != core_mesh.MetaDataType {
+				return nil
+			}
+
+			// only send Metadata which client subscribed.
+			// The result must be a MetaDataResourceList: the push callback above
+			// asserts that type, and the items added are MetaData resources.
+			newResourceList := &core_mesh.MetaDataResourceList{}
+			// take one snapshot of the subscriptions instead of re-reading them per resource
+			subscribed := metadataSyncStream.SubscribedApplicationNames()
+			for _, resource := range resourceList.GetItems() {
+				metaData, ok := resource.(*core_mesh.MetaDataResource)
+				if !ok {
+					continue
+				}
+				if mesh != resource.GetMeta().GetMesh() {
+					continue
+				}
+				for _, applicationName := range subscribed {
+					// match on the application name stored in the spec (prefix match)
+					if strings.HasPrefix(metaData.Spec.GetApp(), applicationName) {
+						_ = newResourceList.AddItem(resource)
+						break
+					}
+				}
+			}
+
+			return newResourceList
+		},
+	)
+
+	// in the end, remove callback of this client
+	defer m.pusher.RemoveCallback(core_mesh.MetaDataType, metadataSyncClient.ClientID())
+
+	// wait for the first completion report from either side
+	err := <-errChan
+	if err == nil {
+		log.Info("MetadataSync finished gracefully")
+		return nil
+	}
+
+	log.Error(err, "MetadataSync finished with an error")
+	return status.Error(codes.Internal, err.Error())
}
-func (s *MetadataServer) debounce(stopCh <-chan struct{}, pushFn func(m *RegisterRequest)) {}
+// debounce aggregates RegisterRequests read from m.queue and hands the merged
+// request to pushFn once no new request has arrived for Debounce.After, or
+// once Debounce.Max has elapsed since the first request of the current batch.
+// At most one debounced push runs at a time; requests that arrive while a push
+// is in flight are merged and pushed after it completes. When debouncing is
+// disabled every request is pushed immediately on its own goroutine. The loop
+// exits when stopCh is closed or receives a value.
+func (m *MetadataServer) debounce(stopCh <-chan struct{}, pushFn func(m *RegisterRequest)) {
+	ch := m.queue
+	var timeChan <-chan time.Time
+	var startDebounce time.Time
+	var lastConfigUpdateTime time.Time
+
+	pushCounter := 0
+	debouncedEvents := 0
+
+	// req accumulates everything merged since the last push
+	var req *RegisterRequest
+
+	// free is false while a debounced push is running; freeCh signals its completion
+	free := true
+	freeCh := make(chan struct{}, 1)
+
+	push := func(req *RegisterRequest) {
+		pushFn(req)
+		freeCh <- struct{}{}
+	}
+
+	pushWorker := func() {
+		eventDelay := time.Since(startDebounce)
+		quietTime := time.Since(lastConfigUpdateTime)
+		if eventDelay >= m.config.Debounce.Max || quietTime >= m.config.Debounce.After {
+			if req != nil {
+				pushCounter++
+
+				if req.ConfigsUpdated != nil {
+					// the logger takes key/value pairs, not printf-style verbs
+					log.Info("debounce stable",
+						"pushCounter", pushCounter,
+						"debouncedEvents", debouncedEvents,
+						"configs", configsUpdated(req),
+						"quietTime", quietTime,
+						"eventDelay", eventDelay)
+				}
+				free = false
+				go push(req)
+				req = nil
+				debouncedEvents = 0
+			}
+		} else {
+			// not quiet for long enough yet: re-arm for the remaining quiet period
+			timeChan = time.After(m.config.Debounce.After - quietTime)
+		}
+	}
+
+	for {
+		select {
+		case <-freeCh:
+			free = true
+			pushWorker()
+		case r := <-ch:
+			if !m.config.Debounce.Enable {
+				// No batching state is involved here, so call pushFn directly
+				// instead of going through push and signalling freeCh.
+				go pushFn(r)
+				req = nil
+				continue
+			}
+
+			lastConfigUpdateTime = time.Now()
+			if debouncedEvents == 0 {
+				// first event of a batch: arm the timer with the configured
+				// quiet period rather than a fixed duration
+				timeChan = time.After(m.config.Debounce.After)
+				startDebounce = lastConfigUpdateTime
+			}
+			debouncedEvents++
+
+			req = req.merge(r)
+		case <-timeChan:
+			if free {
+				pushWorker()
+			}
+		case <-stopCh:
+			return
+		}
+	}
+}
+
+// register stores every MetaData entry of req through tryRegister. Each entry
+// is attempted up to three times back to back; every failed attempt is logged
+// and an entry that still fails after the last attempt is given up on.
+func (m *MetadataServer) register(req *RegisterRequest) {
+	const maxAttempts = 3
+
+	for key, metadata := range req.ConfigsUpdated {
+		for attempt := 0; attempt < maxAttempts; attempt++ {
+			err := m.tryRegister(key, metadata)
+			if err == nil {
+				break
+			}
+			log.Error(err, "register failed", "key", key)
+		}
+	}
+}
+
+// tryRegister creates or updates the MetaData resource identified by key so
+// that its spec equals newMetadata. The lookup and the write run inside a
+// single core_store.InTx call and use the context handed to the closure, so
+// they are bound to that transaction. It returns the first error encountered.
+func (m *MetadataServer) tryRegister(key core_model.ResourceKey, newMetadata *mesh_proto.MetaData) error {
+	err := core_store.InTx(m.ctx, m.transactions, func(ctx context.Context) error {
+		// get Metadata Resource first,
+		// if Metadata is not found, create it,
+		// else update it.
+		metadata := core_mesh.NewMetaDataResource()
+		// Use ctx (not m.ctx) for every store call: it is the context InTx
+		// passes in for this transaction.
+		err := m.resourceManager.Get(ctx, metadata, core_store.GetBy(key))
+		if err != nil && !core_store.IsResourceNotFound(err) {
+			log.Error(err, "get Metadata Resource")
+			return err
+		}
+
+		metadata.Spec = newMetadata
+
+		if core_store.IsResourceNotFound(err) {
+			// create if not found
+			err = m.resourceManager.Create(ctx, metadata, core_store.CreateBy(key), core_store.CreatedAt(time.Now()))
+			if err != nil {
+				log.Error(err, "create Metadata Resource failed")
+				return err
+			}
+
+			log.Info("create Metadata Resource success", "key", key, "metadata", newMetadata)
+			return nil
+		}
+
+		// if found, update it
+		err = m.resourceManager.Update(ctx, metadata, core_store.ModifiedAt(time.Now()))
+		if err != nil {
+			log.Error(err, "update Metadata Resource failed")
+			return err
+		}
+
+		log.Info("update Metadata Resource success", "key", key, "metadata", newMetadata)
+		return nil
+	})
+	if err != nil {
+		log.Error(err, "transactions failed")
+		return err
+	}
+
+	return nil
+}
diff --git a/pkg/dubbo/pusher/interface.go b/pkg/dubbo/pusher/interface.go
index 1b1a6b2..01427b2 100644
--- a/pkg/dubbo/pusher/interface.go
+++ b/pkg/dubbo/pusher/interface.go
@@ -32,5 +32,5 @@
RemoveCallback(resourceType core_model.ResourceType, id string)
// InvokeCallback invoke a target callback
// for example, for a push request from client, invoke this function to push resource.
- InvokeCallback(resourceType core_model.ResourceType, id string)
+ InvokeCallback(resourceType core_model.ResourceType, id string, request interface{}, requestFilter ResourceRequestFilter)
}
diff --git a/pkg/dubbo/pusher/pusher.go b/pkg/dubbo/pusher/pusher.go
index f3aa079..9604dc2 100644
--- a/pkg/dubbo/pusher/pusher.go
+++ b/pkg/dubbo/pusher/pusher.go
@@ -54,8 +54,10 @@
resourceChangedEventListeners map[core_model.ResourceType]events.Listener
eventsChannel chan *changedEvent
requestChannel chan struct {
- resourceType core_model.ResourceType
- id string
+ request interface{}
+ requestFilter ResourceRequestFilter
+ resourceType core_model.ResourceType
+ id string
}
resourceChangedCallbacks *ResourceChangedCallbacks
@@ -76,8 +78,10 @@
resourceChangedEventListeners: make(map[core_model.ResourceType]events.Listener),
eventsChannel: make(chan *changedEvent, eventsChannelSize),
requestChannel: make(chan struct {
- resourceType core_model.ResourceType
- id string
+ request interface{}
+ requestFilter ResourceRequestFilter
+ resourceType core_model.ResourceType
+ id string
}, requestChannelSize),
resourceChangedCallbacks: NewResourceChangedCallbacks(),
@@ -190,8 +194,13 @@
continue
}
+ resourceList := lastedPushed
+ if req.requestFilter != nil {
+ resourceList = req.requestFilter(req.request, lastedPushed)
+ }
+
cb.Invoke(PushedItems{
- resourceList: lastedPushed,
+ resourceList: resourceList,
revision: revision,
})
case <-fullResyncTicker.C:
@@ -226,9 +235,16 @@
p.resourceChangedCallbacks.RemoveCallBack(resourceType, id)
}
-func (p *pusher) InvokeCallback(resourceType core_model.ResourceType, id string) {
+func (p *pusher) InvokeCallback(resourceType core_model.ResourceType, id string, request interface{}, requestFilter ResourceRequestFilter) {
p.requestChannel <- struct {
- resourceType core_model.ResourceType
- id string
- }{resourceType: resourceType, id: id}
+ request interface{}
+ requestFilter ResourceRequestFilter
+ resourceType core_model.ResourceType
+ id string
+ }{
+ request: request,
+ requestFilter: requestFilter,
+ resourceType: resourceType,
+ id: id,
+ }
}
diff --git a/pkg/dubbo/pusher/resource_changed_callbacks.go b/pkg/dubbo/pusher/resource_changed_callbacks.go
index a185ebd..caf0675 100644
--- a/pkg/dubbo/pusher/resource_changed_callbacks.go
+++ b/pkg/dubbo/pusher/resource_changed_callbacks.go
@@ -28,6 +28,7 @@
type (
ResourceChangedCallbackFn func(items PushedItems)
ResourceChangedEventFilter func(resourceList core_model.ResourceList) core_model.ResourceList
+ ResourceRequestFilter func(request interface{}, resourceList core_model.ResourceList) core_model.ResourceList
)
type ResourceChangedCallback struct {
diff --git a/pkg/dubbo/servicemapping/server.go b/pkg/dubbo/servicemapping/server.go
index addd354..bc802f3 100644
--- a/pkg/dubbo/servicemapping/server.go
+++ b/pkg/dubbo/servicemapping/server.go
@@ -126,16 +126,34 @@
errChan := make(chan error)
clientID := uuid.NewString()
- mappingSyncStream := client.NewMappingSyncStream(stream)
- // MappingSyncClient is to handle MappingSyncRequest from data plane
- mappingSyncClient := client.NewMappingSyncClient(
+ mappingSyncStream := client.NewDubboSyncStream(stream)
+ // DubboSyncClient is to handle MappingSyncRequest from data plane
+ mappingSyncClient := client.NewDubboSyncClient(
log.WithName("client"),
clientID,
mappingSyncStream,
&client.Callbacks{
- OnRequestReceived: func(request *mesh_proto.MappingSyncRequest) error {
+ OnMappingSyncRequestReceived: func(request *mesh_proto.MappingSyncRequest) error {
// when received request, invoke callback
- s.pusher.InvokeCallback(core_mesh.MappingType, clientID)
+ s.pusher.InvokeCallback(
+ core_mesh.MappingType,
+ clientID,
+ request,
+ func(rawRequest interface{}, resourceList core_model.ResourceList) core_model.ResourceList {
+ req := rawRequest.(*mesh_proto.MappingSyncRequest)
+ mappingList := resourceList.(*core_mesh.MappingResourceList)
+
+ // only response the target Mapping Resource by interface name
+ respMappingList := &core_mesh.MappingResourceList{}
+ for _, item := range mappingList.Items {
+ if item.Spec != nil && req.InterfaceName == item.Spec.InterfaceName {
+ _ = respMappingList.AddItem(item)
+ }
+ }
+
+ return respMappingList
+ },
+ )
return nil
},
})
@@ -143,13 +161,13 @@
// Handle requests from client
err := mappingSyncClient.HandleReceive()
if errors.Is(err, io.EOF) {
- log.Info("MappingSyncClient finished gracefully")
+ log.Info("DubboSyncClient finished gracefully")
errChan <- nil
return
}
- log.Error(err, "MappingSyncClient finished with an error")
- errChan <- errors.Wrap(err, "MappingSyncClient finished with an error")
+ log.Error(err, "DubboSyncClient finished with an error")
+ errChan <- errors.Wrap(err, "DubboSyncClient finished with an error")
}()
s.pusher.AddCallback(
@@ -166,13 +184,13 @@
err := mappingSyncClient.Send(mappingList, revision)
if err != nil {
if errors.Is(err, io.EOF) {
- log.Info("MappingSyncClient finished gracefully")
+ log.Info("DubboSyncClient finished gracefully")
errChan <- nil
return
}
log.Error(err, "send mapping sync response failed", "mappingList", mappingList, "revision", revision)
- errChan <- errors.Wrap(err, "MappingSyncClient send with an error")
+ errChan <- errors.Wrap(err, "DubboSyncClient send with an error")
}
},
func(resourceList core_model.ResourceList) core_model.ResourceList {
diff --git a/pkg/plugins/resources/k8s/store.go b/pkg/plugins/resources/k8s/store.go
index 4d09b54..c6c8e0a 100644
--- a/pkg/plugins/resources/k8s/store.go
+++ b/pkg/plugins/resources/k8s/store.go
@@ -19,6 +19,7 @@
import (
"context"
+ "github.com/apache/dubbo-kubernetes/pkg/core/logger"
"strings"
"time"
)
@@ -99,7 +100,9 @@
if err := s.Client.Create(ctx, obj); err != nil {
if kube_apierrs.IsAlreadyExists(err) {
- return store.ErrorResourceAlreadyExists(r.Descriptor().Name, opts.Name, opts.Mesh)
+ // 如果资源已经存在了就直接返回空即可
+ logger.Sugar().Warn("资源已经存在了")
+ return nil
}
return errors.Wrap(err, "failed to create k8s resource")
}
diff --git a/pkg/plugins/resources/traditional/store.go b/pkg/plugins/resources/traditional/store.go
index 56cae11..721d87e 100644
--- a/pkg/plugins/resources/traditional/store.go
+++ b/pkg/plugins/resources/traditional/store.go
@@ -20,6 +20,7 @@
import (
"context"
"fmt"
+ "github.com/dubbogo/go-zookeeper/zk"
"sync"
)
@@ -47,7 +48,6 @@
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
"github.com/apache/dubbo-kubernetes/pkg/events"
- "github.com/apache/dubbo-kubernetes/pkg/plugins/util/ccache"
)
const (
@@ -555,9 +555,7 @@
switch resource.Descriptor().Name {
case mesh.DataplaneType:
- app := opts.Labels[mesh_proto.Application]
- revision := opts.Labels[mesh_proto.Revision]
- key := ccache.GetDataplaneKey(app, revision)
+ key := opts.Name
value, ok := c.dCache.Load(key)
if !ok {
return nil
@@ -649,8 +647,12 @@
key := opts.Name
set, err := c.metadataReport.GetServiceAppMapping(key, mappingGroup, nil)
if err != nil {
+ if errors.Is(err, zk.ErrNoNode) {
+ return nil
+ }
return err
}
+
meta := &resourceMetaObject{
Name: opts.Name,
Mesh: opts.Mesh,
@@ -669,9 +671,9 @@
Mesh: opts.Mesh,
})
case mesh.MetaDataType:
- labels := opts.Labels
- revision := labels[mesh_proto.Revision]
- app := labels[mesh_proto.Application]
+ name := opts.Name
+ // 拆分name得到revision和app
+ app, revision := splitAppAndRevision(name)
if revision == "" {
children, err := c.regClient.GetChildren(getMetadataPath(app))
if err != nil {
diff --git a/pkg/plugins/resources/traditional/path_util.go b/pkg/plugins/resources/traditional/utils.go
similarity index 90%
rename from pkg/plugins/resources/traditional/path_util.go
rename to pkg/plugins/resources/traditional/utils.go
index 4f9cb08..999d240 100644
--- a/pkg/plugins/resources/traditional/path_util.go
+++ b/pkg/plugins/resources/traditional/utils.go
@@ -19,6 +19,7 @@
import (
"fmt"
+ "strings"
)
func GenerateCpGroupPath(resourceName string, name string) string {
@@ -66,3 +67,10 @@
}
return rootDir
}
+
+// splitAppAndRevision splits a resource name of the form "<app>-<revision>"
+// at its last "-". Everything before that separator is the application name
+// (which may itself contain "-") and everything after it is the revision.
+// A name without any "-" is returned as the application with an empty revision.
+func splitAppAndRevision(name string) (app string, revision string) {
+	idx := strings.LastIndex(name, "-")
+	if idx < 0 {
+		return name, ""
+	}
+	// Cut only at the final separator so that an application name which
+	// happens to contain "-<revision>" earlier in the string stays intact.
+	return name[:idx], name[idx+1:]
+}
diff --git a/pkg/plugins/resources/traditional/utils_test.go b/pkg/plugins/resources/traditional/utils_test.go
new file mode 100644
index 0000000..c5593ca
--- /dev/null
+++ b/pkg/plugins/resources/traditional/utils_test.go
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package traditional
+
+import "testing"
+
+// TestSplitAppAndRevision checks that a name whose application part contains
+// dashes is split at the last dash into application and revision.
+func TestSplitAppAndRevision(t *testing.T) {
+	name := "dubbo-springboot-demo-lixinyang-bdc0958191bba7a0f050a32709ee1111"
+	app, revision := splitAppAndRevision(name)
+	// fail when either part is wrong, not only when both are
+	if app != "dubbo-springboot-demo-lixinyang" || revision != "bdc0958191bba7a0f050a32709ee1111" {
+		t.Errorf("解析错误: app=%q revision=%q", app, revision)
+	}
+}
diff --git a/pkg/plugins/runtime/k8s/controllers/pod_controller.go b/pkg/plugins/runtime/k8s/controllers/pod_controller.go
index 66407f6..a838de1 100644
--- a/pkg/plugins/runtime/k8s/controllers/pod_controller.go
+++ b/pkg/plugins/runtime/k8s/controllers/pod_controller.go
@@ -117,6 +117,14 @@
}
func (r *PodReconciler) reconcileDataplane(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error {
+ ns := kube_core.Namespace{}
+ if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: pod.Namespace}, &ns); err != nil {
+ return errors.Wrap(err, "unable to get Namespace for Pod")
+ }
+ if ns.Status.Phase == kube_core.NamespaceTerminating {
+ r.Log.V(1).Info("namespace is terminating. Ignoring reconciliation")
+ return nil
+ }
dp := &mesh_k8s.Dataplane{
ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
}
@@ -131,11 +139,6 @@
return r.deleteObjectIfExist(ctx, dp, "pod was evicted", log)
}
- ns := kube_core.Namespace{}
- if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: pod.Namespace}, &ns); err != nil {
- return errors.Wrap(err, "unable to get Namespace for Pod")
- }
-
services, err := r.findMatchingServices(ctx, pod)
if err != nil {
return err
diff --git a/pkg/test/component.go b/pkg/test/component.go
index d4317cc..466c6f5 100644
--- a/pkg/test/component.go
+++ b/pkg/test/component.go
@@ -19,65 +19,84 @@
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
- "github.com/apache/dubbo-kubernetes/pkg/config/core"
+ "github.com/apache/dubbo-kubernetes/pkg/core"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
core_runtime "github.com/apache/dubbo-kubernetes/pkg/core/runtime"
+ "strings"
+ "time"
)
+var testServerLog = core.Log.WithName("test")
+
func Setup(rt core_runtime.Runtime) error {
- if rt.GetDeployMode() == core.KubernetesMode {
- return nil
+ testServer := NewTestServer(rt)
+ if err := rt.Add(testServer); err != nil {
+ testServerLog.Error(err, "fail to start the test server")
}
+ return nil
+}
+
+type TestServer struct {
+ rt core_runtime.Runtime
+}
+
+func NewTestServer(rt core_runtime.Runtime) *TestServer {
+ return &TestServer{rt: rt}
+}
+
+func (t *TestServer) Start(stop <-chan struct{}) error {
// 测试mapping资源
- if err := testMapping(rt); err != nil {
+ if err := testMapping(t.rt); err != nil {
return err
}
// 测试metadata资源
- if err := testMetadata(rt); err != nil {
+ if err := testMetadata(t.rt); err != nil {
return err
}
+
+ time.Sleep(3 * time.Second)
// 测试dataplane资源
- if err := testDataplane(rt); err != nil {
+ if err := testDataplane(t.rt); err != nil {
return err
}
+
return nil
}
+// NeedLeaderElection always reports false for the test server.
+func (t *TestServer) NeedLeaderElection() bool {
+	return false
+}
+
// dataplane资源只有get, list接口, 其余均不支持
func testDataplane(rt core_runtime.Runtime) error {
manager := rt.ResourceManager()
dataplaneResource := mesh.NewDataplaneResource()
- // get
- if err := manager.Get(rt.AppContext(), dataplaneResource, store.GetByApplication("dubbo-springboot-demo-provider"), store.GetByRevision("bdc0958191bba7a0f050a32709ee1262")); err != nil {
- return err
- }
// list
dataplaneList := &mesh.DataplaneResourceList{}
if err := manager.List(rt.AppContext(), dataplaneList); err != nil {
return err
}
+
+ if len(dataplaneList.Items) > 0 {
+ // get
+ if err := manager.Get(rt.AppContext(), dataplaneResource,
+ store.GetBy(core_model.ResourceKey{
+ Name: dataplaneList.Items[0].Meta.GetName(),
+ Mesh: "default",
+ })); err != nil {
+ return err
+ }
+ }
+
return nil
}
// metadata资源没有删除能力
func testMetadata(rt core_runtime.Runtime) error {
manager := rt.ResourceManager()
- metadata1 := mesh.NewMetaDataResource()
- // get
- if err := manager.Get(rt.AppContext(), metadata1, store.GetByApplication("dubbo-springboot-demo-provider")); err != nil {
- return err
- }
-
- // list
- metadataList := &mesh.MetaDataResourceList{}
-
- if err := manager.List(rt.AppContext(), metadataList); err != nil {
- return err
- }
-
// create
metadata2 := mesh.NewMetaDataResource()
err := metadata2.SetSpec(&mesh_proto.MetaData{
@@ -92,12 +111,32 @@
if err != nil {
return err
}
- if err := manager.Create(rt.AppContext(), metadata2); err != nil {
+ if err := manager.Create(rt.AppContext(), metadata2, store.CreateBy(core_model.ResourceKey{
+ Name: metadata2.Spec.App + "-" + metadata2.Spec.Revision,
+ Mesh: "default",
+ })); err != nil {
+ return err
+ }
+
+ metadata1 := mesh.NewMetaDataResource()
+ // get
+ if err := manager.Get(rt.AppContext(), metadata1, store.GetBy(core_model.ResourceKey{
+ Name: metadata2.Spec.App + "-" + metadata2.Spec.Revision,
+ Mesh: "default",
+ })); err != nil {
+ return err
+ }
+
+ // list
+ metadataList := &mesh.MetaDataResourceList{}
+
+ if err := manager.List(rt.AppContext(), metadataList); err != nil {
return err
}
// update
metadata3 := mesh.NewMetaDataResource()
+ metadata3.SetMeta(metadata1.GetMeta())
err = metadata3.SetSpec(&mesh_proto.MetaData{
App: "dubbo-springboot-demo-lixinyang",
Revision: "bdc0958191bba7a0f050a32709ee1111",
@@ -119,14 +158,6 @@
// mapping资源没有删除功能
func testMapping(rt core_runtime.Runtime) error {
manager := rt.ResourceManager()
- // mapping test
- mapping1 := mesh.NewMappingResource()
- // get
- if err := manager.Get(rt.AppContext(), mapping1, store.GetBy(core_model.ResourceKey{
- Name: "org.apache.dubbo.springboot.demo.DemoService",
- })); err != nil {
- return err
- }
mapping2 := mesh.NewMappingResource()
err := mapping2.SetSpec(&mesh_proto.Mapping{
@@ -141,7 +172,20 @@
}
// create
- if err := manager.Create(rt.AppContext(), mapping2); err != nil {
+ if err := manager.Create(rt.AppContext(), mapping2, store.CreateBy(core_model.ResourceKey{
+ Name: strings.ToLower(strings.ReplaceAll(mapping2.Spec.InterfaceName, ".", "-")),
+ Mesh: "default",
+ })); err != nil {
+ return err
+ }
+
+ // mapping test
+ mapping1 := mesh.NewMappingResource()
+ // get
+ if err := manager.Get(rt.AppContext(), mapping1, store.GetBy(core_model.ResourceKey{
+ Name: strings.ToLower(strings.ReplaceAll("org.apache.dubbo.springboot.demo.DemoService1", ".", "-")),
+ Mesh: "default",
+ })); err != nil {
return err
}
@@ -153,6 +197,7 @@
}
mapping3 := mesh.NewMappingResource()
+ mapping3.SetMeta(mapping1.GetMeta())
err = mapping3.SetSpec(&mesh_proto.Mapping{
Zone: "zone2",
InterfaceName: "org.apache.dubbo.springboot.demo.DemoService1",
diff --git a/test/app/consumer/deployment.yaml b/test/app/consumer/deployment.yaml
index ec83ebf..0cf965c 100644
--- a/test/app/consumer/deployment.yaml
+++ b/test/app/consumer/deployment.yaml
@@ -19,7 +19,7 @@
name: dubbo-samples-apiserver-consumer
namespace: dubbo-system
spec:
- replicas: 1
+ replicas: 2
selector:
matchLabels:
app: dubbo-samples-apiserver-consumer