[discovery] add aggregate and cert controller logic (#792)
diff --git a/dubboctl/pkg/validate/validate.go b/dubboctl/pkg/validate/validate.go
index dbcc470..debd997 100644
--- a/dubboctl/pkg/validate/validate.go
+++ b/dubboctl/pkg/validate/validate.go
@@ -25,7 +25,7 @@
operator "github.com/apache/dubbo-kubernetes/operator/pkg/apis"
operatorvalidate "github.com/apache/dubbo-kubernetes/operator/pkg/apis/validation"
"github.com/apache/dubbo-kubernetes/pkg/config/validation"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/hashicorp/go-multierror"
"github.com/spf13/cobra"
"io"
diff --git a/go.mod b/go.mod
index 748a3e5..2a97428 100644
--- a/go.mod
+++ b/go.mod
@@ -40,7 +40,9 @@
github.com/golang/protobuf v1.5.4
github.com/google/go-cmp v0.7.0
github.com/google/go-containerregistry v0.20.6
+ github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2
+ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/go-multierror v1.1.1
github.com/heroku/color v0.0.6
github.com/moby/term v0.5.2
@@ -112,10 +114,12 @@
github.com/aws/aws-sdk-go-v2/service/sts v1.28.7 // indirect
github.com/aws/smithy-go v1.20.2 // indirect
github.com/awslabs/amazon-ecr-credential-helper/ecr-login v0.0.0-20230522190001-adf1bafd791a // indirect
+ github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/buildpacks/imgutil v0.0.0-20230626185301-726f02e4225c // indirect
github.com/buildpacks/lifecycle v0.17.0 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
+ github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chrismellard/docker-credential-acr-env v0.0.0-20230304212654-82a0ddb27589 // indirect
github.com/cloudflare/circl v1.3.7 // indirect
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect
@@ -203,6 +207,8 @@
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240409071808-615f978279ca // indirect
github.com/prometheus/client_golang v1.23.0 // indirect
+ github.com/prometheus/client_model v0.6.2 // indirect
+ github.com/prometheus/common v0.65.0 // indirect
github.com/prometheus/procfs v0.17.0 // indirect
github.com/rivo/tview v0.0.0-20220307222120-9994674d60a8 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
diff --git a/go.sum b/go.sum
index beb2eaa..bfe1770 100644
--- a/go.sum
+++ b/go.sum
@@ -186,6 +186,7 @@
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/circl v1.3.7 h1:qlCDlTPz2n9fu58M0Nh1J/JzcFpfgkFHHX3O35r5vcU=
github.com/cloudflare/circl v1.3.7/go.mod h1:sRTcRWXGLrKw6yIGJ+l7amYJFfAXbZG0kBSc8r4zxgA=
+github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls=
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/containerd/containerd v1.7.27 h1:yFyEyojddO3MIGVER2xJLWoCIn+Up4GaHFquP7hsFII=
@@ -244,7 +245,9 @@
github.com/emicklei/go-restful/v3 v3.12.2/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
+github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane/envoy v1.32.5-0.20250627145903-197b96a9c7f8 h1:/F9jLyfDeNr4iZxyibtKlZxCDqCFEhoYiLdc9VOZT2E=
github.com/envoyproxy/go-control-plane/envoy v1.32.5-0.20250627145903-197b96a9c7f8/go.mod h1:09qwbGVuSWWAyN5t/b3iyVfz5+z8QWGrzkoqm/8SbEs=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
@@ -287,6 +290,7 @@
github.com/go-git/go-git/v5 v5.13.1/go.mod h1:qryJB4cSBoq3FRoBRf5A77joojuBcmPJ0qu3XXXVixc=
github.com/go-jose/go-jose/v4 v4.0.5 h1:M6T8+mKZl/+fNNuFHvGIzDz7BTLQPIounk/b9dw3AaE=
github.com/go-jose/go-jose/v4 v4.0.5/go.mod h1:s3P1lRrkT8igV8D9OjyL4WRyHvjB6a4JSllnOrmmBOA=
+github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
@@ -301,6 +305,7 @@
github.com/go-openapi/swag v0.23.1/go.mod h1:STZs8TbRvEQQKUA+JZNAm3EWlgaOBGpyFDqQnDHMef0=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
+github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
@@ -322,6 +327,7 @@
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
@@ -373,8 +379,12 @@
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
+github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
+github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 h1:sGm2vDRFUrQJO/Veii4h4zG2vvqG6uWNkBHSTqXOZk0=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2/go.mod h1:wd1YpapPLivG6nQgbf7ZkG1hhSOXDhhn4MLTknx2aAc=
+github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
+github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
@@ -507,6 +517,7 @@
github.com/opencontainers/runtime-spec v1.2.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/opencontainers/selinux v1.11.1 h1:nHFvthhM0qY8/m+vfhJylliSshm8G1jJ2jDMcgULaH8=
github.com/opencontainers/selinux v1.11.1/go.mod h1:E5dMC3VPuVvVHDYmi78qvhJp8+M586T4DlDRYpFkyec=
+github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/ory/viper v1.7.5 h1:+xVdq7SU3e1vNaCsk/ixsfxE4zylk1TJUiJrY647jUE=
github.com/ory/viper v1.7.5/go.mod h1:ypOuyJmEUb3oENywQZRgeAMwqgOyDqwboO1tj3DjTaM=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
@@ -563,6 +574,7 @@
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
@@ -667,12 +679,15 @@
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
go.starlark.net v0.0.0-20230302034142-4b1e35fe2254 h1:Ss6D3hLXTM0KobyBYEAygXzFfGcjnmfEJOBgSbemCtg=
go.starlark.net v0.0.0-20230302034142-4b1e35fe2254/go.mod h1:jxU+3+j+71eXOW14274+SmmuW82qJzl6iZSeqEtTGds=
+go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
+go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE=
@@ -733,6 +748,7 @@
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -794,6 +810,7 @@
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20240528184218-531527333157 h1:u7WMYrIrVvs0TF5yaKwKNbcJyySYf+HAIFXxWltJOXE=
google.golang.org/genproto v0.0.0-20240528184218-531527333157/go.mod h1:ubQlAQnzejB8uZzszhrTCU2Fyp6Vi7ZE5nn0c3W8+qQ=
@@ -803,7 +820,9 @@
google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a/go.mod h1:gw1tLEfykwDz2ET4a12jcXt4couGAm7IwsVaTy0Sflo=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4=
google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
diff --git a/operator/cmd/cluster/manifest.go b/operator/cmd/cluster/manifest.go
index 4bd44f0..df5ec60 100644
--- a/operator/cmd/cluster/manifest.go
+++ b/operator/cmd/cluster/manifest.go
@@ -25,7 +25,7 @@
"github.com/apache/dubbo-kubernetes/operator/pkg/render"
"github.com/apache/dubbo-kubernetes/operator/pkg/util/clog"
"github.com/apache/dubbo-kubernetes/pkg/kube"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/spf13/cobra"
"sigs.k8s.io/yaml"
"strings"
diff --git a/operator/pkg/helm/helm.go b/operator/pkg/helm/helm.go
index 60429cb..d9ab3f9 100644
--- a/operator/pkg/helm/helm.go
+++ b/operator/pkg/helm/helm.go
@@ -23,7 +23,7 @@
"github.com/apache/dubbo-kubernetes/operator/pkg/manifest"
"github.com/apache/dubbo-kubernetes/operator/pkg/util"
"github.com/apache/dubbo-kubernetes/operator/pkg/values"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/chartutil"
diff --git a/operator/pkg/install/installer.go b/operator/pkg/install/installer.go
index df6450f..9b74c0e 100644
--- a/operator/pkg/install/installer.go
+++ b/operator/pkg/install/installer.go
@@ -29,8 +29,8 @@
"github.com/apache/dubbo-kubernetes/operator/pkg/util/progress"
"github.com/apache/dubbo-kubernetes/operator/pkg/values"
"github.com/apache/dubbo-kubernetes/pkg/kube"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"github.com/hashicorp/go-multierror"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
diff --git a/pkg/config/constants/constants.go b/pkg/config/constants/constants.go
index e9bf56f..d558faf 100644
--- a/pkg/config/constants/constants.go
+++ b/pkg/config/constants/constants.go
@@ -36,4 +36,14 @@
CertProviderCustom = "custom"
ThirdPartyJwtPath = "./var/run/secrets/tokens/dubbo-token"
+
+ CertProviderKubernetes = "kubernetes"
+
+ SailWellKnownDNSCertPath = "./var/run/secrets/dubbod/tls/"
+ SailWellKnownDNSCaCertPath = "./var/run/secrets/dubbod/ca/"
+
+ DefaultSailTLSCert = SailWellKnownDNSCertPath + "tls.crt"
+ DefaultSailTLSKey = SailWellKnownDNSCertPath + "tls.key"
+ DefaultSailTLSCaCert = SailWellKnownDNSCaCertPath + "root-cert.pem"
+ DefaultSailTLSCaCertAlternatePath = SailWellKnownDNSCertPath + "ca.crt"
)
diff --git a/pkg/config/labels/instance.go b/pkg/config/labels/instance.go
index 845ebbe..6b8fdec 100644
--- a/pkg/config/labels/instance.go
+++ b/pkg/config/labels/instance.go
@@ -19,13 +19,13 @@
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"regexp"
"strings"
"github.com/hashicorp/go-multierror"
"github.com/apache/dubbo-kubernetes/pkg/maps"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
)
const (
diff --git a/pkg/config/model.go b/pkg/config/model.go
index 76e8265..204feb7 100644
--- a/pkg/config/model.go
+++ b/pkg/config/model.go
@@ -21,6 +21,7 @@
"bytes"
"encoding/json"
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"reflect"
"time"
@@ -42,7 +43,6 @@
"github.com/apache/dubbo-kubernetes/pkg/util/gogoprotomarshal"
"github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"github.com/apache/dubbo-kubernetes/sail/pkg/util/protoconv"
"istio.io/api/label"
)
diff --git a/pkg/config/protocol/instance.go b/pkg/config/protocol/instance.go
new file mode 100644
index 0000000..3748f91
--- /dev/null
+++ b/pkg/config/protocol/instance.go
@@ -0,0 +1,146 @@
+package protocol
+
+import "strings"
+
+// Instance defines network protocols for ports
+type Instance string
+
+func (i Instance) String() string {
+ return string(i)
+}
+
+const (
+ // GRPC declares that the port carries gRPC traffic.
+ GRPC Instance = "GRPC"
+ // GRPCWeb declares that the port carries gRPC traffic.
+ GRPCWeb Instance = "GRPC-Web"
+ // HTTP declares that the port carries HTTP/1.1 traffic.
+ // Note that HTTP/1.0 or earlier may not be supported by the proxy.
+ HTTP Instance = "HTTP"
+ // HTTP_PROXY declares that the port is a generic outbound proxy port.
+ // Note that this is currently applicable only for defining sidecar egress listeners.
+ // nolint
+ HTTP_PROXY Instance = "HTTP_PROXY"
+ // HTTP2 declares that the port carries HTTP/2 traffic.
+ HTTP2 Instance = "HTTP2"
+ // HTTPS declares that the port carries HTTPS traffic.
+ HTTPS Instance = "HTTPS"
+ // TCP declares the port uses TCP.
+ // This is the default protocol for a service port.
+ TCP Instance = "TCP"
+ // TLS declares that the port carries TLS traffic.
+ // TLS traffic is assumed to contain SNI as part of the handshake.
+ TLS Instance = "TLS"
+ // UDP declares that the port uses UDP.
+ // Note that UDP protocol is not currently supported by the proxy.
+ UDP Instance = "UDP"
+ // Unsupported - value to signify that the protocol is unsupported.
+ Unsupported Instance = "UnsupportedProtocol"
+)
+
+// Parse from string ignoring case
+func Parse(s string) Instance {
+ switch strings.ToLower(s) {
+ case "tcp":
+ return TCP
+ case "udp":
+ return UDP
+ case "grpc":
+ return GRPC
+ case "grpc-web":
+ return GRPCWeb
+ case "http":
+ return HTTP
+ case "http_proxy":
+ return HTTP_PROXY
+ case "http2":
+ return HTTP2
+ case "https":
+ return HTTPS
+ case "tls":
+ return TLS
+ }
+
+ return Unsupported
+}
+
+// IsHTTP2 is true for protocols that use HTTP/2 as transport protocol
+func (i Instance) IsHTTP2() bool {
+ switch i {
+ case HTTP2, GRPC, GRPCWeb:
+ return true
+ default:
+ return false
+ }
+}
+
+// IsHTTPOrSniffed is true for protocols that use HTTP as transport protocol, or *can* use it if sniffed to be HTTP
+func (i Instance) IsHTTPOrSniffed() bool {
+ return i.IsHTTP() || i.IsUnsupported()
+}
+
+// IsHTTP is true for protocols that use HTTP as transport protocol
+func (i Instance) IsHTTP() bool {
+ switch i {
+ case HTTP, HTTP2, HTTP_PROXY, GRPC, GRPCWeb:
+ return true
+ default:
+ return false
+ }
+}
+
+// IsTCP is true for protocols that use TCP as transport protocol
+func (i Instance) IsTCP() bool {
+ switch i {
+ case TCP, HTTPS, TLS:
+ return true
+ default:
+ return false
+ }
+}
+
+// IsTLS is true for protocols on top of TLS (e.g. HTTPS)
+func (i Instance) IsTLS() bool {
+ switch i {
+ case HTTPS, TLS:
+ return true
+ default:
+ return false
+ }
+}
+
+// IsHTTPS is true if protocol is HTTPS
+func (i Instance) IsHTTPS() bool {
+ switch i {
+ case HTTPS:
+ return true
+ default:
+ return false
+ }
+}
+
+// IsGRPC is true for GRPC protocols.
+func (i Instance) IsGRPC() bool {
+ switch i {
+ case GRPC, GRPCWeb:
+ return true
+ default:
+ return false
+ }
+}
+
+func (i Instance) IsUnsupported() bool {
+ return i == Unsupported
+}
+
+// AfterTLSTermination returns the protocol that will be used if TLS is terminated on the current protocol.
+func (i Instance) AfterTLSTermination() Instance {
+ switch i {
+ case HTTPS:
+ return HTTP
+ case TLS:
+ return TCP
+ default:
+ return i
+ }
+}
diff --git a/pkg/config/schema/collection/schemas.go b/pkg/config/schema/collection/schemas.go
index bbf9e61..d5cadbf 100644
--- a/pkg/config/schema/collection/schemas.go
+++ b/pkg/config/schema/collection/schemas.go
@@ -2,6 +2,7 @@
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/google/go-cmp/cmp"
"github.com/hashicorp/go-multierror"
@@ -10,7 +11,6 @@
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/resource"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
)
// Schemas contains metadata about configuration resources.
diff --git a/pkg/config/schema/gvk/resources.gen.go b/pkg/config/schema/gvk/resources.go
similarity index 83%
rename from pkg/config/schema/gvk/resources.gen.go
rename to pkg/config/schema/gvk/resources.go
index 08d09b9..1e5d9e4 100644
--- a/pkg/config/schema/gvk/resources.gen.go
+++ b/pkg/config/schema/gvk/resources.go
@@ -18,7 +18,7 @@
package gvk
import (
- "github.com/apache/dubbo-kubernetes/operator/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvr"
"k8s.io/apimachinery/pkg/runtime/schema"
)
@@ -37,6 +37,9 @@
Service = config.GroupVersionKind{Group: "", Version: "v1", Kind: "Service"}
ServiceAccount = config.GroupVersionKind{Group: "", Version: "v1", Kind: "ServiceAccount"}
MeshConfig = config.GroupVersionKind{Group: "", Version: "v1alpha1", Kind: "MeshConfig"}
+ RequestAuthentication = config.GroupVersionKind{Group: "security.dubbo.io", Version: "v1", Kind: "RequestAuthentication"}
+ PeerAuthentication = config.GroupVersionKind{Group: "security.dubbo.io", Version: "v1", Kind: "PeerAuthentication"}
+ AuthorizationPolicy = config.GroupVersionKind{Group: "security.dubbo.io", Version: "v1", Kind: "AuthorizationPolicy"}
)
func ToGVR(g config.GroupVersionKind) (schema.GroupVersionResource, bool) {
@@ -67,6 +70,12 @@
return gvr.Job, true
case MeshConfig:
return gvr.MeshConfig, true
+ case RequestAuthentication:
+ return gvr.RequestAuthentication, true
+ case PeerAuthentication:
+ return gvr.PeerAuthentication, true
+ case AuthorizationPolicy:
+ return gvr.AuthorizationPolicy, true
}
return schema.GroupVersionResource{}, false
}
@@ -92,6 +101,10 @@
return DaemonSet, true
case gvr.Job:
return Job, true
+ case gvr.PeerAuthentication:
+ return PeerAuthentication, true
+ case gvr.RequestAuthentication:
+ return RequestAuthentication, true
}
return config.GroupVersionKind{}, false
}
diff --git a/pkg/config/schema/gvr/resource.gen.go b/pkg/config/schema/gvr/resources.go
similarity index 83%
rename from pkg/config/schema/gvr/resource.gen.go
rename to pkg/config/schema/gvr/resources.go
index 1fd1e2e..a257a6a 100644
--- a/pkg/config/schema/gvr/resource.gen.go
+++ b/pkg/config/schema/gvr/resources.go
@@ -35,6 +35,9 @@
Service = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
ServiceAccount = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "serviceaccounts"}
MeshConfig = schema.GroupVersionResource{Group: "", Version: "v1alpha1", Resource: "meshconfigs"}
+ RequestAuthentication = schema.GroupVersionResource{Group: "security.dubbo.io", Version: "v1", Resource: "requestauthentications"}
+ PeerAuthentication = schema.GroupVersionResource{Group: "security.dubbo.io", Version: "v1", Resource: "peerauthentications"}
+ AuthorizationPolicy = schema.GroupVersionResource{Group: "security.dubbo.io", Version: "v1", Resource: "authorizationpolicies"}
)
func IsClusterScoped(g schema.GroupVersionResource) bool {
@@ -55,6 +58,12 @@
return false
case ServiceAccount:
return false
+ case RequestAuthentication:
+ return false
+ case PeerAuthentication:
+ return false
+ case AuthorizationPolicy:
+ return false
}
return false
}
diff --git a/pkg/config/schema/kind/kind.go b/pkg/config/schema/kind/kind.go
new file mode 100644
index 0000000..6c29e6e
--- /dev/null
+++ b/pkg/config/schema/kind/kind.go
@@ -0,0 +1,3 @@
+package kind
+
+type Kind uint8
diff --git a/pkg/config/schema/kind/resource.go b/pkg/config/schema/kind/resource.go
new file mode 100644
index 0000000..fdbb42a
--- /dev/null
+++ b/pkg/config/schema/kind/resource.go
@@ -0,0 +1,60 @@
+package kind
+
+const (
+ Unknown Kind = iota
+ AuthorizationPolicy
+ CustomResourceDefinition
+ DestinationRule
+ MeshConfig
+ MeshNetworks
+ MutatingWebhookConfiguration
+ Namespace
+ PeerAuthentication
+ Pod
+ RequestAuthentication
+ Secret
+ Service
+ ServiceAccount
+ StatefulSet
+ ValidatingWebhookConfiguration
+ VirtualService
+)
+
+func (k Kind) String() string {
+ switch k {
+ case AuthorizationPolicy:
+ return "AuthorizationPolicy"
+ case CustomResourceDefinition:
+ return "CustomResourceDefinition"
+ case DestinationRule:
+ return "DestinationRule"
+ case MeshConfig:
+ return "MeshConfig"
+ case MeshNetworks:
+ return "MeshNetworks"
+ case MutatingWebhookConfiguration:
+ return "MutatingWebhookConfiguration"
+ case Namespace:
+ return "Namespace"
+ case PeerAuthentication:
+ return "PeerAuthentication"
+ case Pod:
+ return "Pod"
+ case RequestAuthentication:
+ return "RequestAuthentication"
+ case Secret:
+ return "Secret"
+ case Service:
+ return "Service"
+ case ServiceAccount:
+ return "ServiceAccount"
+ case StatefulSet:
+ return "StatefulSet"
+ case ValidatingWebhookConfiguration:
+ return "ValidatingWebhookConfiguration"
+ case VirtualService:
+ return "VirtualService"
+ default:
+ return "Unknown"
+ }
+}
diff --git a/pkg/config/schema/kubetypes/common.go b/pkg/config/schema/kubetypes/common.go
index e091b71..bff043c 100644
--- a/pkg/config/schema/kubetypes/common.go
+++ b/pkg/config/schema/kubetypes/common.go
@@ -18,7 +18,7 @@
package kubetypes
import (
- "github.com/apache/dubbo-kubernetes/operator/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
"github.com/apache/dubbo-kubernetes/pkg/typemap"
diff --git a/pkg/config/schema/kubetypes/resources.gen.go b/pkg/config/schema/kubetypes/resources.go
similarity index 95%
rename from pkg/config/schema/kubetypes/resources.gen.go
rename to pkg/config/schema/kubetypes/resources.go
index 0eb976b..7090e19 100644
--- a/pkg/config/schema/kubetypes/resources.gen.go
+++ b/pkg/config/schema/kubetypes/resources.go
@@ -18,7 +18,7 @@
package kubetypes
import (
- "github.com/apache/dubbo-kubernetes/operator/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
istioioapimeshv1alpha1 "istio.io/api/mesh/v1alpha1"
k8sioapicorev1 "k8s.io/api/core/v1"
diff --git a/pkg/config/visibility/visibility.go b/pkg/config/visibility/visibility.go
new file mode 100644
index 0000000..49fe4f6
--- /dev/null
+++ b/pkg/config/visibility/visibility.go
@@ -0,0 +1,28 @@
+package visibility
+
+import (
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config/labels"
+)
+
+type Instance string
+
+const (
+ Private Instance = "."
+ Public Instance = "*"
+ None Instance = "~"
+)
+
+func (v Instance) Validate() (errs error) {
+ switch v {
+ case Private, Public:
+ return nil
+ case None:
+ return fmt.Errorf("exportTo ~ (none) is not allowed for Istio configuration objects")
+ default:
+ if !labels.IsDNS1123Label(string(v)) {
+ return fmt.Errorf("only .,*, or a valid DNS 1123 label is allowed as exportTo entry")
+ }
+ }
+ return nil
+}
diff --git a/pkg/kube/kclient/client.go b/pkg/kube/kclient/client.go
index 8ee7595..2a340d7 100644
--- a/pkg/kube/kclient/client.go
+++ b/pkg/kube/kclient/client.go
@@ -28,8 +28,8 @@
"github.com/apache/dubbo-kubernetes/pkg/kube/informerfactory"
"github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
diff --git a/pkg/kube/krt/collection.go b/pkg/kube/krt/collection.go
index de44a06..be30cce 100644
--- a/pkg/kube/krt/collection.go
+++ b/pkg/kube/krt/collection.go
@@ -23,8 +23,8 @@
"github.com/apache/dubbo-kubernetes/pkg/maps"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
"github.com/apache/dubbo-kubernetes/pkg/queue"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"k8s.io/klog/v2"
"sync"
)
diff --git a/pkg/kube/krt/fetch.go b/pkg/kube/krt/fetch.go
index 933c991..b2eaf6b 100644
--- a/pkg/kube/krt/fetch.go
+++ b/pkg/kube/krt/fetch.go
@@ -18,7 +18,7 @@
package krt
import (
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
)
func FetchOne[T any](ctx HandlerContext, c Collection[T], opts ...FetchOption) *T {
diff --git a/pkg/kube/krt/files/files.go b/pkg/kube/krt/files/files.go
index 9f4e6d3..743b8ce 100644
--- a/pkg/kube/krt/files/files.go
+++ b/pkg/kube/krt/files/files.go
@@ -21,8 +21,8 @@
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/filewatcher"
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"github.com/fsnotify/fsnotify"
"go.uber.org/atomic"
"k8s.io/klog/v2"
diff --git a/pkg/kube/krt/index.go b/pkg/kube/krt/index.go
index e94b355..96788c0 100644
--- a/pkg/kube/krt/index.go
+++ b/pkg/kube/krt/index.go
@@ -21,8 +21,8 @@
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"k8s.io/client-go/tools/cache"
)
diff --git a/pkg/kube/krt/informer.go b/pkg/kube/krt/informer.go
index 2dd0526..f7d2647 100644
--- a/pkg/kube/krt/informer.go
+++ b/pkg/kube/krt/informer.go
@@ -23,7 +23,7 @@
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
diff --git a/pkg/kube/krt/processor.go b/pkg/kube/krt/processor.go
index 636443f..e4d0fac 100644
--- a/pkg/kube/krt/processor.go
+++ b/pkg/kube/krt/processor.go
@@ -18,7 +18,7 @@
package krt
import (
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"sync"
"sync/atomic"
diff --git a/pkg/kube/krt/singleton.go b/pkg/kube/krt/singleton.go
index 758ef3f..05186de 100644
--- a/pkg/kube/krt/singleton.go
+++ b/pkg/kube/krt/singleton.go
@@ -21,8 +21,7 @@
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
-
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"sync/atomic"
)
diff --git a/pkg/kube/krt/static.go b/pkg/kube/krt/static.go
index a0b386b..8698c85 100644
--- a/pkg/kube/krt/static.go
+++ b/pkg/kube/krt/static.go
@@ -5,8 +5,8 @@
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
"github.com/apache/dubbo-kubernetes/pkg/maps"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"sync"
)
diff --git a/pkg/kube/namespace/filter.go b/pkg/kube/namespace/filter.go
index e9baa49..7c7e623 100644
--- a/pkg/kube/namespace/filter.go
+++ b/pkg/kube/namespace/filter.go
@@ -19,6 +19,7 @@
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"k8s.io/klog/v2"
"sync"
@@ -33,7 +34,6 @@
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
meshapi "istio.io/api/mesh/v1alpha1"
)
diff --git a/pkg/network/id.go b/pkg/network/id.go
new file mode 100644
index 0000000..23291ab
--- /dev/null
+++ b/pkg/network/id.go
@@ -0,0 +1,14 @@
+package network
+
+import "github.com/apache/dubbo-kubernetes/pkg/util/identifier"
+
+// ID is the unique identifier for a network.
+type ID string
+
+func (id ID) Equals(other ID) bool {
+ return identifier.IsSameOrEmpty(string(id), string(other))
+}
+
+func (id ID) String() string {
+ return string(id)
+}
diff --git a/pkg/util/slices/slices.go b/pkg/slices/slices.go
similarity index 100%
rename from pkg/util/slices/slices.go
rename to pkg/slices/slices.go
diff --git a/pkg/util/sets/set.go b/pkg/util/sets/set.go
index ca63142..d9b0ddd 100644
--- a/pkg/util/sets/set.go
+++ b/pkg/util/sets/set.go
@@ -19,16 +19,13 @@
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
)
import (
"golang.org/x/exp/constraints"
)
-import (
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
-)
-
type Set[T comparable] map[T]struct{}
type String = Set[string]
diff --git a/pkg/util/smallset/smallset.go b/pkg/util/smallset/smallset.go
index 9e84cdc..753762e 100644
--- a/pkg/util/smallset/smallset.go
+++ b/pkg/util/smallset/smallset.go
@@ -20,7 +20,7 @@
import (
"cmp"
"fmt"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
)
// Set is an immutable set optimized for *small number of items*. For general purposes, Sets is likely better
diff --git a/pkg/xds/server.go b/pkg/xds/server.go
index 183f05a..814630d 100644
--- a/pkg/xds/server.go
+++ b/pkg/xds/server.go
@@ -360,3 +360,15 @@
func (conn *Connection) StreamDone() <-chan struct{} {
return conn.stream.Context().Done()
}
+
+func (conn *Connection) InitializedCh() chan struct{} {
+ return conn.initialized
+}
+
+func (conn *Connection) ErrorCh() chan error {
+ return conn.errorChan
+}
+
+func (conn *Connection) StopCh() chan struct{} {
+ return conn.stop
+}
diff --git a/sail/pkg/bootstrap/certcontroller.go b/sail/pkg/bootstrap/certcontroller.go
index b0d2869..0e61cf3 100644
--- a/sail/pkg/bootstrap/certcontroller.go
+++ b/sail/pkg/bootstrap/certcontroller.go
@@ -18,13 +18,26 @@
package bootstrap
import (
+ "bytes"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config/constants"
+ "github.com/apache/dubbo-kubernetes/pkg/sleep"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
tb "github.com/apache/dubbo-kubernetes/sail/pkg/trustbundle"
+ "github.com/apache/dubbo-kubernetes/security/pkg/k8s/chiron"
+ "github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
+ certutil "github.com/apache/dubbo-kubernetes/security/pkg/util"
"k8s.io/klog/v2"
+ "os"
+ "path"
+ "strings"
+ "time"
)
const (
- defaultCACertPath = "./var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
+ defaultCertGracePeriodRatio = 0.5
+ rootCertPollingInterval = 60 * time.Second
+ defaultCACertPath = "./var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
)
func (s *Server) updateRootCertAndGenKeyCert() error {
@@ -51,3 +64,189 @@
s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain, caBundle)
return nil
}
+
+func (s *Server) initFileCertificateWatches(tlsOptions TLSOptions) error {
+ if err := s.dubbodCertBundleWatcher.SetFromFilesAndNotify(tlsOptions.KeyFile, tlsOptions.CertFile, tlsOptions.CaCertFile); err != nil {
+ return fmt.Errorf("set keyCertBundle failed: %v", err)
+ }
+ // TODO: Setup watcher for root and restart server if it changes.
+ for _, file := range []string{tlsOptions.CertFile, tlsOptions.KeyFile} {
+ klog.Infof("adding watcher for certificate %s", file)
+ if err := s.fileWatcher.Add(file); err != nil {
+ return fmt.Errorf("could not watch %v: %v", file, err)
+ }
+ }
+ s.addStartFunc("certificate rotation", func(stop <-chan struct{}) error {
+ go func() {
+ var keyCertTimerC <-chan time.Time
+ for {
+ select {
+ case <-keyCertTimerC:
+ keyCertTimerC = nil
+ if err := s.dubbodCertBundleWatcher.SetFromFilesAndNotify(tlsOptions.KeyFile, tlsOptions.CertFile, tlsOptions.CaCertFile); err != nil {
+ klog.Errorf("Setting keyCertBundle failed: %v", err)
+ }
+ case <-s.fileWatcher.Events(tlsOptions.CertFile):
+ if keyCertTimerC == nil {
+ keyCertTimerC = time.After(watchDebounceDelay)
+ }
+ case <-s.fileWatcher.Events(tlsOptions.KeyFile):
+ if keyCertTimerC == nil {
+ keyCertTimerC = time.After(watchDebounceDelay)
+ }
+ case err := <-s.fileWatcher.Errors(tlsOptions.CertFile):
+ klog.Errorf("error watching %v: %v", tlsOptions.CertFile, err)
+ case err := <-s.fileWatcher.Errors(tlsOptions.KeyFile):
+ klog.Errorf("error watching %v: %v", tlsOptions.KeyFile, err)
+ case <-stop:
+ return
+ }
+ }
+ }()
+ return nil
+ })
+ return nil
+}
+
+func (s *Server) RotateDNSCertForK8sCA(stop <-chan struct{},
+ defaultCACertPath string,
+ signerName string,
+ approveCsr bool,
+ requestedLifetime time.Duration,
+) {
+ certUtil := certutil.NewCertUtil(int(defaultCertGracePeriodRatio * 100))
+ for {
+ waitTime, _ := certUtil.GetWaitTime(s.dubbodCertBundleWatcher.GetKeyCertBundle().CertPem, time.Now())
+ if !sleep.Until(stop, waitTime) {
+ return
+ }
+ certChain, keyPEM, _, err := chiron.GenKeyCertK8sCA(s.kubeClient.Kube(),
+ strings.Join(s.dnsNames, ","), defaultCACertPath, signerName, approveCsr, requestedLifetime)
+ if err != nil {
+ klog.Errorf("failed regenerating key and cert for dubbod by kubernetes: %v", err)
+ continue
+ }
+ s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain, s.dubbodCertBundleWatcher.GetCABundle())
+ }
+}
+func (s *Server) initDNSCertsK8SRA() error {
+ var certChain, keyPEM, caBundle []byte
+ var err error
+ pilotCertProviderName := features.SailCertProvider
+
+ signerName := strings.TrimPrefix(pilotCertProviderName, constants.CertProviderKubernetesSignerPrefix)
+ klog.Infof("Generating K8S-signed cert for %v using signer %v", s.dnsNames, signerName)
+ certChain, keyPEM, _, err = chiron.GenKeyCertK8sCA(s.kubeClient.Kube(),
+ strings.Join(s.dnsNames, ","), "", signerName, true, SelfSignedCACertTTL.Get())
+ if err != nil {
+ return fmt.Errorf("failed generating key and cert by kubernetes: %v", err)
+ }
+ caBundle, err = s.RA.GetRootCertFromMeshConfig(signerName)
+ if err != nil {
+ return err
+ }
+
+ // MeshConfig:Add callback for mesh config update
+ s.environment.AddMeshHandler(func() {
+ newCaBundle, _ := s.RA.GetRootCertFromMeshConfig(signerName)
+ if newCaBundle != nil && !bytes.Equal(newCaBundle, s.dubbodCertBundleWatcher.GetKeyCertBundle().CABundle) {
+ newCertChain, newKeyPEM, _, err := chiron.GenKeyCertK8sCA(s.kubeClient.Kube(),
+ strings.Join(s.dnsNames, ","), "", signerName, true, SelfSignedCACertTTL.Get())
+ if err != nil {
+ klog.Errorf("failed regenerating key and cert for istiod by kubernetes: %v", err)
+ }
+ s.dubbodCertBundleWatcher.SetAndNotify(newKeyPEM, newCertChain, newCaBundle)
+ }
+ })
+
+ s.addStartFunc("istiod server certificate rotation", func(stop <-chan struct{}) error {
+ go func() {
+ // Track TTL of DNS cert and renew cert in accordance to grace period.
+ s.RotateDNSCertForK8sCA(stop, "", signerName, true, SelfSignedCACertTTL.Get())
+ }()
+ return nil
+ })
+ s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain, caBundle)
+ return nil
+}
+
+func (s *Server) initDNSCertsDubbod() error {
+ var certChain, keyPEM, caBundle []byte
+ var err error
+ // Generate certificates for Dubbod DNS names, signed by Dubbod CA
+ certChain, keyPEM, err = s.CA.GenKeyCert(s.dnsNames, SelfSignedCACertTTL.Get(), false)
+ if err != nil {
+ return fmt.Errorf("failed generating dubbod key cert %v", err)
+ }
+ klog.Infof("Generating dubbod-signed cert for %v:\n %s", s.dnsNames, certChain)
+
+ fileBundle, err := detectSigningCABundleAndCRL()
+ if err != nil {
+ return fmt.Errorf("unable to determine signing file format %v", err)
+ }
+
+ dubboGenerated, detectedSigningCABundle := false, false
+ if _, err := os.Stat(fileBundle.SigningKeyFile); err == nil {
+ detectedSigningCABundle = true
+ if _, err := os.Stat(path.Join(LocalCertDir.Get(), ca.DubboGenerated)); err == nil {
+ dubboGenerated = true
+ }
+ }
+
+ // check if signing key file exists the cert dir and if the dubbo-generated file
+ // exists (only if USE_CACERTS_FOR_SELF_SIGNED_CA is enabled)
+ if !detectedSigningCABundle {
+ klog.Infof("Use roots from dubbo-ca-secret")
+
+ caBundle = s.CA.GetCAKeyCertBundle().GetRootCertPem()
+ s.addStartFunc("dubbod server certificate rotation", func(stop <-chan struct{}) error {
+ go func() {
+ // regenerate dubbod key cert when root cert changes.
+ s.watchRootCertAndGenKeyCert(stop)
+ }()
+ return nil
+ })
+ } else if features.UseCacertsForSelfSignedCA && dubboGenerated {
+ klog.Infof("Use roots from %v and watch", fileBundle.RootCertFile)
+
+ caBundle = s.CA.GetCAKeyCertBundle().GetRootCertPem()
+ // Similar code to dubbo-ca-secret: refresh the root cert, but in casecrets
+ s.addStartFunc("dubbod server certificate rotation", func(stop <-chan struct{}) error {
+ go func() {
+ // regenerate dubbod key cert when root cert changes.
+ s.watchRootCertAndGenKeyCert(stop)
+ }()
+ return nil
+ })
+
+ } else {
+ klog.Infof("Use root cert from %v", fileBundle.RootCertFile)
+
+ caBundle, err = os.ReadFile(fileBundle.RootCertFile)
+ if err != nil {
+ return fmt.Errorf("failed reading %s: %v", fileBundle.RootCertFile, err)
+ }
+ }
+ s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain, caBundle)
+ return nil
+}
+
+func (s *Server) watchRootCertAndGenKeyCert(stop <-chan struct{}) {
+ caBundle := s.CA.GetCAKeyCertBundle().GetRootCertPem()
+ for {
+ if !sleep.Until(stop, rootCertPollingInterval) {
+ return
+ }
+ newRootCert := s.CA.GetCAKeyCertBundle().GetRootCertPem()
+ if !bytes.Equal(caBundle, newRootCert) {
+ caBundle = newRootCert
+ certChain, keyPEM, err := s.CA.GenKeyCert(s.dnsNames, SelfSignedCACertTTL.Get(), false)
+ if err != nil {
+ klog.Errorf("failed generating dubbod key cert %v", err)
+ } else {
+ s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain, caBundle)
+ klog.Infof("regenerated dubbod dns cert: %s", certChain)
+ }
+ }
+ }
+}
diff --git a/sail/pkg/bootstrap/options.go b/sail/pkg/bootstrap/options.go
index 83c9baa..a45e9b1 100644
--- a/sail/pkg/bootstrap/options.go
+++ b/sail/pkg/bootstrap/options.go
@@ -56,6 +56,16 @@
HTTPSAddr string
GRPCAddr string
SecureGRPCAddr string
+ TLSOptions TLSOptions
+}
+
+type TLSOptions struct {
+ // CaCertFile and related are set using CLI flags.
+ CaCertFile string
+ CertFile string
+ KeyFile string
+ TLSCipherSuites []string
+ CipherSuits []uint16 // This is the parsed cipher suites
}
func NewSailArgs(initFuncs ...func(*SailArgs)) *SailArgs {
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index f95cc43..89e4994 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -19,6 +19,8 @@
import (
"context"
+ "crypto/tls"
+ "crypto/x509"
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/cluster"
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
@@ -29,19 +31,27 @@
kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/kube/namespace"
+ sec_model "github.com/apache/dubbo-kubernetes/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/network"
+ "github.com/apache/dubbo-kubernetes/pkg/spiffe"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
+ dubbogrpc "github.com/apache/dubbo-kubernetes/sail/pkg/grpc"
"github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"github.com/apache/dubbo-kubernetes/sail/pkg/server"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
tb "github.com/apache/dubbo-kubernetes/sail/pkg/trustbundle"
"github.com/apache/dubbo-kubernetes/sail/pkg/xds"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/ra"
"github.com/fsnotify/fsnotify"
+ grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
"golang.org/x/net/http2"
"google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/reflection"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
"k8s.io/klog"
@@ -49,9 +59,15 @@
"net/http"
"os"
"strings"
+ "sync"
"time"
)
+const (
+ // debounce file watcher events to minimize noise in logs
+ watchDebounceDelay = 100 * time.Millisecond
+)
+
type Server struct {
XDSServer *xds.DiscoveryServer
clusterID cluster.ID
@@ -84,18 +100,30 @@
CA *ca.DubboCA
dnsNames []string
+ certMu sync.RWMutex
+ dubbodCert *tls.Certificate
+
dubbodCertBundleWatcher *keycertbundle.Watcher
}
func NewServer(args *SailArgs, initFuncs ...func(*Server)) (*Server, error) {
e := model.NewEnvironment()
+ e.DomainSuffix = args.RegistryOptions.KubeOptions.DomainSuffix
+
+ ac := aggregate.NewController(aggregate.Options{
+ MeshHolder: e,
+ ConfigClusterID: getClusterID(args),
+ })
+ e.ServiceDiscovery = ac
+
s := &Server{
- environment: e,
- server: server.New(),
- clusterID: getClusterID(args),
- httpMux: http.NewServeMux(),
- fileWatcher: filewatcher.NewWatcher(),
- internalStop: make(chan struct{}),
+ environment: e,
+ server: server.New(),
+ clusterID: getClusterID(args),
+ httpMux: http.NewServeMux(),
+ dubbodCertBundleWatcher: keycertbundle.NewWatcher(),
+ fileWatcher: filewatcher.NewWatcher(),
+ internalStop: make(chan struct{}),
}
for _, fn := range initFuncs {
fn(s)
@@ -157,6 +185,20 @@
return nil, err
}
+ dubbodHost, _, err := e.GetDiscoveryAddress()
+ if err != nil {
+ return nil, err
+ }
+
+ if err := s.initDubbodCerts(args, string(dubbodHost)); err != nil {
+ return nil, err
+ }
+
+ // Secure gRPC Server must be initialized after CA is created as may use a Aegis generated cert.
+ if err := s.initSecureDiscoveryService(args, s.environment.Mesh().GetTrustDomain()); err != nil {
+ return nil, fmt.Errorf("error initializing secure gRPC Listener: %v", err)
+ }
+
return s, nil
}
@@ -298,14 +340,21 @@
}
func (s *Server) initGrpcServer(options *dubbokeepalive.Options) {
+ interceptors := []grpc.UnaryServerInterceptor{
+ // setup server prometheus monitoring (as final interceptor in chain)
+ grpcprom.UnaryServerInterceptor,
+ }
+ grpcOptions := dubbogrpc.ServerOptions(options, interceptors...)
+ s.grpcServer = grpc.NewServer(grpcOptions...)
+ s.XDSServer.Register(s.grpcServer)
+ reflection.Register(s.grpcServer)
}
-// initControllers initializes the controllers.
func (s *Server) initControllers(args *SailArgs) error {
klog.Info("initializing controllers")
// TODO initMulticluster
- // TODO initSDSServer
+ s.initSDSServer()
if err := s.initConfigController(args); err != nil {
return fmt.Errorf("error initializing config controller: %v", err)
@@ -316,6 +365,114 @@
return nil
}
+func (s *Server) initSecureDiscoveryService(args *SailArgs, trustDomain string) error {
+ if args.ServerOptions.SecureGRPCAddr == "" {
+ klog.Info("The secure discovery port is disabled, multiplexing on httpAddr ")
+ return nil
+ }
+
+ peerCertVerifier, err := s.createPeerCertVerifier(args.ServerOptions.TLSOptions, trustDomain)
+ if err != nil {
+ return err
+ }
+ if peerCertVerifier == nil {
+ // Running locally without configured certs - no TLS mode
+ klog.Warningf("The secure discovery service is disabled")
+ return nil
+ }
+ klog.Info("initializing secure discovery service")
+
+ cfg := &tls.Config{
+ GetCertificate: s.getDubbodCertificate,
+ ClientAuth: tls.VerifyClientCertIfGiven,
+ ClientCAs: peerCertVerifier.GetGeneralCertPool(),
+ VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
+ err := peerCertVerifier.VerifyPeerCert(rawCerts, verifiedChains)
+ if err != nil {
+ klog.Infof("Could not verify certificate: %v", err)
+ }
+ return err
+ },
+ MinVersion: tls.VersionTLS12,
+ CipherSuites: args.ServerOptions.TLSOptions.CipherSuits,
+ }
+ // Compliance for xDS server TLS.
+ sec_model.EnforceGoCompliance(cfg)
+
+ tlsCreds := credentials.NewTLS(cfg)
+
+ s.secureGrpcAddress = args.ServerOptions.SecureGRPCAddr
+
+ interceptors := []grpc.UnaryServerInterceptor{
+ // setup server prometheus monitoring (as final interceptor in chain)
+ grpcprom.UnaryServerInterceptor,
+ }
+ opts := dubbogrpc.ServerOptions(args.KeepaliveOptions, interceptors...)
+ opts = append(opts, grpc.Creds(tlsCreds))
+
+ s.secureGrpcServer = grpc.NewServer(opts...)
+ s.XDSServer.Register(s.secureGrpcServer)
+ reflection.Register(s.secureGrpcServer)
+
+ s.addStartFunc("secure gRPC", func(stop <-chan struct{}) error {
+ go func() {
+ <-stop
+ s.secureGrpcServer.Stop()
+ }()
+ return nil
+ })
+
+ return nil
+}
+
+func (s *Server) createPeerCertVerifier(tlsOptions TLSOptions, trustDomain string) (*spiffe.PeerCertVerifier, error) {
+ customTLSCertsExists, _, _, caCertPath := hasCustomTLSCerts(tlsOptions)
+ if !customTLSCertsExists && s.CA == nil && !s.isK8SSigning() {
+ // Running locally without configured certs - no TLS mode
+ return nil, nil
+ }
+ peerCertVerifier := spiffe.NewPeerCertVerifier()
+ var rootCertBytes []byte
+ var err error
+ if caCertPath != "" {
+ if rootCertBytes, err = os.ReadFile(caCertPath); err != nil {
+ return nil, err
+ }
+ } else {
+ if s.RA != nil {
+ if strings.HasPrefix(features.SailCertProvider, constants.CertProviderKubernetesSignerPrefix) {
+ signerName := strings.TrimPrefix(features.SailCertProvider, constants.CertProviderKubernetesSignerPrefix)
+ caBundle, _ := s.RA.GetRootCertFromMeshConfig(signerName)
+ rootCertBytes = append(rootCertBytes, caBundle...)
+ } else {
+ rootCertBytes = append(rootCertBytes, s.RA.GetCAKeyCertBundle().GetRootCertPem()...)
+ }
+ }
+ if s.CA != nil {
+ rootCertBytes = append(rootCertBytes, s.CA.GetCAKeyCertBundle().GetRootCertPem()...)
+ }
+ }
+
+ if len(rootCertBytes) != 0 {
+ // TODO: trustDomain here is static and will not update if it dynamically changes in mesh config
+ err := peerCertVerifier.AddMappingFromPEM(trustDomain, rootCertBytes)
+ if err != nil {
+ return nil, fmt.Errorf("add root CAs into peerCertVerifier failed: %v", err)
+ }
+ }
+
+ return peerCertVerifier, nil
+}
+
+func (s *Server) getDubbodCertificate(*tls.ClientHelloInfo) (*tls.Certificate, error) {
+ s.certMu.RLock()
+ defer s.certMu.RUnlock()
+ if s.dubbodCert != nil {
+ return s.dubbodCert, nil
+ }
+ return nil, fmt.Errorf("cert not initialized")
+}
+
func (s *Server) serveHTTP() error {
// At this point we are ready - start Http Listener so that it can respond to readiness events.
httpListener, err := net.Listen("tcp", s.httpServer.Addr)
@@ -376,7 +533,17 @@
return clusterID
}
-// isK8SSigning returns whether K8S (as a RA) is used to sign certs instead of private keys known by Istiod
+func (s *Server) initSDSServer() {
+ if s.kubeClient == nil {
+ return
+ }
+ if !features.EnableXDSIdentityCheck {
+ // Make sure we have security
+ klog.Warningf("skipping Kubernetes credential reader; SAIL_ENABLE_XDS_IDENTITY_CHECK must be set to true for this feature.")
+ }
+}
+
+// isK8SSigning returns whether K8S (as a RA) is used to sign certs instead of private keys known by Dubbod
func (s *Server) isK8SSigning() bool {
return s.RA != nil && strings.HasPrefix(features.NaviCertProvider, constants.CertProviderKubernetesSignerPrefix)
}
@@ -443,8 +610,12 @@
func (s *Server) cachesSynced() bool {
// TODO multiclusterController HasSynced
- // TODO ServiceController().HasSynced
- // TODO configController.HasSynced
+ if !s.ServiceController().HasSynced() {
+ return false
+ }
+ if !s.configController.HasSynced() {
+ return false
+ }
return true
}
@@ -468,3 +639,144 @@
expected := s.XDSServer.InboundUpdates.Load()
return kubelib.WaitForCacheSync("push context", stop, func() bool { return s.pushContextReady(expected) })
}
+
+func (s *Server) initDubbodCerts(args *SailArgs, host string) error {
+ // Skip all certificates
+ var err error
+
+ s.dnsNames = getDNSNames(args, host)
+ if hasCustomCertArgsOrWellKnown, tlsCertPath, tlsKeyPath, caCertPath := hasCustomTLSCerts(args.ServerOptions.TLSOptions); hasCustomCertArgsOrWellKnown {
+ // Use the DNS certificate provided via args or in well known location.
+ err = s.initFileCertificateWatches(TLSOptions{
+ CaCertFile: caCertPath,
+ KeyFile: tlsKeyPath,
+ CertFile: tlsCertPath,
+ })
+ if err != nil {
+ // Not crashing dubbod - This typically happens if certs are missing and in tests.
+ klog.Errorf("error initializing certificate watches: %v", err)
+ return nil
+ }
+ } else if features.EnableCAServer && features.SailCertProvider == constants.CertProviderDubbod {
+ klog.Infof("initializing Dubbod DNS certificates host: %s, custom host: %s", host, features.DubbodServiceCustomHost)
+ err = s.initDNSCertsDubbod()
+ } else if features.SailCertProvider == constants.CertProviderKubernetes {
+ klog.Warningf("SAIL_CERT_PROVIDER=kubernetes is no longer supported by upstream K8S")
+ } else if strings.HasPrefix(features.SailCertProvider, constants.CertProviderKubernetesSignerPrefix) {
+ klog.Infof("initializing Dubbod DNS certificates using K8S RA:%s host: %s, custom host: %s", features.SailCertProvider,
+ host, features.DubbodServiceCustomHost)
+ err = s.initDNSCertsK8SRA()
+ } else {
+ klog.Warningf("SAIL_CERT_PROVIDER=%s is not implemented", features.SailCertProvider)
+ }
+
+ if err == nil {
+ err = s.initDubbodCertLoader()
+ }
+
+ return err
+}
+
+func getDNSNames(args *SailArgs, host string) []string {
+ // Append custom hostname if there is any
+ customHost := features.DubbodServiceCustomHost
+ var cHosts []string
+
+ if customHost != "" {
+ cHosts = strings.Split(customHost, ",")
+ }
+ sans := sets.New(cHosts...)
+ sans.Insert(host)
+ dnsNames := sets.SortedList(sans)
+ klog.Infof("Discover server subject alt names: %v", dnsNames)
+ return dnsNames
+}
+
+func hasCustomTLSCerts(tlsOptions TLSOptions) (ok bool, tlsCertPath, tlsKeyPath, caCertPath string) {
+ // load from tls args as priority
+ if hasCustomTLSCertArgs(tlsOptions) {
+ return true, tlsOptions.CertFile, tlsOptions.KeyFile, tlsOptions.CaCertFile
+ }
+
+ if ok = checkPathsExist(constants.DefaultSailTLSCert, constants.DefaultSailTLSKey, constants.DefaultSailTLSCaCert); ok {
+ tlsCertPath = constants.DefaultSailTLSCert
+ tlsKeyPath = constants.DefaultSailTLSKey
+ caCertPath = constants.DefaultSailTLSCaCert
+ return
+ }
+
+ if ok = checkPathsExist(constants.DefaultSailTLSCert, constants.DefaultSailTLSKey, constants.DefaultSailTLSCaCertAlternatePath); ok {
+ tlsCertPath = constants.DefaultSailTLSCert
+ tlsKeyPath = constants.DefaultSailTLSKey
+ caCertPath = constants.DefaultSailTLSCaCertAlternatePath
+ return
+ }
+
+ return
+}
+
+func checkPathsExist(paths ...string) bool {
+ for _, path := range paths {
+ fInfo, err := os.Stat(path)
+
+ if err != nil || fInfo.IsDir() {
+ return false
+ }
+ }
+ return true
+}
+
+func hasCustomTLSCertArgs(tlsOptions TLSOptions) bool {
+ return tlsOptions.CaCertFile != "" && tlsOptions.CertFile != "" && tlsOptions.KeyFile != ""
+}
+
+func (s *Server) initDubbodCertLoader() error {
+ if err := s.loadDubbodCert(); err != nil {
+ return fmt.Errorf("first time load DubbodCert failed: %v", err)
+ }
+ _, watchCh := s.dubbodCertBundleWatcher.AddWatcher()
+ s.addStartFunc("reload certs", func(stop <-chan struct{}) error {
+ go s.reloadDubbodCert(watchCh, stop)
+ return nil
+ })
+ return nil
+}
+
+func (s *Server) loadDubbodCert() error {
+ keyCertBundle := s.dubbodCertBundleWatcher.GetKeyCertBundle()
+ keyPair, err := tls.X509KeyPair(keyCertBundle.CertPem, keyCertBundle.KeyPem)
+ if err != nil {
+ return fmt.Errorf("dubbod loading x509 key pairs failed: %v", err)
+ }
+ for _, c := range keyPair.Certificate {
+ x509Cert, err := x509.ParseCertificates(c)
+ if err != nil {
+ // This can rarely happen, just in case.
+ return fmt.Errorf("x509 cert - ParseCertificates() error: %v", err)
+ }
+ for _, c := range x509Cert {
+ klog.Infof("x509 cert - Issuer: %q, Subject: %q, SN: %x, NotBefore: %q, NotAfter: %q",
+ c.Issuer, c.Subject, c.SerialNumber,
+ c.NotBefore.Format(time.RFC3339), c.NotAfter.Format(time.RFC3339))
+ }
+ }
+
+ klog.Info("Dubbod certificates are reloaded")
+ s.certMu.Lock()
+ s.dubbodCert = &keyPair
+ s.certMu.Unlock()
+ return nil
+}
+
+func (s *Server) reloadDubbodCert(watchCh <-chan struct{}, stopCh <-chan struct{}) {
+ for {
+ select {
+ case <-stopCh:
+ return
+ case <-watchCh:
+ if err := s.loadDubbodCert(); err != nil {
+ klog.Errorf("reload dubbod cert failed: %v", err)
+ }
+ }
+ }
+}
diff --git a/sail/pkg/bootstrap/servicecontroller.go b/sail/pkg/bootstrap/servicecontroller.go
index 44e493a..f324fc9 100644
--- a/sail/pkg/bootstrap/servicecontroller.go
+++ b/sail/pkg/bootstrap/servicecontroller.go
@@ -16,8 +16,6 @@
func (s *Server) initServiceControllers(args *SailArgs) error {
serviceControllers := s.ServiceController()
- // TODO service entry controller
-
registered := sets.New[provider.ID]()
for _, r := range args.RegistryOptions.Registries {
serviceRegistry := provider.ID(r)
diff --git a/sail/pkg/config/aggregate/config.go b/sail/pkg/config/aggregate/config.go
index 16dfd44..8a8478a 100644
--- a/sail/pkg/config/aggregate/config.go
+++ b/sail/pkg/config/aggregate/config.go
@@ -3,13 +3,13 @@
import (
"errors"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"k8s.io/apimachinery/pkg/types"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
)
diff --git a/sail/pkg/config/memory/controller.go b/sail/pkg/config/memory/controller.go
index 06c121d..5119bb6 100644
--- a/sail/pkg/config/memory/controller.go
+++ b/sail/pkg/config/memory/controller.go
@@ -4,7 +4,7 @@
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"k8s.io/apimachinery/pkg/types"
)
diff --git a/sail/pkg/features/sail.go b/sail/pkg/features/sail.go
index a3fe8e7..08213a1 100644
--- a/sail/pkg/features/sail.go
+++ b/sail/pkg/features/sail.go
@@ -55,4 +55,11 @@
"If set to false, Dubbo will not watch for the ca-crl.pem file in the /etc/cacerts directory "+
"and will not distribute CRL data to namespaces for proxies to consume.",
).Get()
+ SailCertProvider = env.Register("SAIL_CERT_PROVIDER", constants.CertProviderDubbod,
+ "The provider of Pilot DNS certificate. K8S RA will be used for k8s.io/NAME. 'dubbod' value will sign"+
+ " using Dubbo build in CA. Other values will not not generate TLS certs, but still "+
+ " distribute ./etc/certs/root-cert.pem. Only used if custom certificates are not mounted.").Get()
+ DubbodServiceCustomHost = env.Register("DUBBOD_CUSTOM_HOST", "",
+ "Custom host name of dubbod that dubbod signs the server cert. "+
+ "Multiple custom host names are supported, and multiple values are separated by commas.").Get()
)
diff --git a/sail/pkg/features/security.go b/sail/pkg/features/security.go
index 859f0ff..fb2284e 100644
--- a/sail/pkg/features/security.go
+++ b/sail/pkg/features/security.go
@@ -24,4 +24,9 @@
UseCacertsForSelfSignedCA = env.Register("USE_CACERTS_FOR_SELF_SIGNED_CA", false,
"If enabled, dubbod will use a secret named cacerts to store its self-signed dubbo-"+
"generated root certificate.").Get()
+ EnableXDSIdentityCheck = env.Register(
+ "SAIL_ENABLE_XDS_IDENTITY_CHECK",
+ true,
+ "If enabled, sail will authorize XDS clients, to ensure they are acting only as namespaces they have permissions for.",
+ ).Get()
)
diff --git a/sail/pkg/features/tuning.go b/sail/pkg/features/tuning.go
index ca91a46..a99ba30 100644
--- a/sail/pkg/features/tuning.go
+++ b/sail/pkg/features/tuning.go
@@ -25,4 +25,11 @@
100000,
"Sets the maximum number of concurrent grpc streams.",
).Get()
+
+ // MaxRecvMsgSize The max receive buffer size of gRPC received channel of Pilot in bytes.
+ MaxRecvMsgSize = env.Register(
+ "DUBBO_GPRC_MAXRECVMSGSIZE",
+ 4*1024*1024,
+ "Sets the max receive buffer size of gRPC stream in bytes.",
+ ).Get()
)
diff --git a/sail/pkg/grpc/grpc.go b/sail/pkg/grpc/grpc.go
index 0f380b7..3bf677e 100644
--- a/sail/pkg/grpc/grpc.go
+++ b/sail/pkg/grpc/grpc.go
@@ -3,6 +3,8 @@
import (
dubbokeepalive "github.com/apache/dubbo-kubernetes/pkg/keepalive"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/features"
+ middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
@@ -58,6 +60,30 @@
return []grpc.DialOption{keepaliveOption, initialWindowSizeOption, initialConnWindowSizeOption, msgSizeOption, tlsDialOpts}, nil
}
+func ServerOptions(options *dubbokeepalive.Options, interceptors ...grpc.UnaryServerInterceptor) []grpc.ServerOption {
+ maxStreams := features.MaxConcurrentStreams
+ maxRecvMsgSize := features.MaxRecvMsgSize
+
+ grpcOptions := []grpc.ServerOption{
+ grpc.UnaryInterceptor(middleware.ChainUnaryServer(interceptors...)),
+ grpc.MaxConcurrentStreams(uint32(maxStreams)),
+ grpc.MaxRecvMsgSize(maxRecvMsgSize),
+ // Ensure we allow clients sufficient ability to send keep alives. If this is higher than client
+ // keep alive setting, it will prematurely get a GOAWAY sent.
+ grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
+ MinTime: options.Time / 2,
+ }),
+ grpc.KeepaliveParams(keepalive.ServerParameters{
+ Time: options.Time,
+ Timeout: options.Timeout,
+ MaxConnectionAge: options.MaxServerConnectionAge,
+ MaxConnectionAgeGrace: options.MaxServerConnectionAgeGrace,
+ }),
+ }
+
+ return grpcOptions
+}
+
func GRPCErrorType(err error) ErrorType {
if err == io.EOF {
return GracefulTermination
diff --git a/sail/pkg/model/addressmap.go b/sail/pkg/model/addressmap.go
new file mode 100644
index 0000000..c8b166f
--- /dev/null
+++ b/sail/pkg/model/addressmap.go
@@ -0,0 +1,75 @@
+package model
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
+)
+
+func (m *AddressMap) GetAddresses() map[cluster.ID][]string {
+ if m == nil {
+ return nil
+ }
+
+ m.mutex.RLock()
+ defer m.mutex.RUnlock()
+
+ if m.Addresses == nil {
+ return nil
+ }
+
+ out := make(map[cluster.ID][]string)
+ for k, v := range m.Addresses {
+ out[k] = slices.Clone(v)
+ }
+ return out
+}
+
+func (m *AddressMap) GetAddressesFor(c cluster.ID) []string {
+ if m == nil {
+ return nil
+ }
+
+ m.mutex.RLock()
+ defer m.mutex.RUnlock()
+
+ if m.Addresses == nil {
+ return nil
+ }
+
+ // Copy the Addresses array.
+ return append([]string{}, m.Addresses[c]...)
+}
+
+func (m *AddressMap) SetAddressesFor(c cluster.ID, addresses []string) *AddressMap {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ if len(addresses) == 0 {
+ // Setting an empty array for the cluster. Remove the entry for the cluster if it exists.
+ if m.Addresses != nil {
+ delete(m.Addresses, c)
+
+ // Delete the map if there's nothing left.
+ if len(m.Addresses) == 0 {
+ m.Addresses = nil
+ }
+ }
+ } else {
+ // Create the map if it doesn't already exist.
+ if m.Addresses == nil {
+ m.Addresses = make(map[cluster.ID][]string)
+ }
+ m.Addresses[c] = addresses
+ }
+ return m
+}
+
+func (m *AddressMap) Len() int {
+ if m == nil {
+ return 0
+ }
+ m.mutex.RLock()
+ defer m.mutex.RUnlock()
+
+ return len(m.Addresses)
+}
diff --git a/sail/pkg/model/authentication.go b/sail/pkg/model/authentication.go
new file mode 100644
index 0000000..f1c421a
--- /dev/null
+++ b/sail/pkg/model/authentication.go
@@ -0,0 +1,119 @@
+package model
+
+import (
+ "crypto/md5"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
+ "istio.io/api/security/v1beta1"
+ "k8s.io/klog/v2"
+ "strings"
+ "time"
+)
+
+type MutualTLSMode int
+
+const (
+ MTLSUnknown MutualTLSMode = iota
+ MTLSDisable
+ MTLSPermissive
+ MTLSStrict
+)
+
+type AuthenticationPolicies struct {
+ requestAuthentications map[string][]config.Config
+ peerAuthentications map[string][]config.Config
+ globalMutualTLSMode MutualTLSMode
+ rootNamespace string
+ namespaceMutualTLSMode map[string]MutualTLSMode
+ aggregateVersion string
+}
+
+func initAuthenticationPolicies(env *Environment) *AuthenticationPolicies {
+ policy := &AuthenticationPolicies{
+ requestAuthentications: map[string][]config.Config{},
+ peerAuthentications: map[string][]config.Config{},
+ globalMutualTLSMode: MTLSUnknown,
+ rootNamespace: env.Mesh().GetRootNamespace(),
+ }
+
+ policy.addRequestAuthentication(sortConfigByCreationTime(env.List(gvk.RequestAuthentication, NamespaceAll)))
+ policy.addPeerAuthentication(sortConfigByCreationTime(env.List(gvk.PeerAuthentication, NamespaceAll)))
+
+ return policy
+}
+
+func (policy *AuthenticationPolicies) addRequestAuthentication(configs []config.Config) {
+ for _, config := range configs {
+ policy.requestAuthentications[config.Namespace] = append(policy.requestAuthentications[config.Namespace], config)
+ }
+}
+
+func (policy *AuthenticationPolicies) addPeerAuthentication(configs []config.Config) {
+ sortConfigByCreationTime(configs)
+
+ foundNamespaceMTLS := make(map[string]v1beta1.PeerAuthentication_MutualTLS_Mode)
+ seenNamespaceOrMeshConfig := make(map[string]time.Time)
+ versions := []string{}
+
+ for _, config := range configs {
+ versions = append(versions, config.UID+"."+config.ResourceVersion)
+ spec := config.Spec.(*v1beta1.PeerAuthentication)
+ if spec.Selector == nil || len(spec.Selector.MatchLabels) == 0 {
+ if t, ok := seenNamespaceOrMeshConfig[config.Namespace]; ok {
+ klog.Warningf(
+ "Namespace/mesh-level PeerAuthentication is already defined for %q at time %v. Ignore %q which was created at time %v",
+ config.Namespace, t, config.Name, config.CreationTimestamp)
+ continue
+ }
+ seenNamespaceOrMeshConfig[config.Namespace] = config.CreationTimestamp
+
+ mode := v1beta1.PeerAuthentication_MutualTLS_UNSET
+ if spec.Mtls != nil {
+ mode = spec.Mtls.Mode
+ }
+ if config.Namespace == policy.rootNamespace {
+ if mode == v1beta1.PeerAuthentication_MutualTLS_UNSET {
+ policy.globalMutualTLSMode = MTLSPermissive
+ } else {
+ policy.globalMutualTLSMode = ConvertToMutualTLSMode(mode)
+ }
+ } else {
+ foundNamespaceMTLS[config.Namespace] = mode
+ }
+ }
+
+ policy.peerAuthentications[config.Namespace] = append(policy.peerAuthentications[config.Namespace], config)
+ }
+
+ // nolint: gosec
+ // Not security sensitive code
+ policy.aggregateVersion = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(versions, ";"))))
+
+ policy.namespaceMutualTLSMode = make(map[string]MutualTLSMode, len(foundNamespaceMTLS))
+
+ inheritedMTLSMode := policy.globalMutualTLSMode
+ if inheritedMTLSMode == MTLSUnknown {
+ inheritedMTLSMode = MTLSPermissive
+ }
+ for ns, mtlsMode := range foundNamespaceMTLS {
+ if mtlsMode == v1beta1.PeerAuthentication_MutualTLS_UNSET {
+ policy.namespaceMutualTLSMode[ns] = inheritedMTLSMode
+ } else {
+ policy.namespaceMutualTLSMode[ns] = ConvertToMutualTLSMode(mtlsMode)
+ }
+ }
+}
+
+func ConvertToMutualTLSMode(mode v1beta1.PeerAuthentication_MutualTLS_Mode) MutualTLSMode {
+ switch mode {
+ case v1beta1.PeerAuthentication_MutualTLS_DISABLE:
+ return MTLSDisable
+ case v1beta1.PeerAuthentication_MutualTLS_PERMISSIVE:
+ return MTLSPermissive
+ case v1beta1.PeerAuthentication_MutualTLS_STRICT:
+ return MTLSStrict
+ default:
+ return MTLSUnknown
+ }
+}
diff --git a/sail/pkg/model/authorization.go b/sail/pkg/model/authorization.go
new file mode 100644
index 0000000..9789ac3
--- /dev/null
+++ b/sail/pkg/model/authorization.go
@@ -0,0 +1,48 @@
+package model
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
+ authpb "istio.io/api/security/v1beta1"
+)
+
+type AuthorizationPolicy struct {
+ Name string `json:"name"`
+ Namespace string `json:"namespace"`
+ Annotations map[string]string `json:"annotations"`
+ Spec *authpb.AuthorizationPolicy `json:"spec"`
+}
+
+type AuthorizationPolicies struct {
+ NamespaceToPolicies map[string][]AuthorizationPolicy `json:"namespace_to_policies"`
+ RootNamespace string `json:"root_namespace"`
+}
+
+func GetAuthorizationPolicies(env *Environment) *AuthorizationPolicies {
+ policy := &AuthorizationPolicies{
+ NamespaceToPolicies: map[string][]AuthorizationPolicy{},
+ RootNamespace: env.Mesh().GetRootNamespace(),
+ }
+
+ policies := env.List(gvk.AuthorizationPolicy, NamespaceAll)
+ sortConfigByCreationTime(policies)
+
+ policyCount := make(map[string]int)
+ for _, config := range policies {
+ policyCount[config.Namespace]++
+ }
+
+ for _, config := range policies {
+ authzConfig := AuthorizationPolicy{
+ Name: config.Name,
+ Namespace: config.Namespace,
+ Annotations: config.Annotations,
+ Spec: config.Spec.(*authpb.AuthorizationPolicy),
+ }
+ if _, ok := policy.NamespaceToPolicies[config.Namespace]; !ok {
+ policy.NamespaceToPolicies[config.Namespace] = make([]AuthorizationPolicy, 0, policyCount[config.Namespace])
+ }
+ policy.NamespaceToPolicies[config.Namespace] = append(policy.NamespaceToPolicies[config.Namespace], authzConfig)
+ }
+
+ return policy
+}
diff --git a/sail/pkg/model/cluster_local.go b/sail/pkg/model/cluster_local.go
new file mode 100644
index 0000000..9bb6b42
--- /dev/null
+++ b/sail/pkg/model/cluster_local.go
@@ -0,0 +1,14 @@
+package model
+
+import "github.com/apache/dubbo-kubernetes/pkg/config/host"
+
+type ClusterLocalHosts struct {
+ specific map[host.Name]bool
+ wildcard map[host.Name]bool
+}
+
+type ClusterLocalProvider interface {
+ // GetClusterLocalHosts returns the list of cluster-local hosts, sorted in
+ // ascending order. The caller must not modify the returned list.
+ GetClusterLocalHosts() ClusterLocalHosts
+}
diff --git a/sail/pkg/model/config.go b/sail/pkg/model/config.go
index 78e5b91..6619a5d 100644
--- a/sail/pkg/model/config.go
+++ b/sail/pkg/model/config.go
@@ -1,8 +1,14 @@
package model
import (
+ "cmp"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+ "sort"
+)
+
+const (
+ NamespaceAll = ""
)
type ConfigStore interface {
@@ -24,3 +30,16 @@
Run(stop <-chan struct{})
HasSynced() bool
}
+
+func sortConfigByCreationTime(configs []config.Config) []config.Config {
+ sort.Slice(configs, func(i, j int) bool {
+ if r := configs[i].CreationTimestamp.Compare(configs[j].CreationTimestamp); r != 0 {
+ return r == -1 // -1 means i is less than j, so return true
+ }
+ if r := cmp.Compare(configs[i].Name, configs[j].Name); r != 0 {
+ return r == -1
+ }
+ return cmp.Compare(configs[i].Namespace, configs[j].Namespace) == -1
+ })
+ return configs
+}
diff --git a/sail/pkg/model/context.go b/sail/pkg/model/context.go
index 49705d6..e6a2f4f 100644
--- a/sail/pkg/model/context.go
+++ b/sail/pkg/model/context.go
@@ -22,6 +22,7 @@
"github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
+ "github.com/apache/dubbo-kubernetes/pkg/xds"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
meshconfig "istio.io/api/mesh/v1alpha1"
"net"
@@ -31,14 +32,19 @@
type Watcher = meshwatcher.WatcherCollection
+type WatchedResource = xds.WatchedResource
+
type Environment struct {
ServiceDiscovery
Watcher
ConfigStore
- mutex sync.RWMutex
- pushContext *PushContext
- Cache XdsCache
- NetworksWatcher mesh.NetworksWatcher
+ mutex sync.RWMutex
+ pushContext *PushContext
+ Cache XdsCache
+ NetworksWatcher mesh.NetworksWatcher
+ NetworkManager *NetworkManager
+ clusterLocalServices ClusterLocalProvider
+ DomainSuffix string
}
type XdsCacheImpl struct {
@@ -56,24 +62,25 @@
cache = DisabledCache{}
}
return &Environment{
- Cache: cache,
+ pushContext: NewPushContext(),
+ Cache: cache,
}
}
var _ mesh.Holder = &Environment{}
-func (e *Environment) SetPushContext(pc *PushContext) {
- e.mutex.Lock()
- defer e.mutex.Unlock()
- e.pushContext = pc
-}
-
func (e *Environment) PushContext() *PushContext {
e.mutex.RLock()
defer e.mutex.RUnlock()
return e.pushContext
}
+func (e *Environment) SetPushContext(pc *PushContext) {
+ e.mutex.Lock()
+ defer e.mutex.Unlock()
+ e.pushContext = pc
+}
+
func (e *Environment) Mesh() *meshconfig.MeshConfig {
if e != nil && e.Watcher != nil {
return e.Watcher.Mesh()
@@ -109,4 +116,8 @@
return host.Name(hostname), port, nil
}
+func (e *Environment) ClusterLocal() ClusterLocalProvider {
+ return e.clusterLocalServices
+}
+
type Proxy struct{}
diff --git a/sail/pkg/model/controller.go b/sail/pkg/model/controller.go
index a79c32f..72e2dd1 100644
--- a/sail/pkg/model/controller.go
+++ b/sail/pkg/model/controller.go
@@ -1,8 +1,12 @@
package model
type Controller interface {
- // Run until a signal is received
Run(stop <-chan struct{})
+ HasSynced() bool
+}
+
+type AggregateController interface {
+ Controller
}
type Event int
diff --git a/sail/pkg/model/network.go b/sail/pkg/model/network.go
new file mode 100644
index 0000000..66f7ce2
--- /dev/null
+++ b/sail/pkg/model/network.go
@@ -0,0 +1,4 @@
+package model
+
+type NetworkManager struct {
+}
diff --git a/sail/pkg/model/push_context.go b/sail/pkg/model/push_context.go
index 76870e3..4e94184 100644
--- a/sail/pkg/model/push_context.go
+++ b/sail/pkg/model/push_context.go
@@ -17,10 +17,78 @@
package model
-import meshconfig "istio.io/api/mesh/v1alpha1"
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/host"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
+ "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "go.uber.org/atomic"
+ meshconfig "istio.io/api/mesh/v1alpha1"
+ "k8s.io/apimachinery/pkg/types"
+ "sync"
+)
type PushContext struct {
- Mesh *meshconfig.MeshConfig `json:"-"`
+ Mesh *meshconfig.MeshConfig `json:"-"`
+ initializeMutex sync.Mutex
+ InitDone atomic.Bool
+ Networks *meshconfig.MeshNetworks
+ networkMgr *NetworkManager
+ clusterLocalHosts ClusterLocalHosts
+ exportToDefaults exportToDefaults
+ AuthnPolicies *AuthenticationPolicies `json:"-"`
+ AuthzPolicies *AuthorizationPolicies `json:"-"`
+ virtualServiceIndex virtualServiceIndex
+ destinationRuleIndex destinationRuleIndex
+ ServiceIndex serviceIndex
+ serviceAccounts map[serviceAccountKey][]string
+}
+
+type serviceAccountKey struct {
+ hostname host.Name
+ namespace string
+}
+
+type virtualServiceIndex struct {
+ exportedToNamespaceByGateway map[types.NamespacedName][]config.Config
+ // this contains all the virtual services with exportTo "." and current namespace. The keys are namespace,gateway.
+ privateByNamespaceAndGateway map[types.NamespacedName][]config.Config
+ // This contains all virtual services whose exportTo is "*", keyed by gateway
+ publicByGateway map[string][]config.Config
+ // root vs namespace/name ->delegate vs virtualservice gvk/namespace/name
+ delegates map[ConfigKey][]ConfigKey
+
+ // This contains destination hosts of virtual services, keyed by gateway's namespace/name,
+ // only used when PILOT_FILTER_GATEWAY_CLUSTER_CONFIG is enabled
+ destinationsByGateway map[string]sets.String
+
+ // Map of VS hostname -> referenced hostnames
+ referencedDestinations map[string]sets.String
+}
+
+type destinationRuleIndex struct {
+ namespaceLocal map[string]*consolidatedDestRules
+ exportedByNamespace map[string]*consolidatedDestRules
+ rootNamespaceLocal *consolidatedDestRules
+}
+
+type consolidatedDestRules struct {
+ specificDestRules map[host.Name][]*ConsolidatedDestRule
+ wildcardDestRules map[host.Name][]*ConsolidatedDestRule
+}
+
+type serviceIndex struct {
+ privateByNamespace map[string][]*Service
+ public []*Service
+ exportedToNamespace map[string][]*Service
+ HostnameAndNamespace map[host.Name]map[string]*Service `json:"-"`
+}
+
+type ConsolidatedDestRule struct {
+ exportTo sets.Set[visibility.Instance]
+ rule *config.Config
+ from []types.NamespacedName
}
type TriggerReason string
@@ -28,13 +96,30 @@
type ReasonStats map[TriggerReason]int
type PushRequest struct {
- Reason ReasonStats
+ Reason ReasonStats
+ ConfigsUpdated sets.Set[ConfigKey]
+ Forced bool
+ Full bool
+ Push *PushContext
+ LastPushContext *PushContext
}
func NewPushContext() *PushContext {
return &PushContext{}
}
+type ConfigKey struct {
+ Kind kind.Kind
+ Name string
+ Namespace string
+}
+
+type exportToDefaults struct {
+ service sets.Set[visibility.Instance]
+ virtualService sets.Set[visibility.Instance]
+ destinationRule sets.Set[visibility.Instance]
+}
+
func (pr *PushRequest) CopyMerge(other *PushRequest) *PushRequest {
if pr == nil {
return other
@@ -48,4 +133,141 @@
}
type XDSUpdater interface {
-}
\ No newline at end of file
+}
+
+func (ps *PushContext) InitContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) {
+ ps.initializeMutex.Lock()
+ defer ps.initializeMutex.Unlock()
+ if ps.InitDone.Load() {
+ return
+ }
+
+ ps.Mesh = env.Mesh()
+ ps.Networks = env.MeshNetworks()
+
+ ps.initDefaultExportMaps()
+
+ if pushReq == nil || oldPushContext == nil || !oldPushContext.InitDone.Load() || pushReq.Forced {
+ ps.createNewContext(env)
+ } else {
+ ps.updateContext(env, oldPushContext, pushReq)
+ }
+
+ ps.networkMgr = env.NetworkManager
+
+ ps.clusterLocalHosts = env.ClusterLocal().GetClusterLocalHosts()
+
+ ps.InitDone.Store(true)
+}
+
+func (ps *PushContext) initDefaultExportMaps() {
+ ps.exportToDefaults.destinationRule = sets.New[visibility.Instance]()
+ if ps.Mesh.DefaultDestinationRuleExportTo != nil {
+ for _, e := range ps.Mesh.DefaultDestinationRuleExportTo {
+ ps.exportToDefaults.destinationRule.Insert(visibility.Instance(e))
+ }
+ } else {
+ // default to *
+ ps.exportToDefaults.destinationRule.Insert(visibility.Public)
+ }
+
+ ps.exportToDefaults.service = sets.New[visibility.Instance]()
+ if ps.Mesh.DefaultServiceExportTo != nil {
+ for _, e := range ps.Mesh.DefaultServiceExportTo {
+ ps.exportToDefaults.service.Insert(visibility.Instance(e))
+ }
+ } else {
+ ps.exportToDefaults.service.Insert(visibility.Public)
+ }
+
+ ps.exportToDefaults.virtualService = sets.New[visibility.Instance]()
+ if ps.Mesh.DefaultVirtualServiceExportTo != nil {
+ for _, e := range ps.Mesh.DefaultVirtualServiceExportTo {
+ ps.exportToDefaults.virtualService.Insert(visibility.Instance(e))
+ }
+ } else {
+ ps.exportToDefaults.virtualService.Insert(visibility.Public)
+ }
+}
+
+func (ps *PushContext) initServiceRegistry(env *Environment, configsUpdate sets.Set[ConfigKey]) {
+
+}
+
+func (ps *PushContext) initServiceAccounts(env *Environment, services []*Service) {
+}
+
+func (ps *PushContext) initVirtualServices(env *Environment) {
+}
+
+func (ps *PushContext) initDestinationRules(env *Environment) {
+}
+
+func (ps *PushContext) initAuthnPolicies(env *Environment) {
+ ps.AuthnPolicies = initAuthenticationPolicies(env)
+}
+
+func (ps *PushContext) initAuthorizationPolicies(env *Environment) {
+ ps.AuthzPolicies = GetAuthorizationPolicies(env)
+}
+
+func (ps *PushContext) createNewContext(env *Environment) {
+ ps.initServiceRegistry(env, nil)
+ ps.initVirtualServices(env)
+ ps.initDestinationRules(env)
+ ps.initAuthnPolicies(env)
+ ps.initAuthorizationPolicies(env)
+}
+
+func (ps *PushContext) updateContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) {
+ var servicesChanged, virtualServicesChanged, destinationRulesChanged, authnChanged, authzChanged bool
+
+ // We do not need to watch Ingress or Gateway API changes. Both of these have their own controllers which will send
+ // events for Istio types (Gateway and VirtualService).
+ for conf := range pushReq.ConfigsUpdated {
+ switch conf.Kind {
+ case kind.DestinationRule:
+ destinationRulesChanged = true
+ case kind.VirtualService:
+ virtualServicesChanged = true
+ case kind.AuthorizationPolicy:
+ authzChanged = true
+ case kind.RequestAuthentication,
+ kind.PeerAuthentication:
+ authnChanged = true
+ }
+ }
+
+ if servicesChanged {
+ // Services have changed. initialize service registry
+ ps.initServiceRegistry(env, pushReq.ConfigsUpdated)
+ } else {
+ // make sure we copy over things that would be generated in initServiceRegistry
+ ps.ServiceIndex = oldPushContext.ServiceIndex
+ ps.serviceAccounts = oldPushContext.serviceAccounts
+ }
+
+ if virtualServicesChanged {
+ ps.initVirtualServices(env)
+ } else {
+ ps.virtualServiceIndex = oldPushContext.virtualServiceIndex
+ }
+
+ if destinationRulesChanged {
+ ps.initDestinationRules(env)
+ } else {
+ ps.destinationRuleIndex = oldPushContext.destinationRuleIndex
+ }
+
+ if authnChanged {
+ ps.initAuthnPolicies(env)
+ } else {
+ ps.AuthnPolicies = oldPushContext.AuthnPolicies
+ }
+
+ if authzChanged {
+ ps.initAuthorizationPolicies(env)
+ } else {
+ ps.AuthzPolicies = oldPushContext.AuthzPolicies
+ }
+}
diff --git a/sail/pkg/model/service.go b/sail/pkg/model/service.go
index 6573e2f..7b6c26d 100644
--- a/sail/pkg/model/service.go
+++ b/sail/pkg/model/service.go
@@ -1,4 +1,148 @@
package model
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/config/host"
+ "github.com/apache/dubbo-kubernetes/pkg/config/protocol"
+ "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
+ "github.com/apache/dubbo-kubernetes/pkg/maps"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+ "sync"
+ "time"
+)
+
+type NamespacedHostname struct {
+ Hostname host.Name
+ Namespace string
+}
+
+type ServiceAttributes struct {
+ Labels map[string]string
+ LabelSelectors map[string]string
+ ExportTo sets.Set[visibility.Instance]
+ ClusterExternalAddresses *AddressMap
+ ClusterExternalPorts map[cluster.ID]map[uint32]uint32
+ Aliases []NamespacedHostname
+ PassthroughTargetPorts map[uint32]uint32
+ // Name is "destination.service.name" attribute
+ Name string
+ // Namespace is "destination.service.namespace" attribute
+ Namespace string
+ ServiceRegistry provider.ID
+}
+
+type AddressMap struct {
+ Addresses map[cluster.ID][]string
+
+ // NOTE: The copystructure library is not able to copy unexported fields, so the mutex will not be copied.
+ mutex sync.RWMutex
+}
+
+func (m *AddressMap) DeepCopy() *AddressMap {
+ if m == nil {
+ return nil
+ }
+ return &AddressMap{
+ Addresses: m.GetAddresses(),
+ }
+}
+
+type Service struct {
+ Attributes ServiceAttributes
+ Hostname host.Name `json:"hostname"`
+ Ports PortList `json:"ports,omitempty"`
+ ServiceAccounts []string `json:"serviceAccounts,omitempty"`
+ ClusterVIPs AddressMap `json:"clusterVIPs,omitempty"`
+ CreationTime time.Time `json:"creationTime,omitempty"`
+}
+
+func (s *Service) DeepCopy() *Service {
+ // nolint: govet
+ out := *s
+ out.Attributes = s.Attributes.DeepCopy()
+ if s.Ports != nil {
+ out.Ports = make(PortList, len(s.Ports))
+ for i, port := range s.Ports {
+ if port != nil {
+ out.Ports[i] = &Port{
+ Name: port.Name,
+ Port: port.Port,
+ Protocol: port.Protocol,
+ }
+ } else {
+ out.Ports[i] = nil
+ }
+ }
+ }
+
+ out.ServiceAccounts = slices.Clone(s.ServiceAccounts)
+ out.ClusterVIPs = *s.ClusterVIPs.DeepCopy()
+ return &out
+}
+
+func (s *Service) Key() string {
+ if s == nil {
+ return ""
+ }
+
+ return s.Attributes.Namespace + "/" + string(s.Hostname)
+}
+
+type Port struct {
+ Name string `json:"name,omitempty"`
+ Port int `json:"port"`
+ Protocol protocol.Instance `json:"protocol,omitempty"`
+}
+
+type PortList []*Port
+
+func (p *Port) Equals(other *Port) bool {
+ if p == nil {
+ return other == nil
+ }
+ if other == nil {
+ return p == nil
+ }
+ return p.Name == other.Name && p.Port == other.Port && p.Protocol == other.Protocol
+}
+
+func (ports PortList) Equals(other PortList) bool {
+ return slices.EqualFunc(ports, other, func(a, b *Port) bool {
+ return a.Equals(b)
+ })
+}
+
type ServiceDiscovery interface {
+ Services() []*Service
+ GetService(hostname host.Name) *Service
+}
+
+func (s *ServiceAttributes) DeepCopy() ServiceAttributes {
+ // AddressMap contains a mutex, which is safe to copy in this case.
+ // nolint: govet
+ out := *s
+
+ out.Labels = maps.Clone(s.Labels)
+ if s.ExportTo != nil {
+ out.ExportTo = s.ExportTo.Copy()
+ }
+
+ out.LabelSelectors = maps.Clone(s.LabelSelectors)
+ out.ClusterExternalAddresses = s.ClusterExternalAddresses.DeepCopy()
+
+ if s.ClusterExternalPorts != nil {
+ out.ClusterExternalPorts = make(map[cluster.ID]map[uint32]uint32, len(s.ClusterExternalPorts))
+ for k, m := range s.ClusterExternalPorts {
+ out.ClusterExternalPorts[k] = maps.Clone(m)
+ }
+ }
+
+ out.Aliases = slices.Clone(s.Aliases)
+ out.PassthroughTargetPorts = maps.Clone(out.PassthroughTargetPorts)
+
+ // AddressMap contains a mutex, which is safe to return a copy in this case.
+ // nolint: govet
+ return out
}
diff --git a/sail/pkg/serviceregistry/aggregate/controller.go b/sail/pkg/serviceregistry/aggregate/controller.go
index 0b22075..2179a5b 100644
--- a/sail/pkg/serviceregistry/aggregate/controller.go
+++ b/sail/pkg/serviceregistry/aggregate/controller.go
@@ -2,12 +2,20 @@
import (
"github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
"k8s.io/klog/v2"
"sync"
)
+var (
+ _ model.ServiceDiscovery = &Controller{}
+ _ model.AggregateController = &Controller{}
+)
+
type Controller struct {
registries []*registryEntry
storeLock sync.RWMutex
@@ -53,3 +61,97 @@
<-stop
klog.Info("Registry Aggregator terminated")
}
+
+func (c *Controller) HasSynced() bool {
+ for _, r := range c.GetRegistries() {
+ if !r.HasSynced() {
+ klog.V(2).Infof("registry %s is syncing", r.Cluster())
+ return false
+ }
+ }
+ return true
+}
+
+func (c *Controller) Services() []*model.Service {
+ // smap is a map of hostname (string) to service index, used to identify services that
+ // are installed in multiple clusters.
+ smap := make(map[host.Name]int)
+ index := 0
+ services := make([]*model.Service, 0)
+ // Locking Registries list while walking it to prevent inconsistent results
+ for _, r := range c.GetRegistries() {
+ svcs := r.Services()
+ if r.Provider() != provider.Kubernetes {
+ index += len(svcs)
+ services = append(services, svcs...)
+ } else {
+ for _, s := range svcs {
+ previous, ok := smap[s.Hostname]
+ if !ok {
+ // First time we see a service. The result will have a single service per hostname
+ // The first cluster will be listed first, so the services in the primary cluster
+ // will be used for default settings. If a service appears in multiple clusters,
+ // the order is less clear.
+ smap[s.Hostname] = index
+ index++
+ services = append(services, s)
+ } else {
+ // We must deepcopy before merge, and after merging, the ClusterVips length will be >= 2.
+ // This is an optimization to prevent deepcopy multi-times
+ if services[previous].ClusterVIPs.Len() < 2 {
+ // Deep copy before merging, otherwise there is a case
+ // a service in remote cluster can be deleted, but the ClusterIP left.
+ services[previous] = services[previous].DeepCopy()
+ }
+ // If it is seen second time, that means it is from a different cluster, update cluster VIPs.
+ mergeService(services[previous], s, r)
+ }
+ }
+ }
+ }
+ return services
+}
+
+func (c *Controller) GetService(hostname host.Name) *model.Service {
+ var out *model.Service
+ for _, r := range c.GetRegistries() {
+ service := r.GetService(hostname)
+ if service == nil {
+ continue
+ }
+ if r.Provider() != provider.Kubernetes {
+ return service
+ }
+ if out == nil {
+ out = service.DeepCopy()
+ } else {
+ // If we are seeing the service for the second time, it means it is available in multiple clusters.
+ mergeService(out, service, r)
+ }
+ }
+ return out
+}
+
+func (c *Controller) GetRegistries() []serviceregistry.Instance {
+ c.storeLock.RLock()
+ defer c.storeLock.RUnlock()
+
+ // copy registries to prevent race, no need to deep copy here.
+ out := make([]serviceregistry.Instance, len(c.registries))
+ for i := range c.registries {
+ out[i] = c.registries[i]
+ }
+ return out
+}
+
+func mergeService(dst, src *model.Service, srcRegistry serviceregistry.Instance) {
+ if !src.Ports.Equals(dst.Ports) {
+ klog.V(2).Infof("service %s defined from cluster %s is different from others", src.Hostname, srcRegistry.Cluster())
+ }
+ // Prefer the k8s HostVIPs where possible
+ clusterID := srcRegistry.Cluster()
+ if len(dst.ClusterVIPs.GetAddressesFor(clusterID)) == 0 {
+ newAddresses := src.ClusterVIPs.GetAddressesFor(clusterID)
+ dst.ClusterVIPs.SetAddressesFor(clusterID, newAddresses)
+ }
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/controller.go b/sail/pkg/serviceregistry/kube/controller/controller.go
index dac2cf3..893ec60 100644
--- a/sail/pkg/serviceregistry/kube/controller/controller.go
+++ b/sail/pkg/serviceregistry/kube/controller/controller.go
@@ -19,26 +19,104 @@
import (
"github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
+ kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+ "github.com/apache/dubbo-kubernetes/pkg/queue"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+ "go.uber.org/atomic"
+ "k8s.io/klog/v2"
+ "sort"
+ "sync"
+ "time"
)
+var (
+ _ serviceregistry.Instance = &Controller{}
+)
+
+type Controller struct {
+ opts Options
+ client kubelib.Client
+ sync.RWMutex
+ servicesMap map[host.Name]*model.Service
+ queue queue.Instance
+ initialSyncTimedout *atomic.Bool
+}
+
type Options struct {
- KubernetesAPIQPS float32
- KubernetesAPIBurst int
- DomainSuffix string
- // XDSUpdater will push changes to the xDS server.
- XDSUpdater model.XDSUpdater
- // MeshNetworksWatcher observes changes to the mesh networks config.
- MeshNetworksWatcher mesh.NetworksWatcher
- // MeshWatcher observes changes to the mesh config
+ KubernetesAPIQPS float32
+ KubernetesAPIBurst int
+ DomainSuffix string
+ XDSUpdater model.XDSUpdater
+ MeshNetworksWatcher mesh.NetworksWatcher
MeshWatcher meshwatcher.WatcherCollection
ClusterID cluster.ID
ClusterAliases map[string]string
SystemNamespace string
MeshServiceController *aggregate.Controller
KrtDebugger *krt.DebugHandler
+ SyncTimeout time.Duration
+}
+
+func (c *Controller) Services() []*model.Service {
+ c.RLock()
+ out := make([]*model.Service, 0, len(c.servicesMap))
+ for _, svc := range c.servicesMap {
+ out = append(out, svc)
+ }
+ c.RUnlock()
+ sort.Slice(out, func(i, j int) bool { return out[i].Hostname < out[j].Hostname })
+ return out
+}
+
+// GetService implements a service catalog operation by hostname specified.
+func (c *Controller) GetService(hostname host.Name) *model.Service {
+ c.RLock()
+ svc := c.servicesMap[hostname]
+ c.RUnlock()
+ return svc
+}
+
+func (c *Controller) Provider() provider.ID {
+ return provider.Kubernetes
+}
+
+func (c *Controller) Cluster() cluster.ID {
+ return c.opts.ClusterID
+}
+
+func (c *Controller) Run(stop <-chan struct{}) {
+ if c.opts.SyncTimeout != 0 {
+ time.AfterFunc(c.opts.SyncTimeout, func() {
+ if !c.queue.HasSynced() {
+ klog.Warningf("kube controller for %s initial sync timed out", c.opts.ClusterID)
+ c.initialSyncTimedout.Store(true)
+ }
+ })
+ }
+ st := time.Now()
+
+ kubelib.WaitForCacheSync("kube controller", stop, c.informersSynced)
+ klog.Infof("kube controller for %s synced after %v", c.opts.ClusterID, time.Since(st))
+
+ // after the in-order sync we can start processing the queue
+ c.queue.Run(stop)
+ klog.Infof("Controller terminated")
+}
+
+func (c *Controller) HasSynced() bool {
+ if c.initialSyncTimedout.Load() {
+ return true
+ }
+ return c.queue.HasSynced()
+}
+
+func (c *Controller) informersSynced() bool {
+ return false
}
diff --git a/sail/pkg/trustbundle/trustbundle.go b/sail/pkg/trustbundle/trustbundle.go
index 202e901..33964cf 100644
--- a/sail/pkg/trustbundle/trustbundle.go
+++ b/sail/pkg/trustbundle/trustbundle.go
@@ -21,6 +21,7 @@
"crypto/x509"
"encoding/pem"
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"k8s.io/klog/v2"
"sort"
"strings"
@@ -30,7 +31,6 @@
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
"github.com/apache/dubbo-kubernetes/pkg/spiffe"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
meshconfig "istio.io/api/mesh/v1alpha1"
)
diff --git a/sail/pkg/xds/ads.go b/sail/pkg/xds/ads.go
index 5a71bd0..5591c2e 100644
--- a/sail/pkg/xds/ads.go
+++ b/sail/pkg/xds/ads.go
@@ -18,13 +18,19 @@
package xds
import (
+ "context"
"github.com/apache/dubbo-kubernetes/pkg/xds"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+ "time"
)
-type DeltaDiscoveryStream = discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer
+type (
+ DiscoveryStream = xds.DiscoveryStream
+ DeltaDiscoveryStream = discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer
+ DeltaDiscoveryClient = discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesClient
+)
type Connection struct {
xds.Connection
@@ -36,6 +42,11 @@
ids []string
}
+type Event struct {
+ pushRequest *model.PushRequest
+ done func()
+}
+
func (conn *Connection) XdsConnection() *xds.Connection {
return &conn.Connection
}
@@ -43,3 +54,51 @@
func (conn *Connection) Proxy() *model.Proxy {
return conn.proxy
}
+
+func (s *DiscoveryServer) DeltaAggregatedResources(stream discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
+ return s.StreamDeltas(stream)
+}
+
+func (s *DiscoveryServer) StreamAggregatedResources(stream DiscoveryStream) error {
+ return s.Stream(stream)
+}
+
+func (s *DiscoveryServer) initProxyMetadata(node *core.Node) (*model.Proxy, error) {
+ return nil, nil
+}
+
+func (s *DiscoveryServer) initConnection(node *core.Node, con *Connection, identities []string) error {
+ return nil
+}
+
+func (s *DiscoveryServer) closeConnection(con *Connection) {
+ if con.ID() == "" {
+ return
+ }
+}
+
+func (s *DiscoveryServer) Stream(stream DiscoveryStream) error {
+ return nil
+}
+
+func (s *DiscoveryServer) WaitForRequestLimit(ctx context.Context) error {
+ if s.RequestRateLimit.Limit() == 0 {
+ // Allow opt out when rate limiting is set to 0qps
+ return nil
+ }
+ // Give a bit of time for queue to clear out, but if not fail fast. Client will connect to another
+ // instance in best case, or retry with backoff.
+ wait, cancel := context.WithTimeout(ctx, time.Second)
+ defer cancel()
+ return s.RequestRateLimit.Wait(wait)
+}
+
+func newConnection(peerAddr string, stream DiscoveryStream) *Connection {
+ return &Connection{
+ Connection: xds.NewConnection(peerAddr, stream),
+ }
+}
+
+func (conn *Connection) watchedResourcesByOrder() []*model.WatchedResource {
+ return nil
+}
diff --git a/sail/pkg/xds/auth.go b/sail/pkg/xds/auth.go
new file mode 100644
index 0000000..02780d1
--- /dev/null
+++ b/sail/pkg/xds/auth.go
@@ -0,0 +1,9 @@
+package xds
+
+import (
+ "context"
+)
+
+func (s *DiscoveryServer) authenticate(ctx context.Context) ([]string, error) {
+ return nil, nil
+}
diff --git a/sail/pkg/xds/delta.go b/sail/pkg/xds/delta.go
new file mode 100644
index 0000000..82064a6
--- /dev/null
+++ b/sail/pkg/xds/delta.go
@@ -0,0 +1,198 @@
+package xds
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/xds"
+ dubbogrpc "github.com/apache/dubbo-kubernetes/sail/pkg/grpc"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ v3 "github.com/apache/dubbo-kubernetes/sail/pkg/xds/v3"
+ discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/peer"
+ "google.golang.org/grpc/status"
+ "k8s.io/klog/v2"
+)
+
+func (s *DiscoveryServer) StreamDeltas(stream DeltaDiscoveryStream) error {
+ ctx := stream.Context()
+ peerAddr := "0.0.0.0"
+ if peerInfo, ok := peer.FromContext(ctx); ok {
+ peerAddr = peerInfo.Addr.String()
+ }
+
+ if err := s.WaitForRequestLimit(stream.Context()); err != nil {
+ klog.Warningf("ADS: %q exceeded rate limit: %v", peerAddr, err)
+ return status.Errorf(codes.ResourceExhausted, "request rate limit exceeded: %v", err)
+ }
+
+ ids, err := s.authenticate(ctx)
+ if err != nil {
+ return status.Error(codes.Unauthenticated, err.Error())
+ }
+ if ids != nil {
+ klog.V(2).Infof("Authenticated XDS: %v with identity %v", peerAddr, ids)
+ } else {
+ klog.V(2).Infof("Unauthenticated XDS: %v", peerAddr)
+ }
+
+ // InitContext returns immediately if the context was already initialized.
+ s.globalPushContext().InitContext(s.Env, nil, nil)
+ con := newDeltaConnection(peerAddr, stream)
+
+ // Do not call: defer close(con.pushChannel). The push channel will be garbage collected
+ // when the connection is no longer used. Closing the channel can cause subtle race conditions
+ // with push. According to the spec: "It's only necessary to close a channel when it is important
+ // to tell the receiving goroutines that all data have been sent."
+
+ // Block until either a request is received or a push is triggered.
+ // We need 2 go routines because 'read' blocks in Recv().
+ go s.receiveDelta(con, ids)
+
+ // Wait for the proxy to be fully initialized before we start serving traffic. Because
+ // initialization doesn't have dependencies that will block, there is no need to add any timeout
+ // here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to
+ // reqChannel and the connection not being enqueued for pushes to pushChannel until the
+ // initialization is complete.
+ <-con.InitializedCh()
+
+ for {
+ // Go select{} statements are not ordered; the same channel can be chosen many times.
+ // For requests, these are higher priority (client may be blocked on startup until these are done)
+ // and often very cheap to handle (simple ACK), so we check it first.
+ select {
+ case req, ok := <-con.deltaReqChan:
+ if ok {
+ if err := s.processDeltaRequest(req, con); err != nil {
+ return err
+ }
+ } else {
+ // Remote side closed connection or error processing the request.
+ return <-con.ErrorCh()
+ }
+ case <-con.StopCh():
+ return nil
+ default:
+ }
+ // If there wasn't already a request, poll for requests and pushes. Note: if we have a huge
+ // amount of incoming requests, we may still send some pushes, as we do not `continue` above;
+ // however, requests will be handled ~2x as much as pushes. This ensures a wave of requests
+ // cannot completely starve pushes. However, this scenario is unlikely.
+ select {
+ case req, ok := <-con.deltaReqChan:
+ if ok {
+ if err := s.processDeltaRequest(req, con); err != nil {
+ return err
+ }
+ } else {
+ // Remote side closed connection or error processing the request.
+ return <-con.ErrorCh()
+ }
+ case ev := <-con.PushCh():
+ pushEv := ev.(*Event)
+ err := s.pushConnectionDelta(con, pushEv)
+ pushEv.done()
+ if err != nil {
+ return err
+ }
+ case <-con.StopCh():
+ return nil
+ }
+ }
+}
+
+func (s *DiscoveryServer) receiveDelta(con *Connection, identities []string) {
+ defer func() {
+ close(con.deltaReqChan)
+ close(con.ErrorCh())
+ // Close the initialized channel, if its not already closed, to prevent blocking the stream
+ select {
+ case <-con.InitializedCh():
+ default:
+ close(con.InitializedCh())
+ }
+ }()
+ firstRequest := true
+ for {
+ req, err := con.deltaStream.Recv()
+ if err != nil {
+ if dubbogrpc.GRPCErrorType(err) != dubbogrpc.UnexpectedError {
+ klog.Infof("ADS: %q %s terminated", con.Peer(), con.ID())
+ return
+ }
+ con.ErrorCh() <- err
+ klog.Errorf("ADS: %q %s terminated with error: %v", con.Peer(), con.ID(), err)
+ return
+ }
+ // This should be only set for the first request. The node id may not be set - for example malicious clients.
+ if firstRequest {
+ // probe happens before envoy sends first xDS request
+ if req.TypeUrl == v3.HealthInfoType {
+ klog.Warningf("ADS: %q %s send health check probe before normal xDS request", con.Peer(), con.ID())
+ continue
+ }
+ firstRequest = false
+ if req.Node == nil || req.Node.Id == "" {
+ con.ErrorCh() <- status.New(codes.InvalidArgument, "missing node information").Err()
+ return
+ }
+ if err := s.initConnection(req.Node, con, identities); err != nil {
+ con.ErrorCh() <- err
+ return
+ }
+ defer s.closeConnection(con)
+ klog.Infof("ADS: new delta connection for node:%s", con.ID())
+ }
+
+ select {
+ case con.deltaReqChan <- req:
+ case <-con.deltaStream.Context().Done():
+ klog.Infof("ADS: %q %s terminated with stream closed", con.Peer(), con.ID())
+ return
+ }
+ }
+}
+
+func (s *DiscoveryServer) pushConnectionDelta(con *Connection, pushEv *Event) error {
+ pushRequest := pushEv.pushRequest
+
+ if pushRequest.Full {
+ // Update Proxy with current information.
+ s.computeProxyState(con.proxy, pushRequest)
+ }
+
+ pushRequest, needsPush := s.ProxyNeedsPush(con.proxy, pushRequest)
+ if !needsPush {
+ klog.V(2).Infof("Skipping push to %v, no updates required", con.ID())
+ return nil
+ }
+
+ // Send pushes to all generators
+ // Each Generator is responsible for determining if the push event requires a push
+ wrl := con.watchedResourcesByOrder()
+ for _, w := range wrl {
+ if err := s.pushDeltaXds(con, w, pushRequest); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (s *DiscoveryServer) processDeltaRequest(req *discovery.DeltaDiscoveryRequest, con *Connection) error {
+ return nil
+}
+
+func (s *DiscoveryServer) computeProxyState(proxy *model.Proxy, request *model.PushRequest) {
+ return
+}
+
+func (s *DiscoveryServer) pushDeltaXds(con *Connection, w *model.WatchedResource, req *model.PushRequest) error {
+ return nil
+}
+
+func newDeltaConnection(peerAddr string, stream DeltaDiscoveryStream) *Connection {
+ return &Connection{
+ Connection: xds.NewConnection(peerAddr, nil),
+ deltaStream: stream,
+ deltaReqChan: make(chan *discovery.DeltaDiscoveryRequest, 1),
+ }
+}
diff --git a/sail/pkg/xds/discovery.go b/sail/pkg/xds/discovery.go
index 9b51f9e..a4c58ed 100644
--- a/sail/pkg/xds/discovery.go
+++ b/sail/pkg/xds/discovery.go
@@ -21,7 +21,10 @@
"github.com/apache/dubbo-kubernetes/pkg/cluster"
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"go.uber.org/atomic"
+ "golang.org/x/time/rate"
+ "google.golang.org/grpc"
"k8s.io/klog/v2"
"time"
)
@@ -36,6 +39,8 @@
krtDebugger *krt.DebugHandler
InboundUpdates *atomic.Int64
CommittedUpdates *atomic.Int64
+ RequestRateLimit *rate.Limiter
+ ProxyNeedsPush func(proxy *model.Proxy, req *model.PushRequest) (*model.PushRequest, bool)
}
func NewDiscoveryServer(env *model.Environment, clusterAliases map[string]string, debugger *krt.DebugHandler) *DiscoveryServer {
@@ -51,6 +56,11 @@
return out
}
+func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
+ // Register v3 server
+ discovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s)
+}
+
func (s *DiscoveryServer) CachesSynced() {
klog.Infof("All caches have been synced up in %v, marking server ready", time.Since(s.DiscoveryStartTime))
s.serverReady.Store(true)
@@ -59,3 +69,7 @@
func (s *DiscoveryServer) Shutdown() {
s.pushQueue.ShutDown()
}
+
+func (s *DiscoveryServer) globalPushContext() *model.PushContext {
+ return s.Env.PushContext()
+}
diff --git a/sail/pkg/xds/v3/model.go b/sail/pkg/xds/v3/model.go
index 790de20..38aad5f 100644
--- a/sail/pkg/xds/v3/model.go
+++ b/sail/pkg/xds/v3/model.go
@@ -3,9 +3,14 @@
import "github.com/apache/dubbo-kubernetes/pkg/model"
const (
- ClusterType = model.ClusterType
- ListenerType = model.ListenerType
- EndpointType = model.EndpointType
- RouteType = model.RouteType
- DebugType = model.DebugType
+ ClusterType = model.ClusterType
+ ListenerType = model.ListenerType
+ EndpointType = model.EndpointType
+ RouteType = model.RouteType
+ DebugType = model.DebugType
+ HealthInfoType = model.HealthInfoType
)
+
+func GetShortType(typeURL string) string {
+ return model.GetShortType(typeURL)
+}
diff --git a/security/pkg/pki/ra/common.go b/security/pkg/pki/ra/common.go
index 3ce5b20..41e6da3 100644
--- a/security/pkg/pki/ra/common.go
+++ b/security/pkg/pki/ra/common.go
@@ -22,7 +22,7 @@
"crypto/x509"
"encoding/asn1"
"fmt"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
raerror "github.com/apache/dubbo-kubernetes/security/pkg/pki/error"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/util"