[discovery] add aggregate and cert controller logic (#792)

diff --git a/dubboctl/pkg/validate/validate.go b/dubboctl/pkg/validate/validate.go
index dbcc470..debd997 100644
--- a/dubboctl/pkg/validate/validate.go
+++ b/dubboctl/pkg/validate/validate.go
@@ -25,7 +25,7 @@
 	operator "github.com/apache/dubbo-kubernetes/operator/pkg/apis"
 	operatorvalidate "github.com/apache/dubbo-kubernetes/operator/pkg/apis/validation"
 	"github.com/apache/dubbo-kubernetes/pkg/config/validation"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"github.com/hashicorp/go-multierror"
 	"github.com/spf13/cobra"
 	"io"
diff --git a/go.mod b/go.mod
index 748a3e5..2a97428 100644
--- a/go.mod
+++ b/go.mod
@@ -40,7 +40,9 @@
 	github.com/golang/protobuf v1.5.4
 	github.com/google/go-cmp v0.7.0
 	github.com/google/go-containerregistry v0.20.6
+	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
 	github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2
+	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
 	github.com/hashicorp/go-multierror v1.1.1
 	github.com/heroku/color v0.0.6
 	github.com/moby/term v0.5.2
@@ -112,10 +114,12 @@
 	github.com/aws/aws-sdk-go-v2/service/sts v1.28.7 // indirect
 	github.com/aws/smithy-go v1.20.2 // indirect
 	github.com/awslabs/amazon-ecr-credential-helper/ecr-login v0.0.0-20230522190001-adf1bafd791a // indirect
+	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/blang/semver/v4 v4.0.0 // indirect
 	github.com/buildpacks/imgutil v0.0.0-20230626185301-726f02e4225c // indirect
 	github.com/buildpacks/lifecycle v0.17.0 // indirect
 	github.com/cespare/xxhash v1.1.0 // indirect
+	github.com/cespare/xxhash/v2 v2.3.0 // indirect
 	github.com/chrismellard/docker-credential-acr-env v0.0.0-20230304212654-82a0ddb27589 // indirect
 	github.com/cloudflare/circl v1.3.7 // indirect
 	github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect
@@ -203,6 +207,8 @@
 	github.com/pkoukk/tiktoken-go v0.1.6 // indirect
 	github.com/planetscale/vtprotobuf v0.6.1-0.20240409071808-615f978279ca // indirect
 	github.com/prometheus/client_golang v1.23.0 // indirect
+	github.com/prometheus/client_model v0.6.2 // indirect
+	github.com/prometheus/common v0.65.0 // indirect
 	github.com/prometheus/procfs v0.17.0 // indirect
 	github.com/rivo/tview v0.0.0-20220307222120-9994674d60a8 // indirect
 	github.com/rivo/uniseg v0.4.7 // indirect
diff --git a/go.sum b/go.sum
index beb2eaa..bfe1770 100644
--- a/go.sum
+++ b/go.sum
@@ -186,6 +186,7 @@
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/cloudflare/circl v1.3.7 h1:qlCDlTPz2n9fu58M0Nh1J/JzcFpfgkFHHX3O35r5vcU=
 github.com/cloudflare/circl v1.3.7/go.mod h1:sRTcRWXGLrKw6yIGJ+l7amYJFfAXbZG0kBSc8r4zxgA=
+github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
 github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls=
 github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
 github.com/containerd/containerd v1.7.27 h1:yFyEyojddO3MIGVER2xJLWoCIn+Up4GaHFquP7hsFII=
@@ -244,7 +245,9 @@
 github.com/emicklei/go-restful/v3 v3.12.2/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
 github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
 github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
+github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
 github.com/envoyproxy/go-control-plane/envoy v1.32.5-0.20250627145903-197b96a9c7f8 h1:/F9jLyfDeNr4iZxyibtKlZxCDqCFEhoYiLdc9VOZT2E=
 github.com/envoyproxy/go-control-plane/envoy v1.32.5-0.20250627145903-197b96a9c7f8/go.mod h1:09qwbGVuSWWAyN5t/b3iyVfz5+z8QWGrzkoqm/8SbEs=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
@@ -287,6 +290,7 @@
 github.com/go-git/go-git/v5 v5.13.1/go.mod h1:qryJB4cSBoq3FRoBRf5A77joojuBcmPJ0qu3XXXVixc=
 github.com/go-jose/go-jose/v4 v4.0.5 h1:M6T8+mKZl/+fNNuFHvGIzDz7BTLQPIounk/b9dw3AaE=
 github.com/go-jose/go-jose/v4 v4.0.5/go.mod h1:s3P1lRrkT8igV8D9OjyL4WRyHvjB6a4JSllnOrmmBOA=
+github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
 github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
 github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
@@ -301,6 +305,7 @@
 github.com/go-openapi/swag v0.23.1/go.mod h1:STZs8TbRvEQQKUA+JZNAm3EWlgaOBGpyFDqQnDHMef0=
 github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
 github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
+github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
 github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
 github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
@@ -322,6 +327,7 @@
 github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
 github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
 github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
 github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
@@ -373,8 +379,12 @@
 github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
 github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
 github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
+github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
+github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 h1:sGm2vDRFUrQJO/Veii4h4zG2vvqG6uWNkBHSTqXOZk0=
 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2/go.mod h1:wd1YpapPLivG6nQgbf7ZkG1hhSOXDhhn4MLTknx2aAc=
+github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
+github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
 github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
 github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
@@ -507,6 +517,7 @@
 github.com/opencontainers/runtime-spec v1.2.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
 github.com/opencontainers/selinux v1.11.1 h1:nHFvthhM0qY8/m+vfhJylliSshm8G1jJ2jDMcgULaH8=
 github.com/opencontainers/selinux v1.11.1/go.mod h1:E5dMC3VPuVvVHDYmi78qvhJp8+M586T4DlDRYpFkyec=
+github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
 github.com/ory/viper v1.7.5 h1:+xVdq7SU3e1vNaCsk/ixsfxE4zylk1TJUiJrY647jUE=
 github.com/ory/viper v1.7.5/go.mod h1:ypOuyJmEUb3oENywQZRgeAMwqgOyDqwboO1tj3DjTaM=
 github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
@@ -563,6 +574,7 @@
 github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
 github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
 github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
 github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
 github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
@@ -667,12 +679,15 @@
 go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
 go.starlark.net v0.0.0-20230302034142-4b1e35fe2254 h1:Ss6D3hLXTM0KobyBYEAygXzFfGcjnmfEJOBgSbemCtg=
 go.starlark.net v0.0.0-20230302034142-4b1e35fe2254/go.mod h1:jxU+3+j+71eXOW14274+SmmuW82qJzl6iZSeqEtTGds=
+go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
 go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
 go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
 go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
 go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
 go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
+go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
 go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
 go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
 go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE=
@@ -733,6 +748,7 @@
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -794,6 +810,7 @@
 google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
 google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
 google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
 google.golang.org/genproto v0.0.0-20240528184218-531527333157 h1:u7WMYrIrVvs0TF5yaKwKNbcJyySYf+HAIFXxWltJOXE=
 google.golang.org/genproto v0.0.0-20240528184218-531527333157/go.mod h1:ubQlAQnzejB8uZzszhrTCU2Fyp6Vi7ZE5nn0c3W8+qQ=
@@ -803,7 +820,9 @@
 google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a/go.mod h1:gw1tLEfykwDz2ET4a12jcXt4couGAm7IwsVaTy0Sflo=
 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
 google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
 google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
 google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4=
 google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
diff --git a/operator/cmd/cluster/manifest.go b/operator/cmd/cluster/manifest.go
index 4bd44f0..df5ec60 100644
--- a/operator/cmd/cluster/manifest.go
+++ b/operator/cmd/cluster/manifest.go
@@ -25,7 +25,7 @@
 	"github.com/apache/dubbo-kubernetes/operator/pkg/render"
 	"github.com/apache/dubbo-kubernetes/operator/pkg/util/clog"
 	"github.com/apache/dubbo-kubernetes/pkg/kube"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"github.com/spf13/cobra"
 	"sigs.k8s.io/yaml"
 	"strings"
diff --git a/operator/pkg/helm/helm.go b/operator/pkg/helm/helm.go
index 60429cb..d9ab3f9 100644
--- a/operator/pkg/helm/helm.go
+++ b/operator/pkg/helm/helm.go
@@ -23,7 +23,7 @@
 	"github.com/apache/dubbo-kubernetes/operator/pkg/manifest"
 	"github.com/apache/dubbo-kubernetes/operator/pkg/util"
 	"github.com/apache/dubbo-kubernetes/operator/pkg/values"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"helm.sh/helm/v3/pkg/chart"
 	"helm.sh/helm/v3/pkg/chart/loader"
 	"helm.sh/helm/v3/pkg/chartutil"
diff --git a/operator/pkg/install/installer.go b/operator/pkg/install/installer.go
index df6450f..9b74c0e 100644
--- a/operator/pkg/install/installer.go
+++ b/operator/pkg/install/installer.go
@@ -29,8 +29,8 @@
 	"github.com/apache/dubbo-kubernetes/operator/pkg/util/progress"
 	"github.com/apache/dubbo-kubernetes/operator/pkg/values"
 	"github.com/apache/dubbo-kubernetes/pkg/kube"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
 	"github.com/hashicorp/go-multierror"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	klabels "k8s.io/apimachinery/pkg/labels"
diff --git a/pkg/config/constants/constants.go b/pkg/config/constants/constants.go
index e9bf56f..d558faf 100644
--- a/pkg/config/constants/constants.go
+++ b/pkg/config/constants/constants.go
@@ -36,4 +36,14 @@
 	CertProviderCustom     = "custom"
 
 	ThirdPartyJwtPath = "./var/run/secrets/tokens/dubbo-token"
+
+	CertProviderKubernetes = "kubernetes"
+
+	SailWellKnownDNSCertPath   = "./var/run/secrets/dubbod/tls/"
+	SailWellKnownDNSCaCertPath = "./var/run/secrets/dubbod/ca/"
+
+	DefaultSailTLSCert                = SailWellKnownDNSCertPath + "tls.crt"
+	DefaultSailTLSKey                 = SailWellKnownDNSCertPath + "tls.key"
+	DefaultSailTLSCaCert              = SailWellKnownDNSCaCertPath + "root-cert.pem"
+	DefaultSailTLSCaCertAlternatePath = SailWellKnownDNSCertPath + "ca.crt"
 )
diff --git a/pkg/config/labels/instance.go b/pkg/config/labels/instance.go
index 845ebbe..6b8fdec 100644
--- a/pkg/config/labels/instance.go
+++ b/pkg/config/labels/instance.go
@@ -19,13 +19,13 @@
 
 import (
 	"fmt"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"regexp"
 	"strings"
 
 	"github.com/hashicorp/go-multierror"
 
 	"github.com/apache/dubbo-kubernetes/pkg/maps"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
 )
 
 const (
diff --git a/pkg/config/model.go b/pkg/config/model.go
index 76e8265..204feb7 100644
--- a/pkg/config/model.go
+++ b/pkg/config/model.go
@@ -21,6 +21,7 @@
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"reflect"
 	"time"
 
@@ -42,7 +43,6 @@
 	"github.com/apache/dubbo-kubernetes/pkg/util/gogoprotomarshal"
 	"github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
 	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/util/protoconv"
 	"istio.io/api/label"
 )
diff --git a/pkg/config/protocol/instance.go b/pkg/config/protocol/instance.go
new file mode 100644
index 0000000..3748f91
--- /dev/null
+++ b/pkg/config/protocol/instance.go
@@ -0,0 +1,146 @@
+package protocol
+
+import "strings"
+
+// Instance defines network protocols for ports
+type Instance string
+
+func (i Instance) String() string {
+	return string(i)
+}
+
+const (
+	// GRPC declares that the port carries gRPC traffic.
+	GRPC Instance = "GRPC"
+	// GRPCWeb declares that the port carries gRPC traffic.
+	GRPCWeb Instance = "GRPC-Web"
+	// HTTP declares that the port carries HTTP/1.1 traffic.
+	// Note that HTTP/1.0 or earlier may not be supported by the proxy.
+	HTTP Instance = "HTTP"
+	// HTTP_PROXY declares that the port is a generic outbound proxy port.
+	// Note that this is currently applicable only for defining sidecar egress listeners.
+	// nolint
+	HTTP_PROXY Instance = "HTTP_PROXY"
+	// HTTP2 declares that the port carries HTTP/2 traffic.
+	HTTP2 Instance = "HTTP2"
+	// HTTPS declares that the port carries HTTPS traffic.
+	HTTPS Instance = "HTTPS"
+	// TCP declares the port uses TCP.
+	// This is the default protocol for a service port.
+	TCP Instance = "TCP"
+	// TLS declares that the port carries TLS traffic.
+	// TLS traffic is assumed to contain SNI as part of the handshake.
+	TLS Instance = "TLS"
+	// UDP declares that the port uses UDP.
+	// Note that UDP protocol is not currently supported by the proxy.
+	UDP Instance = "UDP"
+	// Unsupported - value to signify that the protocol is unsupported.
+	Unsupported Instance = "UnsupportedProtocol"
+)
+
+// Parse from string ignoring case
+func Parse(s string) Instance {
+	switch strings.ToLower(s) {
+	case "tcp":
+		return TCP
+	case "udp":
+		return UDP
+	case "grpc":
+		return GRPC
+	case "grpc-web":
+		return GRPCWeb
+	case "http":
+		return HTTP
+	case "http_proxy":
+		return HTTP_PROXY
+	case "http2":
+		return HTTP2
+	case "https":
+		return HTTPS
+	case "tls":
+		return TLS
+	}
+
+	return Unsupported
+}
+
+// IsHTTP2 is true for protocols that use HTTP/2 as transport protocol
+func (i Instance) IsHTTP2() bool {
+	switch i {
+	case HTTP2, GRPC, GRPCWeb:
+		return true
+	default:
+		return false
+	}
+}
+
+// IsHTTPOrSniffed is true for protocols that use HTTP as transport protocol, or *can* use it if sniffed to be HTTP
+func (i Instance) IsHTTPOrSniffed() bool {
+	return i.IsHTTP() || i.IsUnsupported()
+}
+
+// IsHTTP is true for protocols that use HTTP as transport protocol
+func (i Instance) IsHTTP() bool {
+	switch i {
+	case HTTP, HTTP2, HTTP_PROXY, GRPC, GRPCWeb:
+		return true
+	default:
+		return false
+	}
+}
+
+// IsTCP is true for protocols that use TCP as transport protocol
+func (i Instance) IsTCP() bool {
+	switch i {
+	case TCP, HTTPS, TLS:
+		return true
+	default:
+		return false
+	}
+}
+
+// IsTLS is true for protocols on top of TLS (e.g. HTTPS)
+func (i Instance) IsTLS() bool {
+	switch i {
+	case HTTPS, TLS:
+		return true
+	default:
+		return false
+	}
+}
+
+// IsHTTPS is true if protocol is HTTPS
+func (i Instance) IsHTTPS() bool {
+	switch i {
+	case HTTPS:
+		return true
+	default:
+		return false
+	}
+}
+
+// IsGRPC is true for GRPC protocols.
+func (i Instance) IsGRPC() bool {
+	switch i {
+	case GRPC, GRPCWeb:
+		return true
+	default:
+		return false
+	}
+}
+
+func (i Instance) IsUnsupported() bool {
+	return i == Unsupported
+}
+
+// AfterTLSTermination returns the protocol that will be used if TLS is terminated on the current protocol.
+func (i Instance) AfterTLSTermination() Instance {
+	switch i {
+	case HTTPS:
+		return HTTP
+	case TLS:
+		return TCP
+	default:
+		return i
+	}
+}
diff --git a/pkg/config/schema/collection/schemas.go b/pkg/config/schema/collection/schemas.go
index bbf9e61..d5cadbf 100644
--- a/pkg/config/schema/collection/schemas.go
+++ b/pkg/config/schema/collection/schemas.go
@@ -2,6 +2,7 @@
 
 import (
 	"fmt"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 
 	"github.com/google/go-cmp/cmp"
 	"github.com/hashicorp/go-multierror"
@@ -10,7 +11,6 @@
 	"github.com/apache/dubbo-kubernetes/pkg/config"
 	"github.com/apache/dubbo-kubernetes/pkg/config/schema/resource"
 	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
 )
 
 // Schemas contains metadata about configuration resources.
diff --git a/pkg/config/schema/gvk/resources.gen.go b/pkg/config/schema/gvk/resources.go
similarity index 83%
rename from pkg/config/schema/gvk/resources.gen.go
rename to pkg/config/schema/gvk/resources.go
index 08d09b9..1e5d9e4 100644
--- a/pkg/config/schema/gvk/resources.gen.go
+++ b/pkg/config/schema/gvk/resources.go
@@ -18,7 +18,7 @@
 package gvk
 
 import (
-	"github.com/apache/dubbo-kubernetes/operator/pkg/config"
+	"github.com/apache/dubbo-kubernetes/pkg/config"
 	"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvr"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 )
@@ -37,6 +37,9 @@
 	Service                        = config.GroupVersionKind{Group: "", Version: "v1", Kind: "Service"}
 	ServiceAccount                 = config.GroupVersionKind{Group: "", Version: "v1", Kind: "ServiceAccount"}
 	MeshConfig                     = config.GroupVersionKind{Group: "", Version: "v1alpha1", Kind: "MeshConfig"}
+	RequestAuthentication          = config.GroupVersionKind{Group: "security.dubbo.io", Version: "v1", Kind: "RequestAuthentication"}
+	PeerAuthentication             = config.GroupVersionKind{Group: "security.dubbo.io", Version: "v1", Kind: "PeerAuthentication"}
+	AuthorizationPolicy            = config.GroupVersionKind{Group: "security.dubbo.io", Version: "v1", Kind: "AuthorizationPolicy"}
 )
 
 func ToGVR(g config.GroupVersionKind) (schema.GroupVersionResource, bool) {
@@ -67,6 +70,12 @@
 		return gvr.Job, true
 	case MeshConfig:
 		return gvr.MeshConfig, true
+	case RequestAuthentication:
+		return gvr.RequestAuthentication, true
+	case PeerAuthentication:
+		return gvr.PeerAuthentication, true
+	case AuthorizationPolicy:
+		return gvr.AuthorizationPolicy, true
 	}
 	return schema.GroupVersionResource{}, false
 }
@@ -92,6 +101,10 @@
 		return DaemonSet, true
 	case gvr.Job:
 		return Job, true
+	case gvr.PeerAuthentication:
+		return PeerAuthentication, true
+	case gvr.RequestAuthentication:
+		return RequestAuthentication, true
 	}
 	return config.GroupVersionKind{}, false
 }
diff --git a/pkg/config/schema/gvr/resource.gen.go b/pkg/config/schema/gvr/resources.go
similarity index 83%
rename from pkg/config/schema/gvr/resource.gen.go
rename to pkg/config/schema/gvr/resources.go
index 1fd1e2e..a257a6a 100644
--- a/pkg/config/schema/gvr/resource.gen.go
+++ b/pkg/config/schema/gvr/resources.go
@@ -35,6 +35,9 @@
 	Service                        = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
 	ServiceAccount                 = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "serviceaccounts"}
 	MeshConfig                     = schema.GroupVersionResource{Group: "", Version: "v1alpha1", Resource: "meshconfigs"}
+	RequestAuthentication          = schema.GroupVersionResource{Group: "security.dubbo.io", Version: "v1", Resource: "requestauthentications"}
+	PeerAuthentication             = schema.GroupVersionResource{Group: "security.dubbo.io", Version: "v1", Resource: "peerauthentications"}
+	AuthorizationPolicy            = schema.GroupVersionResource{Group: "security.dubbo.io", Version: "v1", Resource: "authorizationpolicies"}
 )
 
 func IsClusterScoped(g schema.GroupVersionResource) bool {
@@ -55,6 +58,12 @@
 		return false
 	case ServiceAccount:
 		return false
+	case RequestAuthentication:
+		return false
+	case PeerAuthentication:
+		return false
+	case AuthorizationPolicy:
+		return false
 	}
 	return false
 }
diff --git a/pkg/config/schema/kind/kind.go b/pkg/config/schema/kind/kind.go
new file mode 100644
index 0000000..6c29e6e
--- /dev/null
+++ b/pkg/config/schema/kind/kind.go
@@ -0,0 +1,3 @@
+package kind
+
+type Kind uint8
diff --git a/pkg/config/schema/kind/resource.go b/pkg/config/schema/kind/resource.go
new file mode 100644
index 0000000..fdbb42a
--- /dev/null
+++ b/pkg/config/schema/kind/resource.go
@@ -0,0 +1,60 @@
+package kind
+
+const (
+	Unknown Kind = iota
+	AuthorizationPolicy
+	CustomResourceDefinition
+	DestinationRule
+	MeshConfig
+	MeshNetworks
+	MutatingWebhookConfiguration
+	Namespace
+	PeerAuthentication
+	Pod
+	RequestAuthentication
+	Secret
+	Service
+	ServiceAccount
+	StatefulSet
+	ValidatingWebhookConfiguration
+	VirtualService
+)
+
+func (k Kind) String() string {
+	switch k {
+	case AuthorizationPolicy:
+		return "AuthorizationPolicy"
+	case CustomResourceDefinition:
+		return "CustomResourceDefinition"
+	case DestinationRule:
+		return "DestinationRule"
+	case MeshConfig:
+		return "MeshConfig"
+	case MeshNetworks:
+		return "MeshNetworks"
+	case MutatingWebhookConfiguration:
+		return "MutatingWebhookConfiguration"
+	case Namespace:
+		return "Namespace"
+	case PeerAuthentication:
+		return "PeerAuthentication"
+	case Pod:
+		return "Pod"
+	case RequestAuthentication:
+		return "RequestAuthentication"
+	case Secret:
+		return "Secret"
+	case Service:
+		return "Service"
+	case ServiceAccount:
+		return "ServiceAccount"
+	case StatefulSet:
+		return "StatefulSet"
+	case ValidatingWebhookConfiguration:
+		return "ValidatingWebhookConfiguration"
+	case VirtualService:
+		return "VirtualService"
+	default:
+		return "Unknown"
+	}
+}
diff --git a/pkg/config/schema/kubetypes/common.go b/pkg/config/schema/kubetypes/common.go
index e091b71..bff043c 100644
--- a/pkg/config/schema/kubetypes/common.go
+++ b/pkg/config/schema/kubetypes/common.go
@@ -18,7 +18,7 @@
 package kubetypes
 
 import (
-	"github.com/apache/dubbo-kubernetes/operator/pkg/config"
+	"github.com/apache/dubbo-kubernetes/pkg/config"
 	"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
 	"github.com/apache/dubbo-kubernetes/pkg/ptr"
 	"github.com/apache/dubbo-kubernetes/pkg/typemap"
diff --git a/pkg/config/schema/kubetypes/resources.gen.go b/pkg/config/schema/kubetypes/resources.go
similarity index 95%
rename from pkg/config/schema/kubetypes/resources.gen.go
rename to pkg/config/schema/kubetypes/resources.go
index 0eb976b..7090e19 100644
--- a/pkg/config/schema/kubetypes/resources.gen.go
+++ b/pkg/config/schema/kubetypes/resources.go
@@ -18,7 +18,7 @@
 package kubetypes
 
 import (
-	"github.com/apache/dubbo-kubernetes/operator/pkg/config"
+	"github.com/apache/dubbo-kubernetes/pkg/config"
 	"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
 	istioioapimeshv1alpha1 "istio.io/api/mesh/v1alpha1"
 	k8sioapicorev1 "k8s.io/api/core/v1"
diff --git a/pkg/config/visibility/visibility.go b/pkg/config/visibility/visibility.go
new file mode 100644
index 0000000..49fe4f6
--- /dev/null
+++ b/pkg/config/visibility/visibility.go
@@ -0,0 +1,28 @@
+package visibility
+
+import (
+	"fmt"
+	"github.com/apache/dubbo-kubernetes/pkg/config/labels"
+)
+
+type Instance string
+
+const (
+	Private Instance = "."
+	Public  Instance = "*"
+	None    Instance = "~"
+)
+
+func (v Instance) Validate() (errs error) {
+	switch v {
+	case Private, Public:
+		return nil
+	case None:
+		return fmt.Errorf("exportTo ~ (none) is not allowed for Istio configuration objects")
+	default:
+		if !labels.IsDNS1123Label(string(v)) {
+			return fmt.Errorf("only .,*, or a valid DNS 1123 label is allowed as exportTo entry")
+		}
+	}
+	return nil
+}
diff --git a/pkg/kube/kclient/client.go b/pkg/kube/kclient/client.go
index 8ee7595..2a340d7 100644
--- a/pkg/kube/kclient/client.go
+++ b/pkg/kube/kclient/client.go
@@ -28,8 +28,8 @@
 	"github.com/apache/dubbo-kubernetes/pkg/kube/informerfactory"
 	"github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
 	"github.com/apache/dubbo-kubernetes/pkg/ptr"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/features"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	klabels "k8s.io/apimachinery/pkg/labels"
diff --git a/pkg/kube/krt/collection.go b/pkg/kube/krt/collection.go
index de44a06..be30cce 100644
--- a/pkg/kube/krt/collection.go
+++ b/pkg/kube/krt/collection.go
@@ -23,8 +23,8 @@
 	"github.com/apache/dubbo-kubernetes/pkg/maps"
 	"github.com/apache/dubbo-kubernetes/pkg/ptr"
 	"github.com/apache/dubbo-kubernetes/pkg/queue"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
 	"k8s.io/klog/v2"
 	"sync"
 )
diff --git a/pkg/kube/krt/fetch.go b/pkg/kube/krt/fetch.go
index 933c991..b2eaf6b 100644
--- a/pkg/kube/krt/fetch.go
+++ b/pkg/kube/krt/fetch.go
@@ -18,7 +18,7 @@
 package krt
 
 import (
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 )
 
 func FetchOne[T any](ctx HandlerContext, c Collection[T], opts ...FetchOption) *T {
diff --git a/pkg/kube/krt/files/files.go b/pkg/kube/krt/files/files.go
index 9f4e6d3..743b8ce 100644
--- a/pkg/kube/krt/files/files.go
+++ b/pkg/kube/krt/files/files.go
@@ -21,8 +21,8 @@
 	"fmt"
 	"github.com/apache/dubbo-kubernetes/pkg/filewatcher"
 	"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
 	"github.com/fsnotify/fsnotify"
 	"go.uber.org/atomic"
 	"k8s.io/klog/v2"
diff --git a/pkg/kube/krt/index.go b/pkg/kube/krt/index.go
index e94b355..96788c0 100644
--- a/pkg/kube/krt/index.go
+++ b/pkg/kube/krt/index.go
@@ -21,8 +21,8 @@
 	"fmt"
 	"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
 	"github.com/apache/dubbo-kubernetes/pkg/ptr"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
 	"k8s.io/client-go/tools/cache"
 )
 
diff --git a/pkg/kube/krt/informer.go b/pkg/kube/krt/informer.go
index 2dd0526..f7d2647 100644
--- a/pkg/kube/krt/informer.go
+++ b/pkg/kube/krt/informer.go
@@ -23,7 +23,7 @@
 	"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
 	"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
 	"github.com/apache/dubbo-kubernetes/pkg/ptr"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	klabels "k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/tools/cache"
diff --git a/pkg/kube/krt/processor.go b/pkg/kube/krt/processor.go
index 636443f..e4d0fac 100644
--- a/pkg/kube/krt/processor.go
+++ b/pkg/kube/krt/processor.go
@@ -18,7 +18,7 @@
 package krt
 
 import (
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"sync"
 	"sync/atomic"
 
diff --git a/pkg/kube/krt/singleton.go b/pkg/kube/krt/singleton.go
index 758ef3f..05186de 100644
--- a/pkg/kube/krt/singleton.go
+++ b/pkg/kube/krt/singleton.go
@@ -21,8 +21,7 @@
 	"fmt"
 	"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
 	"github.com/apache/dubbo-kubernetes/pkg/ptr"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
-
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"sync/atomic"
 )
 
diff --git a/pkg/kube/krt/static.go b/pkg/kube/krt/static.go
index a0b386b..8698c85 100644
--- a/pkg/kube/krt/static.go
+++ b/pkg/kube/krt/static.go
@@ -5,8 +5,8 @@
 	"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
 	"github.com/apache/dubbo-kubernetes/pkg/maps"
 	"github.com/apache/dubbo-kubernetes/pkg/ptr"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
 	"sync"
 )
 
diff --git a/pkg/kube/namespace/filter.go b/pkg/kube/namespace/filter.go
index e9baa49..7c7e623 100644
--- a/pkg/kube/namespace/filter.go
+++ b/pkg/kube/namespace/filter.go
@@ -19,6 +19,7 @@
 
 import (
 	"fmt"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"k8s.io/klog/v2"
 	"sync"
 
@@ -33,7 +34,6 @@
 	"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
 	"github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
 	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
 	meshapi "istio.io/api/mesh/v1alpha1"
 )
 
diff --git a/pkg/network/id.go b/pkg/network/id.go
new file mode 100644
index 0000000..23291ab
--- /dev/null
+++ b/pkg/network/id.go
@@ -0,0 +1,14 @@
+package network
+
+import "github.com/apache/dubbo-kubernetes/pkg/util/identifier"
+
+// ID is the unique identifier for a network.
+type ID string
+
+func (id ID) Equals(other ID) bool {
+	return identifier.IsSameOrEmpty(string(id), string(other))
+}
+
+func (id ID) String() string {
+	return string(id)
+}
diff --git a/pkg/util/slices/slices.go b/pkg/slices/slices.go
similarity index 100%
rename from pkg/util/slices/slices.go
rename to pkg/slices/slices.go
diff --git a/pkg/util/sets/set.go b/pkg/util/sets/set.go
index ca63142..d9b0ddd 100644
--- a/pkg/util/sets/set.go
+++ b/pkg/util/sets/set.go
@@ -19,16 +19,13 @@
 
 import (
 	"fmt"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 )
 
 import (
 	"golang.org/x/exp/constraints"
 )
 
-import (
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
-)
-
 type Set[T comparable] map[T]struct{}
 
 type String = Set[string]
diff --git a/pkg/util/smallset/smallset.go b/pkg/util/smallset/smallset.go
index 9e84cdc..753762e 100644
--- a/pkg/util/smallset/smallset.go
+++ b/pkg/util/smallset/smallset.go
@@ -20,7 +20,7 @@
 import (
 	"cmp"
 	"fmt"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 )
 
 // Set is an immutable set optimized for *small number of items*. For general purposes, Sets is likely better
diff --git a/pkg/xds/server.go b/pkg/xds/server.go
index 183f05a..814630d 100644
--- a/pkg/xds/server.go
+++ b/pkg/xds/server.go
@@ -360,3 +360,15 @@
 func (conn *Connection) StreamDone() <-chan struct{} {
 	return conn.stream.Context().Done()
 }
+
+func (conn *Connection) InitializedCh() chan struct{} {
+	return conn.initialized
+}
+
+func (conn *Connection) ErrorCh() chan error {
+	return conn.errorChan
+}
+
+func (conn *Connection) StopCh() chan struct{} {
+	return conn.stop
+}
diff --git a/sail/pkg/bootstrap/certcontroller.go b/sail/pkg/bootstrap/certcontroller.go
index b0d2869..0e61cf3 100644
--- a/sail/pkg/bootstrap/certcontroller.go
+++ b/sail/pkg/bootstrap/certcontroller.go
@@ -18,13 +18,26 @@
 package bootstrap
 
 import (
+	"bytes"
+	"fmt"
+	"github.com/apache/dubbo-kubernetes/pkg/config/constants"
+	"github.com/apache/dubbo-kubernetes/pkg/sleep"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/features"
 	tb "github.com/apache/dubbo-kubernetes/sail/pkg/trustbundle"
+	"github.com/apache/dubbo-kubernetes/security/pkg/k8s/chiron"
+	"github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
+	certutil "github.com/apache/dubbo-kubernetes/security/pkg/util"
 	"k8s.io/klog/v2"
+	"os"
+	"path"
+	"strings"
+	"time"
 )
 
 const (
-	defaultCACertPath = "./var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
+	defaultCertGracePeriodRatio = 0.5
+	rootCertPollingInterval     = 60 * time.Second
+	defaultCACertPath           = "./var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
 )
 
 func (s *Server) updateRootCertAndGenKeyCert() error {
@@ -51,3 +64,189 @@
 	s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain, caBundle)
 	return nil
 }
+
+func (s *Server) initFileCertificateWatches(tlsOptions TLSOptions) error {
+	if err := s.dubbodCertBundleWatcher.SetFromFilesAndNotify(tlsOptions.KeyFile, tlsOptions.CertFile, tlsOptions.CaCertFile); err != nil {
+		return fmt.Errorf("set keyCertBundle failed: %v", err)
+	}
+	// TODO: Setup watcher for root and restart server if it changes.
+	for _, file := range []string{tlsOptions.CertFile, tlsOptions.KeyFile} {
+		klog.Infof("adding watcher for certificate %s", file)
+		if err := s.fileWatcher.Add(file); err != nil {
+			return fmt.Errorf("could not watch %v: %v", file, err)
+		}
+	}
+	s.addStartFunc("certificate rotation", func(stop <-chan struct{}) error {
+		go func() {
+			var keyCertTimerC <-chan time.Time
+			for {
+				select {
+				case <-keyCertTimerC:
+					keyCertTimerC = nil
+					if err := s.dubbodCertBundleWatcher.SetFromFilesAndNotify(tlsOptions.KeyFile, tlsOptions.CertFile, tlsOptions.CaCertFile); err != nil {
+						klog.Errorf("Setting keyCertBundle failed: %v", err)
+					}
+				case <-s.fileWatcher.Events(tlsOptions.CertFile):
+					if keyCertTimerC == nil {
+						keyCertTimerC = time.After(watchDebounceDelay)
+					}
+				case <-s.fileWatcher.Events(tlsOptions.KeyFile):
+					if keyCertTimerC == nil {
+						keyCertTimerC = time.After(watchDebounceDelay)
+					}
+				case err := <-s.fileWatcher.Errors(tlsOptions.CertFile):
+					klog.Errorf("error watching %v: %v", tlsOptions.CertFile, err)
+				case err := <-s.fileWatcher.Errors(tlsOptions.KeyFile):
+					klog.Errorf("error watching %v: %v", tlsOptions.KeyFile, err)
+				case <-stop:
+					return
+				}
+			}
+		}()
+		return nil
+	})
+	return nil
+}
+
+func (s *Server) RotateDNSCertForK8sCA(stop <-chan struct{},
+	defaultCACertPath string,
+	signerName string,
+	approveCsr bool,
+	requestedLifetime time.Duration,
+) {
+	certUtil := certutil.NewCertUtil(int(defaultCertGracePeriodRatio * 100))
+	for {
+		waitTime, _ := certUtil.GetWaitTime(s.dubbodCertBundleWatcher.GetKeyCertBundle().CertPem, time.Now())
+		if !sleep.Until(stop, waitTime) {
+			return
+		}
+		certChain, keyPEM, _, err := chiron.GenKeyCertK8sCA(s.kubeClient.Kube(),
+			strings.Join(s.dnsNames, ","), defaultCACertPath, signerName, approveCsr, requestedLifetime)
+		if err != nil {
+			klog.Errorf("failed regenerating key and cert for dubbod by kubernetes: %v", err)
+			continue
+		}
+		s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain, s.dubbodCertBundleWatcher.GetCABundle())
+	}
+}
+func (s *Server) initDNSCertsK8SRA() error {
+	var certChain, keyPEM, caBundle []byte
+	var err error
+	pilotCertProviderName := features.SailCertProvider
+
+	signerName := strings.TrimPrefix(pilotCertProviderName, constants.CertProviderKubernetesSignerPrefix)
+	klog.Infof("Generating K8S-signed cert for %v using signer %v", s.dnsNames, signerName)
+	certChain, keyPEM, _, err = chiron.GenKeyCertK8sCA(s.kubeClient.Kube(),
+		strings.Join(s.dnsNames, ","), "", signerName, true, SelfSignedCACertTTL.Get())
+	if err != nil {
+		return fmt.Errorf("failed generating key and cert by kubernetes: %v", err)
+	}
+	caBundle, err = s.RA.GetRootCertFromMeshConfig(signerName)
+	if err != nil {
+		return err
+	}
+
+	// MeshConfig:Add callback for mesh config update
+	s.environment.AddMeshHandler(func() {
+		newCaBundle, _ := s.RA.GetRootCertFromMeshConfig(signerName)
+		if newCaBundle != nil && !bytes.Equal(newCaBundle, s.dubbodCertBundleWatcher.GetKeyCertBundle().CABundle) {
+			newCertChain, newKeyPEM, _, err := chiron.GenKeyCertK8sCA(s.kubeClient.Kube(),
+				strings.Join(s.dnsNames, ","), "", signerName, true, SelfSignedCACertTTL.Get())
+			if err != nil {
+				klog.Errorf("failed regenerating key and cert for istiod by kubernetes: %v", err)
+			}
+			s.dubbodCertBundleWatcher.SetAndNotify(newKeyPEM, newCertChain, newCaBundle)
+		}
+	})
+
+	s.addStartFunc("istiod server certificate rotation", func(stop <-chan struct{}) error {
+		go func() {
+			// Track TTL of DNS cert and renew cert in accordance to grace period.
+			s.RotateDNSCertForK8sCA(stop, "", signerName, true, SelfSignedCACertTTL.Get())
+		}()
+		return nil
+	})
+	s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain, caBundle)
+	return nil
+}
+
+func (s *Server) initDNSCertsDubbod() error {
+	var certChain, keyPEM, caBundle []byte
+	var err error
+	// Generate certificates for Dubbod DNS names, signed by Dubbod CA
+	certChain, keyPEM, err = s.CA.GenKeyCert(s.dnsNames, SelfSignedCACertTTL.Get(), false)
+	if err != nil {
+		return fmt.Errorf("failed generating dubbod key cert %v", err)
+	}
+	klog.Infof("Generating dubbod-signed cert for %v:\n %s", s.dnsNames, certChain)
+
+	fileBundle, err := detectSigningCABundleAndCRL()
+	if err != nil {
+		return fmt.Errorf("unable to determine signing file format %v", err)
+	}
+
+	dubboGenerated, detectedSigningCABundle := false, false
+	if _, err := os.Stat(fileBundle.SigningKeyFile); err == nil {
+		detectedSigningCABundle = true
+		if _, err := os.Stat(path.Join(LocalCertDir.Get(), ca.DubboGenerated)); err == nil {
+			dubboGenerated = true
+		}
+	}
+
+	// check if signing key file exists the cert dir and if the dubbo-generated file
+	// exists (only if USE_CACERTS_FOR_SELF_SIGNED_CA is enabled)
+	if !detectedSigningCABundle {
+		klog.Infof("Use roots from dubbo-ca-secret")
+
+		caBundle = s.CA.GetCAKeyCertBundle().GetRootCertPem()
+		s.addStartFunc("dubbod server certificate rotation", func(stop <-chan struct{}) error {
+			go func() {
+				// regenerate dubbod key cert when root cert changes.
+				s.watchRootCertAndGenKeyCert(stop)
+			}()
+			return nil
+		})
+	} else if features.UseCacertsForSelfSignedCA && dubboGenerated {
+		klog.Infof("Use roots from %v and watch", fileBundle.RootCertFile)
+
+		caBundle = s.CA.GetCAKeyCertBundle().GetRootCertPem()
+		// Similar code to dubbo-ca-secret: refresh the root cert, but in casecrets
+		s.addStartFunc("dubbod server certificate rotation", func(stop <-chan struct{}) error {
+			go func() {
+				// regenerate dubbod key cert when root cert changes.
+				s.watchRootCertAndGenKeyCert(stop)
+			}()
+			return nil
+		})
+
+	} else {
+		klog.Infof("Use root cert from %v", fileBundle.RootCertFile)
+
+		caBundle, err = os.ReadFile(fileBundle.RootCertFile)
+		if err != nil {
+			return fmt.Errorf("failed reading %s: %v", fileBundle.RootCertFile, err)
+		}
+	}
+	s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain, caBundle)
+	return nil
+}
+
+func (s *Server) watchRootCertAndGenKeyCert(stop <-chan struct{}) {
+	caBundle := s.CA.GetCAKeyCertBundle().GetRootCertPem()
+	for {
+		if !sleep.Until(stop, rootCertPollingInterval) {
+			return
+		}
+		newRootCert := s.CA.GetCAKeyCertBundle().GetRootCertPem()
+		if !bytes.Equal(caBundle, newRootCert) {
+			caBundle = newRootCert
+			certChain, keyPEM, err := s.CA.GenKeyCert(s.dnsNames, SelfSignedCACertTTL.Get(), false)
+			if err != nil {
+				klog.Errorf("failed generating dubbod key cert %v", err)
+			} else {
+				s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain, caBundle)
+				klog.Infof("regenerated dubbod dns cert: %s", certChain)
+			}
+		}
+	}
+}
diff --git a/sail/pkg/bootstrap/options.go b/sail/pkg/bootstrap/options.go
index 83c9baa..a45e9b1 100644
--- a/sail/pkg/bootstrap/options.go
+++ b/sail/pkg/bootstrap/options.go
@@ -56,6 +56,16 @@
 	HTTPSAddr      string
 	GRPCAddr       string
 	SecureGRPCAddr string
+	TLSOptions     TLSOptions
+}
+
+type TLSOptions struct {
+	// CaCertFile and related are set using CLI flags.
+	CaCertFile      string
+	CertFile        string
+	KeyFile         string
+	TLSCipherSuites []string
+	CipherSuits     []uint16 // This is the parsed cipher suites
 }
 
 func NewSailArgs(initFuncs ...func(*SailArgs)) *SailArgs {
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index f95cc43..89e4994 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -19,6 +19,8 @@
 
 import (
 	"context"
+	"crypto/tls"
+	"crypto/x509"
 	"fmt"
 	"github.com/apache/dubbo-kubernetes/pkg/cluster"
 	"github.com/apache/dubbo-kubernetes/pkg/config/constants"
@@ -29,19 +31,27 @@
 	kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
 	"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
 	"github.com/apache/dubbo-kubernetes/pkg/kube/namespace"
+	sec_model "github.com/apache/dubbo-kubernetes/pkg/model"
 	"github.com/apache/dubbo-kubernetes/pkg/network"
+	"github.com/apache/dubbo-kubernetes/pkg/spiffe"
+	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/features"
+	dubbogrpc "github.com/apache/dubbo-kubernetes/sail/pkg/grpc"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/model"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/server"
+	"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
 	tb "github.com/apache/dubbo-kubernetes/sail/pkg/trustbundle"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/xds"
 	"github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
 	"github.com/apache/dubbo-kubernetes/security/pkg/pki/ra"
 	"github.com/fsnotify/fsnotify"
+	grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
 	"golang.org/x/net/http2"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/reflection"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/rest"
 	"k8s.io/klog"
@@ -49,9 +59,15 @@
 	"net/http"
 	"os"
 	"strings"
+	"sync"
 	"time"
 )
 
+const (
+	// debounce file watcher events to minimize noise in logs
+	watchDebounceDelay = 100 * time.Millisecond
+)
+
 type Server struct {
 	XDSServer   *xds.DiscoveryServer
 	clusterID   cluster.ID
@@ -84,18 +100,30 @@
 	CA                  *ca.DubboCA
 	dnsNames            []string
 
+	certMu     sync.RWMutex
+	dubbodCert *tls.Certificate
+
 	dubbodCertBundleWatcher *keycertbundle.Watcher
 }
 
 func NewServer(args *SailArgs, initFuncs ...func(*Server)) (*Server, error) {
 	e := model.NewEnvironment()
+	e.DomainSuffix = args.RegistryOptions.KubeOptions.DomainSuffix
+
+	ac := aggregate.NewController(aggregate.Options{
+		MeshHolder:      e,
+		ConfigClusterID: getClusterID(args),
+	})
+	e.ServiceDiscovery = ac
+
 	s := &Server{
-		environment:  e,
-		server:       server.New(),
-		clusterID:    getClusterID(args),
-		httpMux:      http.NewServeMux(),
-		fileWatcher:  filewatcher.NewWatcher(),
-		internalStop: make(chan struct{}),
+		environment:             e,
+		server:                  server.New(),
+		clusterID:               getClusterID(args),
+		httpMux:                 http.NewServeMux(),
+		dubbodCertBundleWatcher: keycertbundle.NewWatcher(),
+		fileWatcher:             filewatcher.NewWatcher(),
+		internalStop:            make(chan struct{}),
 	}
 	for _, fn := range initFuncs {
 		fn(s)
@@ -157,6 +185,20 @@
 		return nil, err
 	}
 
+	dubbodHost, _, err := e.GetDiscoveryAddress()
+	if err != nil {
+		return nil, err
+	}
+
+	if err := s.initDubbodCerts(args, string(dubbodHost)); err != nil {
+		return nil, err
+	}
+
+	// Secure gRPC Server must be initialized after CA is created as may use a Aegis generated cert.
+	if err := s.initSecureDiscoveryService(args, s.environment.Mesh().GetTrustDomain()); err != nil {
+		return nil, fmt.Errorf("error initializing secure gRPC Listener: %v", err)
+	}
+
 	return s, nil
 }
 
@@ -298,14 +340,21 @@
 }
 
 func (s *Server) initGrpcServer(options *dubbokeepalive.Options) {
+	interceptors := []grpc.UnaryServerInterceptor{
+		// setup server prometheus monitoring (as final interceptor in chain)
+		grpcprom.UnaryServerInterceptor,
+	}
+	grpcOptions := dubbogrpc.ServerOptions(options, interceptors...)
+	s.grpcServer = grpc.NewServer(grpcOptions...)
+	s.XDSServer.Register(s.grpcServer)
+	reflection.Register(s.grpcServer)
 }
 
-// initControllers initializes the controllers.
 func (s *Server) initControllers(args *SailArgs) error {
 	klog.Info("initializing controllers")
 	// TODO initMulticluster
 
-	// TODO initSDSServer
+	s.initSDSServer()
 
 	if err := s.initConfigController(args); err != nil {
 		return fmt.Errorf("error initializing config controller: %v", err)
@@ -316,6 +365,114 @@
 	return nil
 }
 
+func (s *Server) initSecureDiscoveryService(args *SailArgs, trustDomain string) error {
+	if args.ServerOptions.SecureGRPCAddr == "" {
+		klog.Info("The secure discovery port is disabled, multiplexing on httpAddr ")
+		return nil
+	}
+
+	peerCertVerifier, err := s.createPeerCertVerifier(args.ServerOptions.TLSOptions, trustDomain)
+	if err != nil {
+		return err
+	}
+	if peerCertVerifier == nil {
+		// Running locally without configured certs - no TLS mode
+		klog.Warningf("The secure discovery service is disabled")
+		return nil
+	}
+	klog.Info("initializing secure discovery service")
+
+	cfg := &tls.Config{
+		GetCertificate: s.getDubbodCertificate,
+		ClientAuth:     tls.VerifyClientCertIfGiven,
+		ClientCAs:      peerCertVerifier.GetGeneralCertPool(),
+		VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
+			err := peerCertVerifier.VerifyPeerCert(rawCerts, verifiedChains)
+			if err != nil {
+				klog.Infof("Could not verify certificate: %v", err)
+			}
+			return err
+		},
+		MinVersion:   tls.VersionTLS12,
+		CipherSuites: args.ServerOptions.TLSOptions.CipherSuits,
+	}
+	// Compliance for xDS server TLS.
+	sec_model.EnforceGoCompliance(cfg)
+
+	tlsCreds := credentials.NewTLS(cfg)
+
+	s.secureGrpcAddress = args.ServerOptions.SecureGRPCAddr
+
+	interceptors := []grpc.UnaryServerInterceptor{
+		// setup server prometheus monitoring (as final interceptor in chain)
+		grpcprom.UnaryServerInterceptor,
+	}
+	opts := dubbogrpc.ServerOptions(args.KeepaliveOptions, interceptors...)
+	opts = append(opts, grpc.Creds(tlsCreds))
+
+	s.secureGrpcServer = grpc.NewServer(opts...)
+	s.XDSServer.Register(s.secureGrpcServer)
+	reflection.Register(s.secureGrpcServer)
+
+	s.addStartFunc("secure gRPC", func(stop <-chan struct{}) error {
+		go func() {
+			<-stop
+			s.secureGrpcServer.Stop()
+		}()
+		return nil
+	})
+
+	return nil
+}
+
+func (s *Server) createPeerCertVerifier(tlsOptions TLSOptions, trustDomain string) (*spiffe.PeerCertVerifier, error) {
+	customTLSCertsExists, _, _, caCertPath := hasCustomTLSCerts(tlsOptions)
+	if !customTLSCertsExists && s.CA == nil && !s.isK8SSigning() {
+		// Running locally without configured certs - no TLS mode
+		return nil, nil
+	}
+	peerCertVerifier := spiffe.NewPeerCertVerifier()
+	var rootCertBytes []byte
+	var err error
+	if caCertPath != "" {
+		if rootCertBytes, err = os.ReadFile(caCertPath); err != nil {
+			return nil, err
+		}
+	} else {
+		if s.RA != nil {
+			if strings.HasPrefix(features.SailCertProvider, constants.CertProviderKubernetesSignerPrefix) {
+				signerName := strings.TrimPrefix(features.SailCertProvider, constants.CertProviderKubernetesSignerPrefix)
+				caBundle, _ := s.RA.GetRootCertFromMeshConfig(signerName)
+				rootCertBytes = append(rootCertBytes, caBundle...)
+			} else {
+				rootCertBytes = append(rootCertBytes, s.RA.GetCAKeyCertBundle().GetRootCertPem()...)
+			}
+		}
+		if s.CA != nil {
+			rootCertBytes = append(rootCertBytes, s.CA.GetCAKeyCertBundle().GetRootCertPem()...)
+		}
+	}
+
+	if len(rootCertBytes) != 0 {
+		// TODO: trustDomain here is static and will not update if it dynamically changes in mesh config
+		err := peerCertVerifier.AddMappingFromPEM(trustDomain, rootCertBytes)
+		if err != nil {
+			return nil, fmt.Errorf("add root CAs into peerCertVerifier failed: %v", err)
+		}
+	}
+
+	return peerCertVerifier, nil
+}
+
+func (s *Server) getDubbodCertificate(*tls.ClientHelloInfo) (*tls.Certificate, error) {
+	s.certMu.RLock()
+	defer s.certMu.RUnlock()
+	if s.dubbodCert != nil {
+		return s.dubbodCert, nil
+	}
+	return nil, fmt.Errorf("cert not initialized")
+}
+
 func (s *Server) serveHTTP() error {
 	// At this point we are ready - start Http Listener so that it can respond to readiness events.
 	httpListener, err := net.Listen("tcp", s.httpServer.Addr)
@@ -376,7 +533,17 @@
 	return clusterID
 }
 
-// isK8SSigning returns whether K8S (as a RA) is used to sign certs instead of private keys known by Istiod
+func (s *Server) initSDSServer() {
+	if s.kubeClient == nil {
+		return
+	}
+	if !features.EnableXDSIdentityCheck {
+		// Make sure we have security
+		klog.Warningf("skipping Kubernetes credential reader; SAIL_ENABLE_XDS_IDENTITY_CHECK must be set to true for this feature.")
+	}
+}
+
+// isK8SSigning returns whether K8S (as a RA) is used to sign certs instead of private keys known by Dubbod
 func (s *Server) isK8SSigning() bool {
 	return s.RA != nil && strings.HasPrefix(features.NaviCertProvider, constants.CertProviderKubernetesSignerPrefix)
 }
@@ -443,8 +610,12 @@
 
 func (s *Server) cachesSynced() bool {
 	// TODO multiclusterController HasSynced
-	// TODO ServiceController().HasSynced
-	// TODO configController.HasSynced
+	if !s.ServiceController().HasSynced() {
+		return false
+	}
+	if !s.configController.HasSynced() {
+		return false
+	}
 	return true
 }
 
@@ -468,3 +639,144 @@
 	expected := s.XDSServer.InboundUpdates.Load()
 	return kubelib.WaitForCacheSync("push context", stop, func() bool { return s.pushContextReady(expected) })
 }
+
+func (s *Server) initDubbodCerts(args *SailArgs, host string) error {
+	// Skip all certificates
+	var err error
+
+	s.dnsNames = getDNSNames(args, host)
+	if hasCustomCertArgsOrWellKnown, tlsCertPath, tlsKeyPath, caCertPath := hasCustomTLSCerts(args.ServerOptions.TLSOptions); hasCustomCertArgsOrWellKnown {
+		// Use the DNS certificate provided via args or in well known location.
+		err = s.initFileCertificateWatches(TLSOptions{
+			CaCertFile: caCertPath,
+			KeyFile:    tlsKeyPath,
+			CertFile:   tlsCertPath,
+		})
+		if err != nil {
+			// Not crashing dubbod - This typically happens if certs are missing and in tests.
+			klog.Errorf("error initializing certificate watches: %v", err)
+			return nil
+		}
+	} else if features.EnableCAServer && features.SailCertProvider == constants.CertProviderDubbod {
+		klog.Infof("initializing Dubbod DNS certificates host: %s, custom host: %s", host, features.DubbodServiceCustomHost)
+		err = s.initDNSCertsDubbod()
+	} else if features.SailCertProvider == constants.CertProviderKubernetes {
+		klog.Warningf("SAIL_CERT_PROVIDER=kubernetes is no longer supported by upstream K8S")
+	} else if strings.HasPrefix(features.SailCertProvider, constants.CertProviderKubernetesSignerPrefix) {
+		klog.Infof("initializing Dubbod DNS certificates using K8S RA:%s  host: %s, custom host: %s", features.SailCertProvider,
+			host, features.DubbodServiceCustomHost)
+		err = s.initDNSCertsK8SRA()
+	} else {
+		klog.Warningf("SAIL_CERT_PROVIDER=%s is not implemented", features.SailCertProvider)
+	}
+
+	if err == nil {
+		err = s.initDubbodCertLoader()
+	}
+
+	return err
+}
+
+func getDNSNames(args *SailArgs, host string) []string {
+	// Append custom hostname if there is any
+	customHost := features.DubbodServiceCustomHost
+	var cHosts []string
+
+	if customHost != "" {
+		cHosts = strings.Split(customHost, ",")
+	}
+	sans := sets.New(cHosts...)
+	sans.Insert(host)
+	dnsNames := sets.SortedList(sans)
+	klog.Infof("Discover server subject alt names: %v", dnsNames)
+	return dnsNames
+}
+
+func hasCustomTLSCerts(tlsOptions TLSOptions) (ok bool, tlsCertPath, tlsKeyPath, caCertPath string) {
+	// load from tls args as priority
+	if hasCustomTLSCertArgs(tlsOptions) {
+		return true, tlsOptions.CertFile, tlsOptions.KeyFile, tlsOptions.CaCertFile
+	}
+
+	if ok = checkPathsExist(constants.DefaultSailTLSCert, constants.DefaultSailTLSKey, constants.DefaultSailTLSCaCert); ok {
+		tlsCertPath = constants.DefaultSailTLSCert
+		tlsKeyPath = constants.DefaultSailTLSKey
+		caCertPath = constants.DefaultSailTLSCaCert
+		return
+	}
+
+	if ok = checkPathsExist(constants.DefaultSailTLSCert, constants.DefaultSailTLSKey, constants.DefaultSailTLSCaCertAlternatePath); ok {
+		tlsCertPath = constants.DefaultSailTLSCert
+		tlsKeyPath = constants.DefaultSailTLSKey
+		caCertPath = constants.DefaultSailTLSCaCertAlternatePath
+		return
+	}
+
+	return
+}
+
+func checkPathsExist(paths ...string) bool {
+	for _, path := range paths {
+		fInfo, err := os.Stat(path)
+
+		if err != nil || fInfo.IsDir() {
+			return false
+		}
+	}
+	return true
+}
+
+func hasCustomTLSCertArgs(tlsOptions TLSOptions) bool {
+	return tlsOptions.CaCertFile != "" && tlsOptions.CertFile != "" && tlsOptions.KeyFile != ""
+}
+
+func (s *Server) initDubbodCertLoader() error {
+	if err := s.loadDubbodCert(); err != nil {
+		return fmt.Errorf("first time load DubbodCert failed: %v", err)
+	}
+	_, watchCh := s.dubbodCertBundleWatcher.AddWatcher()
+	s.addStartFunc("reload certs", func(stop <-chan struct{}) error {
+		go s.reloadDubbodCert(watchCh, stop)
+		return nil
+	})
+	return nil
+}
+
+func (s *Server) loadDubbodCert() error {
+	keyCertBundle := s.dubbodCertBundleWatcher.GetKeyCertBundle()
+	keyPair, err := tls.X509KeyPair(keyCertBundle.CertPem, keyCertBundle.KeyPem)
+	if err != nil {
+		return fmt.Errorf("dubbod loading x509 key pairs failed: %v", err)
+	}
+	for _, c := range keyPair.Certificate {
+		x509Cert, err := x509.ParseCertificates(c)
+		if err != nil {
+			// This can rarely happen, just in case.
+			return fmt.Errorf("x509 cert - ParseCertificates() error: %v", err)
+		}
+		for _, c := range x509Cert {
+			klog.Infof("x509 cert - Issuer: %q, Subject: %q, SN: %x, NotBefore: %q, NotAfter: %q",
+				c.Issuer, c.Subject, c.SerialNumber,
+				c.NotBefore.Format(time.RFC3339), c.NotAfter.Format(time.RFC3339))
+		}
+	}
+
+	klog.Info("Dubbod certificates are reloaded")
+	s.certMu.Lock()
+	s.dubbodCert = &keyPair
+	s.certMu.Unlock()
+	return nil
+}
+
+func (s *Server) reloadDubbodCert(watchCh <-chan struct{}, stopCh <-chan struct{}) {
+	for {
+		select {
+		case <-stopCh:
+			return
+		case <-watchCh:
+			if err := s.loadDubbodCert(); err != nil {
+				klog.Errorf("reload dubbod cert failed: %v", err)
+			}
+		}
+	}
+}
diff --git a/sail/pkg/bootstrap/servicecontroller.go b/sail/pkg/bootstrap/servicecontroller.go
index 44e493a..f324fc9 100644
--- a/sail/pkg/bootstrap/servicecontroller.go
+++ b/sail/pkg/bootstrap/servicecontroller.go
@@ -16,8 +16,6 @@
 func (s *Server) initServiceControllers(args *SailArgs) error {
 	serviceControllers := s.ServiceController()
 
-	// TODO service entry controller
-
 	registered := sets.New[provider.ID]()
 	for _, r := range args.RegistryOptions.Registries {
 		serviceRegistry := provider.ID(r)
diff --git a/sail/pkg/config/aggregate/config.go b/sail/pkg/config/aggregate/config.go
index 16dfd44..8a8478a 100644
--- a/sail/pkg/config/aggregate/config.go
+++ b/sail/pkg/config/aggregate/config.go
@@ -3,13 +3,13 @@
 
 import (
 	"errors"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 
 	"k8s.io/apimachinery/pkg/types"
 
 	"github.com/apache/dubbo-kubernetes/pkg/config"
 	"github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
 	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/model"
 )
 
diff --git a/sail/pkg/config/memory/controller.go b/sail/pkg/config/memory/controller.go
index 06c121d..5119bb6 100644
--- a/sail/pkg/config/memory/controller.go
+++ b/sail/pkg/config/memory/controller.go
@@ -4,7 +4,7 @@
 	"fmt"
 	"github.com/apache/dubbo-kubernetes/pkg/config"
 	"github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/model"
 	"k8s.io/apimachinery/pkg/types"
 )
diff --git a/sail/pkg/features/sail.go b/sail/pkg/features/sail.go
index a3fe8e7..08213a1 100644
--- a/sail/pkg/features/sail.go
+++ b/sail/pkg/features/sail.go
@@ -55,4 +55,11 @@
 		"If set to false, Dubbo will not watch for the ca-crl.pem file in the /etc/cacerts directory "+
 			"and will not distribute CRL data to namespaces for proxies to consume.",
 	).Get()
+	SailCertProvider = env.Register("SAIL_CERT_PROVIDER", constants.CertProviderDubbod,
+		"The provider of Pilot DNS certificate. K8S RA will be used for k8s.io/NAME. 'dubbod' value will sign"+
+			" using Dubbo build in CA. Other values will not not generate TLS certs, but still "+
+			" distribute ./etc/certs/root-cert.pem. Only used if custom certificates are not mounted.").Get()
+	DubbodServiceCustomHost = env.Register("DUBBOD_CUSTOM_HOST", "",
+		"Custom host name of dubbod that dubbod signs the server cert. "+
+			"Multiple custom host names are supported, and multiple values are separated by commas.").Get()
 )
diff --git a/sail/pkg/features/security.go b/sail/pkg/features/security.go
index 859f0ff..fb2284e 100644
--- a/sail/pkg/features/security.go
+++ b/sail/pkg/features/security.go
@@ -24,4 +24,9 @@
 	UseCacertsForSelfSignedCA = env.Register("USE_CACERTS_FOR_SELF_SIGNED_CA", false,
 		"If enabled, dubbod will use a secret named cacerts to store its self-signed dubbo-"+
 			"generated root certificate.").Get()
+	EnableXDSIdentityCheck = env.Register(
+		"SAIL_ENABLE_XDS_IDENTITY_CHECK",
+		true,
+		"If enabled, sail will authorize XDS clients, to ensure they are acting only as namespaces they have permissions for.",
+	).Get()
 )
diff --git a/sail/pkg/features/tuning.go b/sail/pkg/features/tuning.go
index ca91a46..a99ba30 100644
--- a/sail/pkg/features/tuning.go
+++ b/sail/pkg/features/tuning.go
@@ -25,4 +25,11 @@
 		100000,
 		"Sets the maximum number of concurrent grpc streams.",
 	).Get()
+
+	// MaxRecvMsgSize The max receive buffer size of gRPC received channel of Pilot in bytes.
+	MaxRecvMsgSize = env.Register(
+		"DUBBO_GPRC_MAXRECVMSGSIZE",
+		4*1024*1024,
+		"Sets the max receive buffer size of gRPC stream in bytes.",
+	).Get()
 )
diff --git a/sail/pkg/grpc/grpc.go b/sail/pkg/grpc/grpc.go
index 0f380b7..3bf677e 100644
--- a/sail/pkg/grpc/grpc.go
+++ b/sail/pkg/grpc/grpc.go
@@ -3,6 +3,8 @@
 import (
 	dubbokeepalive "github.com/apache/dubbo-kubernetes/pkg/keepalive"
 	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
+	"github.com/apache/dubbo-kubernetes/sail/pkg/features"
+	middleware "github.com/grpc-ecosystem/go-grpc-middleware"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials/insecure"
@@ -58,6 +60,30 @@
 	return []grpc.DialOption{keepaliveOption, initialWindowSizeOption, initialConnWindowSizeOption, msgSizeOption, tlsDialOpts}, nil
 }
 
+func ServerOptions(options *dubbokeepalive.Options, interceptors ...grpc.UnaryServerInterceptor) []grpc.ServerOption {
+	maxStreams := features.MaxConcurrentStreams
+	maxRecvMsgSize := features.MaxRecvMsgSize
+
+	grpcOptions := []grpc.ServerOption{
+		grpc.UnaryInterceptor(middleware.ChainUnaryServer(interceptors...)),
+		grpc.MaxConcurrentStreams(uint32(maxStreams)),
+		grpc.MaxRecvMsgSize(maxRecvMsgSize),
+		// Ensure we allow clients sufficient ability to send keep alives. If this is higher than client
+		// keep alive setting, it will prematurely get a GOAWAY sent.
+		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
+			MinTime: options.Time / 2,
+		}),
+		grpc.KeepaliveParams(keepalive.ServerParameters{
+			Time:                  options.Time,
+			Timeout:               options.Timeout,
+			MaxConnectionAge:      options.MaxServerConnectionAge,
+			MaxConnectionAgeGrace: options.MaxServerConnectionAgeGrace,
+		}),
+	}
+
+	return grpcOptions
+}
+
 func GRPCErrorType(err error) ErrorType {
 	if err == io.EOF {
 		return GracefulTermination
diff --git a/sail/pkg/model/addressmap.go b/sail/pkg/model/addressmap.go
new file mode 100644
index 0000000..c8b166f
--- /dev/null
+++ b/sail/pkg/model/addressmap.go
@@ -0,0 +1,75 @@
+package model
+
+import (
+	"github.com/apache/dubbo-kubernetes/pkg/cluster"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
+)
+
+func (m *AddressMap) GetAddresses() map[cluster.ID][]string {
+	if m == nil {
+		return nil
+	}
+
+	m.mutex.RLock()
+	defer m.mutex.RUnlock()
+
+	if m.Addresses == nil {
+		return nil
+	}
+
+	out := make(map[cluster.ID][]string)
+	for k, v := range m.Addresses {
+		out[k] = slices.Clone(v)
+	}
+	return out
+}
+
+func (m *AddressMap) GetAddressesFor(c cluster.ID) []string {
+	if m == nil {
+		return nil
+	}
+
+	m.mutex.RLock()
+	defer m.mutex.RUnlock()
+
+	if m.Addresses == nil {
+		return nil
+	}
+
+	// Copy the Addresses array.
+	return append([]string{}, m.Addresses[c]...)
+}
+
+func (m *AddressMap) SetAddressesFor(c cluster.ID, addresses []string) *AddressMap {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	if len(addresses) == 0 {
+		// Setting an empty array for the cluster. Remove the entry for the cluster if it exists.
+		if m.Addresses != nil {
+			delete(m.Addresses, c)
+
+			// Delete the map if there's nothing left.
+			if len(m.Addresses) == 0 {
+				m.Addresses = nil
+			}
+		}
+	} else {
+		// Create the map if it doesn't already exist.
+		if m.Addresses == nil {
+			m.Addresses = make(map[cluster.ID][]string)
+		}
+		m.Addresses[c] = addresses
+	}
+	return m
+}
+
+func (m *AddressMap) Len() int {
+	if m == nil {
+		return 0
+	}
+	m.mutex.RLock()
+	defer m.mutex.RUnlock()
+
+	return len(m.Addresses)
+}
diff --git a/sail/pkg/model/authentication.go b/sail/pkg/model/authentication.go
new file mode 100644
index 0000000..f1c421a
--- /dev/null
+++ b/sail/pkg/model/authentication.go
@@ -0,0 +1,119 @@
+package model
+
+import (
+	"crypto/md5"
+	"fmt"
+	"github.com/apache/dubbo-kubernetes/pkg/config"
+	"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
+	"istio.io/api/security/v1beta1"
+	"k8s.io/klog/v2"
+	"strings"
+	"time"
+)
+
+type MutualTLSMode int
+
+const (
+	MTLSUnknown MutualTLSMode = iota
+	MTLSDisable
+	MTLSPermissive
+	MTLSStrict
+)
+
+type AuthenticationPolicies struct {
+	requestAuthentications map[string][]config.Config
+	peerAuthentications    map[string][]config.Config
+	globalMutualTLSMode    MutualTLSMode
+	rootNamespace          string
+	namespaceMutualTLSMode map[string]MutualTLSMode
+	aggregateVersion       string
+}
+
+func initAuthenticationPolicies(env *Environment) *AuthenticationPolicies {
+	policy := &AuthenticationPolicies{
+		requestAuthentications: map[string][]config.Config{},
+		peerAuthentications:    map[string][]config.Config{},
+		globalMutualTLSMode:    MTLSUnknown,
+		rootNamespace:          env.Mesh().GetRootNamespace(),
+	}
+
+	policy.addRequestAuthentication(sortConfigByCreationTime(env.List(gvk.RequestAuthentication, NamespaceAll)))
+	policy.addPeerAuthentication(sortConfigByCreationTime(env.List(gvk.PeerAuthentication, NamespaceAll)))
+
+	return policy
+}
+
+func (policy *AuthenticationPolicies) addRequestAuthentication(configs []config.Config) {
+	for _, config := range configs {
+		policy.requestAuthentications[config.Namespace] = append(policy.requestAuthentications[config.Namespace], config)
+	}
+}
+
+func (policy *AuthenticationPolicies) addPeerAuthentication(configs []config.Config) {
+	sortConfigByCreationTime(configs)
+
+	foundNamespaceMTLS := make(map[string]v1beta1.PeerAuthentication_MutualTLS_Mode)
+	seenNamespaceOrMeshConfig := make(map[string]time.Time)
+	versions := []string{}
+
+	for _, config := range configs {
+		versions = append(versions, config.UID+"."+config.ResourceVersion)
+		spec := config.Spec.(*v1beta1.PeerAuthentication)
+		if spec.Selector == nil || len(spec.Selector.MatchLabels) == 0 {
+			if t, ok := seenNamespaceOrMeshConfig[config.Namespace]; ok {
+				klog.Warningf(
+					"Namespace/mesh-level PeerAuthentication is already defined for %q at time %v. Ignore %q which was created at time %v",
+					config.Namespace, t, config.Name, config.CreationTimestamp)
+				continue
+			}
+			seenNamespaceOrMeshConfig[config.Namespace] = config.CreationTimestamp
+
+			mode := v1beta1.PeerAuthentication_MutualTLS_UNSET
+			if spec.Mtls != nil {
+				mode = spec.Mtls.Mode
+			}
+			if config.Namespace == policy.rootNamespace {
+				if mode == v1beta1.PeerAuthentication_MutualTLS_UNSET {
+					policy.globalMutualTLSMode = MTLSPermissive
+				} else {
+					policy.globalMutualTLSMode = ConvertToMutualTLSMode(mode)
+				}
+			} else {
+				foundNamespaceMTLS[config.Namespace] = mode
+			}
+		}
+
+		policy.peerAuthentications[config.Namespace] = append(policy.peerAuthentications[config.Namespace], config)
+	}
+
+	// nolint: gosec
+	// Not security sensitive code
+	policy.aggregateVersion = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(versions, ";"))))
+
+	policy.namespaceMutualTLSMode = make(map[string]MutualTLSMode, len(foundNamespaceMTLS))
+
+	inheritedMTLSMode := policy.globalMutualTLSMode
+	if inheritedMTLSMode == MTLSUnknown {
+		inheritedMTLSMode = MTLSPermissive
+	}
+	for ns, mtlsMode := range foundNamespaceMTLS {
+		if mtlsMode == v1beta1.PeerAuthentication_MutualTLS_UNSET {
+			policy.namespaceMutualTLSMode[ns] = inheritedMTLSMode
+		} else {
+			policy.namespaceMutualTLSMode[ns] = ConvertToMutualTLSMode(mtlsMode)
+		}
+	}
+}
+
+func ConvertToMutualTLSMode(mode v1beta1.PeerAuthentication_MutualTLS_Mode) MutualTLSMode {
+	switch mode {
+	case v1beta1.PeerAuthentication_MutualTLS_DISABLE:
+		return MTLSDisable
+	case v1beta1.PeerAuthentication_MutualTLS_PERMISSIVE:
+		return MTLSPermissive
+	case v1beta1.PeerAuthentication_MutualTLS_STRICT:
+		return MTLSStrict
+	default:
+		return MTLSUnknown
+	}
+}
diff --git a/sail/pkg/model/authorization.go b/sail/pkg/model/authorization.go
new file mode 100644
index 0000000..9789ac3
--- /dev/null
+++ b/sail/pkg/model/authorization.go
@@ -0,0 +1,48 @@
+package model
+
+import (
+	"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
+	authpb "istio.io/api/security/v1beta1"
+)
+
+type AuthorizationPolicy struct {
+	Name        string                      `json:"name"`
+	Namespace   string                      `json:"namespace"`
+	Annotations map[string]string           `json:"annotations"`
+	Spec        *authpb.AuthorizationPolicy `json:"spec"`
+}
+
+type AuthorizationPolicies struct {
+	NamespaceToPolicies map[string][]AuthorizationPolicy `json:"namespace_to_policies"`
+	RootNamespace       string                           `json:"root_namespace"`
+}
+
+func GetAuthorizationPolicies(env *Environment) *AuthorizationPolicies {
+	policy := &AuthorizationPolicies{
+		NamespaceToPolicies: map[string][]AuthorizationPolicy{},
+		RootNamespace:       env.Mesh().GetRootNamespace(),
+	}
+
+	policies := env.List(gvk.AuthorizationPolicy, NamespaceAll)
+	sortConfigByCreationTime(policies)
+
+	policyCount := make(map[string]int)
+	for _, config := range policies {
+		policyCount[config.Namespace]++
+	}
+
+	for _, config := range policies {
+		authzConfig := AuthorizationPolicy{
+			Name:        config.Name,
+			Namespace:   config.Namespace,
+			Annotations: config.Annotations,
+			Spec:        config.Spec.(*authpb.AuthorizationPolicy),
+		}
+		if _, ok := policy.NamespaceToPolicies[config.Namespace]; !ok {
+			policy.NamespaceToPolicies[config.Namespace] = make([]AuthorizationPolicy, 0, policyCount[config.Namespace])
+		}
+		policy.NamespaceToPolicies[config.Namespace] = append(policy.NamespaceToPolicies[config.Namespace], authzConfig)
+	}
+
+	return policy
+}
diff --git a/sail/pkg/model/cluster_local.go b/sail/pkg/model/cluster_local.go
new file mode 100644
index 0000000..9bb6b42
--- /dev/null
+++ b/sail/pkg/model/cluster_local.go
@@ -0,0 +1,14 @@
+package model
+
+import "github.com/apache/dubbo-kubernetes/pkg/config/host"
+
+type ClusterLocalHosts struct {
+	specific map[host.Name]bool
+	wildcard map[host.Name]bool
+}
+
+type ClusterLocalProvider interface {
+	// GetClusterLocalHosts returns the list of cluster-local hosts, sorted in
+	// ascending order. The caller must not modify the returned list.
+	GetClusterLocalHosts() ClusterLocalHosts
+}
diff --git a/sail/pkg/model/config.go b/sail/pkg/model/config.go
index 78e5b91..6619a5d 100644
--- a/sail/pkg/model/config.go
+++ b/sail/pkg/model/config.go
@@ -1,8 +1,14 @@
 package model
 
 import (
+	"cmp"
 	"github.com/apache/dubbo-kubernetes/pkg/config"
 	"github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+	"sort"
+)
+
+const (
+	NamespaceAll = ""
 )
 
 type ConfigStore interface {
@@ -24,3 +30,16 @@
 	Run(stop <-chan struct{})
 	HasSynced() bool
 }
+
+func sortConfigByCreationTime(configs []config.Config) []config.Config {
+	sort.Slice(configs, func(i, j int) bool {
+		if r := configs[i].CreationTimestamp.Compare(configs[j].CreationTimestamp); r != 0 {
+			return r == -1 // -1 means i is less than j, so return true
+		}
+		if r := cmp.Compare(configs[i].Name, configs[j].Name); r != 0 {
+			return r == -1
+		}
+		return cmp.Compare(configs[i].Namespace, configs[j].Namespace) == -1
+	})
+	return configs
+}
diff --git a/sail/pkg/model/context.go b/sail/pkg/model/context.go
index 49705d6..e6a2f4f 100644
--- a/sail/pkg/model/context.go
+++ b/sail/pkg/model/context.go
@@ -22,6 +22,7 @@
 	"github.com/apache/dubbo-kubernetes/pkg/config/host"
 	"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
 	"github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
+	"github.com/apache/dubbo-kubernetes/pkg/xds"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/features"
 	meshconfig "istio.io/api/mesh/v1alpha1"
 	"net"
@@ -31,14 +32,19 @@
 
 type Watcher = meshwatcher.WatcherCollection
 
+type WatchedResource = xds.WatchedResource
+
 type Environment struct {
 	ServiceDiscovery
 	Watcher
 	ConfigStore
-	mutex           sync.RWMutex
-	pushContext     *PushContext
-	Cache           XdsCache
-	NetworksWatcher mesh.NetworksWatcher
+	mutex                sync.RWMutex
+	pushContext          *PushContext
+	Cache                XdsCache
+	NetworksWatcher      mesh.NetworksWatcher
+	NetworkManager       *NetworkManager
+	clusterLocalServices ClusterLocalProvider
+	DomainSuffix         string
 }
 
 type XdsCacheImpl struct {
@@ -56,24 +62,25 @@
 		cache = DisabledCache{}
 	}
 	return &Environment{
-		Cache: cache,
+		pushContext: NewPushContext(),
+		Cache:       cache,
 	}
 }
 
 var _ mesh.Holder = &Environment{}
 
-func (e *Environment) SetPushContext(pc *PushContext) {
-	e.mutex.Lock()
-	defer e.mutex.Unlock()
-	e.pushContext = pc
-}
-
 func (e *Environment) PushContext() *PushContext {
 	e.mutex.RLock()
 	defer e.mutex.RUnlock()
 	return e.pushContext
 }
 
+func (e *Environment) SetPushContext(pc *PushContext) {
+	e.mutex.Lock()
+	defer e.mutex.Unlock()
+	e.pushContext = pc
+}
+
 func (e *Environment) Mesh() *meshconfig.MeshConfig {
 	if e != nil && e.Watcher != nil {
 		return e.Watcher.Mesh()
@@ -109,4 +116,8 @@
 	return host.Name(hostname), port, nil
 }
 
+func (e *Environment) ClusterLocal() ClusterLocalProvider {
+	return e.clusterLocalServices
+}
+
 type Proxy struct{}
diff --git a/sail/pkg/model/controller.go b/sail/pkg/model/controller.go
index a79c32f..72e2dd1 100644
--- a/sail/pkg/model/controller.go
+++ b/sail/pkg/model/controller.go
@@ -1,8 +1,12 @@
 package model
 
 type Controller interface {
-	// Run until a signal is received
 	Run(stop <-chan struct{})
+	HasSynced() bool
+}
+
+type AggregateController interface {
+	Controller
 }
 
 type Event int
diff --git a/sail/pkg/model/network.go b/sail/pkg/model/network.go
new file mode 100644
index 0000000..66f7ce2
--- /dev/null
+++ b/sail/pkg/model/network.go
@@ -0,0 +1,4 @@
+package model
+
+type NetworkManager struct {
+}
diff --git a/sail/pkg/model/push_context.go b/sail/pkg/model/push_context.go
index 76870e3..4e94184 100644
--- a/sail/pkg/model/push_context.go
+++ b/sail/pkg/model/push_context.go
@@ -17,10 +17,78 @@
 
 package model
 
-import meshconfig "istio.io/api/mesh/v1alpha1"
+import (
+	"github.com/apache/dubbo-kubernetes/pkg/config"
+	"github.com/apache/dubbo-kubernetes/pkg/config/host"
+	"github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
+	"github.com/apache/dubbo-kubernetes/pkg/config/visibility"
+	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
+	"go.uber.org/atomic"
+	meshconfig "istio.io/api/mesh/v1alpha1"
+	"k8s.io/apimachinery/pkg/types"
+	"sync"
+)
 
 type PushContext struct {
-	Mesh *meshconfig.MeshConfig `json:"-"`
+	Mesh                 *meshconfig.MeshConfig `json:"-"`
+	initializeMutex      sync.Mutex
+	InitDone             atomic.Bool
+	Networks             *meshconfig.MeshNetworks
+	networkMgr           *NetworkManager
+	clusterLocalHosts    ClusterLocalHosts
+	exportToDefaults     exportToDefaults
+	AuthnPolicies        *AuthenticationPolicies `json:"-"`
+	AuthzPolicies        *AuthorizationPolicies  `json:"-"`
+	virtualServiceIndex  virtualServiceIndex
+	destinationRuleIndex destinationRuleIndex
+	ServiceIndex         serviceIndex
+	serviceAccounts      map[serviceAccountKey][]string
+}
+
+type serviceAccountKey struct {
+	hostname  host.Name
+	namespace string
+}
+
+type virtualServiceIndex struct {
+	exportedToNamespaceByGateway map[types.NamespacedName][]config.Config
+	// this contains all the virtual services with exportTo "." and current namespace. The keys are namespace,gateway.
+	privateByNamespaceAndGateway map[types.NamespacedName][]config.Config
+	// This contains all virtual services whose exportTo is "*", keyed by gateway
+	publicByGateway map[string][]config.Config
+	// root vs namespace/name ->delegate vs virtualservice gvk/namespace/name
+	delegates map[ConfigKey][]ConfigKey
+
+	// This contains destination hosts of virtual services, keyed by gateway's namespace/name,
+	// only used when PILOT_FILTER_GATEWAY_CLUSTER_CONFIG is enabled
+	destinationsByGateway map[string]sets.String
+
+	// Map of VS hostname -> referenced hostnames
+	referencedDestinations map[string]sets.String
+}
+
+type destinationRuleIndex struct {
+	namespaceLocal      map[string]*consolidatedDestRules
+	exportedByNamespace map[string]*consolidatedDestRules
+	rootNamespaceLocal  *consolidatedDestRules
+}
+
+type consolidatedDestRules struct {
+	specificDestRules map[host.Name][]*ConsolidatedDestRule
+	wildcardDestRules map[host.Name][]*ConsolidatedDestRule
+}
+
+type serviceIndex struct {
+	privateByNamespace   map[string][]*Service
+	public               []*Service
+	exportedToNamespace  map[string][]*Service
+	HostnameAndNamespace map[host.Name]map[string]*Service `json:"-"`
+}
+
+type ConsolidatedDestRule struct {
+	exportTo sets.Set[visibility.Instance]
+	rule     *config.Config
+	from     []types.NamespacedName
 }
 
 type TriggerReason string
@@ -28,13 +96,30 @@
 type ReasonStats map[TriggerReason]int
 
 type PushRequest struct {
-	Reason ReasonStats
+	Reason          ReasonStats
+	ConfigsUpdated  sets.Set[ConfigKey]
+	Forced          bool
+	Full            bool
+	Push            *PushContext
+	LastPushContext *PushContext
 }
 
 func NewPushContext() *PushContext {
 	return &PushContext{}
 }
 
+type ConfigKey struct {
+	Kind      kind.Kind
+	Name      string
+	Namespace string
+}
+
+type exportToDefaults struct {
+	service         sets.Set[visibility.Instance]
+	virtualService  sets.Set[visibility.Instance]
+	destinationRule sets.Set[visibility.Instance]
+}
+
 func (pr *PushRequest) CopyMerge(other *PushRequest) *PushRequest {
 	if pr == nil {
 		return other
@@ -48,4 +133,141 @@
 }
 
 type XDSUpdater interface {
-}
\ No newline at end of file
+}
+
+func (ps *PushContext) InitContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) {
+	ps.initializeMutex.Lock()
+	defer ps.initializeMutex.Unlock()
+	if ps.InitDone.Load() {
+		return
+	}
+
+	ps.Mesh = env.Mesh()
+	ps.Networks = env.MeshNetworks()
+
+	ps.initDefaultExportMaps()
+
+	if pushReq == nil || oldPushContext == nil || !oldPushContext.InitDone.Load() || pushReq.Forced {
+		ps.createNewContext(env)
+	} else {
+		ps.updateContext(env, oldPushContext, pushReq)
+	}
+
+	ps.networkMgr = env.NetworkManager
+
+	ps.clusterLocalHosts = env.ClusterLocal().GetClusterLocalHosts()
+
+	ps.InitDone.Store(true)
+}
+
+func (ps *PushContext) initDefaultExportMaps() {
+	ps.exportToDefaults.destinationRule = sets.New[visibility.Instance]()
+	if ps.Mesh.DefaultDestinationRuleExportTo != nil {
+		for _, e := range ps.Mesh.DefaultDestinationRuleExportTo {
+			ps.exportToDefaults.destinationRule.Insert(visibility.Instance(e))
+		}
+	} else {
+		// default to *
+		ps.exportToDefaults.destinationRule.Insert(visibility.Public)
+	}
+
+	ps.exportToDefaults.service = sets.New[visibility.Instance]()
+	if ps.Mesh.DefaultServiceExportTo != nil {
+		for _, e := range ps.Mesh.DefaultServiceExportTo {
+			ps.exportToDefaults.service.Insert(visibility.Instance(e))
+		}
+	} else {
+		ps.exportToDefaults.service.Insert(visibility.Public)
+	}
+
+	ps.exportToDefaults.virtualService = sets.New[visibility.Instance]()
+	if ps.Mesh.DefaultVirtualServiceExportTo != nil {
+		for _, e := range ps.Mesh.DefaultVirtualServiceExportTo {
+			ps.exportToDefaults.virtualService.Insert(visibility.Instance(e))
+		}
+	} else {
+		ps.exportToDefaults.virtualService.Insert(visibility.Public)
+	}
+}
+
+func (ps *PushContext) initServiceRegistry(env *Environment, configsUpdate sets.Set[ConfigKey]) {
+
+}
+
+func (ps *PushContext) initServiceAccounts(env *Environment, services []*Service) {
+}
+
+func (ps *PushContext) initVirtualServices(env *Environment) {
+}
+
+func (ps *PushContext) initDestinationRules(env *Environment) {
+}
+
+func (ps *PushContext) initAuthnPolicies(env *Environment) {
+	ps.AuthnPolicies = initAuthenticationPolicies(env)
+}
+
+func (ps *PushContext) initAuthorizationPolicies(env *Environment) {
+	ps.AuthzPolicies = GetAuthorizationPolicies(env)
+}
+
+func (ps *PushContext) createNewContext(env *Environment) {
+	ps.initServiceRegistry(env, nil)
+	ps.initVirtualServices(env)
+	ps.initDestinationRules(env)
+	ps.initAuthnPolicies(env)
+	ps.initAuthorizationPolicies(env)
+}
+
+func (ps *PushContext) updateContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) {
+	var servicesChanged, virtualServicesChanged, destinationRulesChanged, authnChanged, authzChanged bool
+
+	// We do not need to watch Ingress or Gateway API changes. Both of these have their own controllers which will send
+	// events for Istio types (Gateway and VirtualService).
+	for conf := range pushReq.ConfigsUpdated {
+		switch conf.Kind {
+		case kind.DestinationRule:
+			destinationRulesChanged = true
+		case kind.VirtualService:
+			virtualServicesChanged = true
+		case kind.AuthorizationPolicy:
+			authzChanged = true
+		case kind.RequestAuthentication,
+			kind.PeerAuthentication:
+			authnChanged = true
+		}
+	}
+
+	if servicesChanged {
+		// Services have changed. initialize service registry
+		ps.initServiceRegistry(env, pushReq.ConfigsUpdated)
+	} else {
+		// make sure we copy over things that would be generated in initServiceRegistry
+		ps.ServiceIndex = oldPushContext.ServiceIndex
+		ps.serviceAccounts = oldPushContext.serviceAccounts
+	}
+
+	if virtualServicesChanged {
+		ps.initVirtualServices(env)
+	} else {
+		ps.virtualServiceIndex = oldPushContext.virtualServiceIndex
+	}
+
+	if destinationRulesChanged {
+		ps.initDestinationRules(env)
+	} else {
+		ps.destinationRuleIndex = oldPushContext.destinationRuleIndex
+	}
+
+	if authnChanged {
+		ps.initAuthnPolicies(env)
+	} else {
+		ps.AuthnPolicies = oldPushContext.AuthnPolicies
+	}
+
+	if authzChanged {
+		ps.initAuthorizationPolicies(env)
+	} else {
+		ps.AuthzPolicies = oldPushContext.AuthzPolicies
+	}
+}
diff --git a/sail/pkg/model/service.go b/sail/pkg/model/service.go
index 6573e2f..7b6c26d 100644
--- a/sail/pkg/model/service.go
+++ b/sail/pkg/model/service.go
@@ -1,4 +1,148 @@
 package model
 
+import (
+	"github.com/apache/dubbo-kubernetes/pkg/cluster"
+	"github.com/apache/dubbo-kubernetes/pkg/config/host"
+	"github.com/apache/dubbo-kubernetes/pkg/config/protocol"
+	"github.com/apache/dubbo-kubernetes/pkg/config/visibility"
+	"github.com/apache/dubbo-kubernetes/pkg/maps"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
+	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
+	"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+	"sync"
+	"time"
+)
+
+type NamespacedHostname struct {
+	Hostname  host.Name
+	Namespace string
+}
+
+type ServiceAttributes struct {
+	Labels                   map[string]string
+	LabelSelectors           map[string]string
+	ExportTo                 sets.Set[visibility.Instance]
+	ClusterExternalAddresses *AddressMap
+	ClusterExternalPorts     map[cluster.ID]map[uint32]uint32
+	Aliases                  []NamespacedHostname
+	PassthroughTargetPorts   map[uint32]uint32
+	// Name is "destination.service.name" attribute
+	Name string
+	// Namespace is "destination.service.namespace" attribute
+	Namespace       string
+	ServiceRegistry provider.ID
+}
+
+type AddressMap struct {
+	Addresses map[cluster.ID][]string
+
+	// NOTE: The copystructure library is not able to copy unexported fields, so the mutex will not be copied.
+	mutex sync.RWMutex
+}
+
+func (m *AddressMap) DeepCopy() *AddressMap {
+	if m == nil {
+		return nil
+	}
+	return &AddressMap{
+		Addresses: m.GetAddresses(),
+	}
+}
+
+type Service struct {
+	Attributes      ServiceAttributes
+	Hostname        host.Name  `json:"hostname"`
+	Ports           PortList   `json:"ports,omitempty"`
+	ServiceAccounts []string   `json:"serviceAccounts,omitempty"`
+	ClusterVIPs     AddressMap `json:"clusterVIPs,omitempty"`
+	CreationTime    time.Time  `json:"creationTime,omitempty"`
+}
+
+func (s *Service) DeepCopy() *Service {
+	// nolint: govet
+	out := *s
+	out.Attributes = s.Attributes.DeepCopy()
+	if s.Ports != nil {
+		out.Ports = make(PortList, len(s.Ports))
+		for i, port := range s.Ports {
+			if port != nil {
+				out.Ports[i] = &Port{
+					Name:     port.Name,
+					Port:     port.Port,
+					Protocol: port.Protocol,
+				}
+			} else {
+				out.Ports[i] = nil
+			}
+		}
+	}
+
+	out.ServiceAccounts = slices.Clone(s.ServiceAccounts)
+	out.ClusterVIPs = *s.ClusterVIPs.DeepCopy()
+	return &out
+}
+
+func (s *Service) Key() string {
+	if s == nil {
+		return ""
+	}
+
+	return s.Attributes.Namespace + "/" + string(s.Hostname)
+}
+
+type Port struct {
+	Name     string            `json:"name,omitempty"`
+	Port     int               `json:"port"`
+	Protocol protocol.Instance `json:"protocol,omitempty"`
+}
+
+type PortList []*Port
+
+func (p *Port) Equals(other *Port) bool {
+	if p == nil {
+		return other == nil
+	}
+	if other == nil {
+		return p == nil
+	}
+	return p.Name == other.Name && p.Port == other.Port && p.Protocol == other.Protocol
+}
+
+func (ports PortList) Equals(other PortList) bool {
+	return slices.EqualFunc(ports, other, func(a, b *Port) bool {
+		return a.Equals(b)
+	})
+}
+
 type ServiceDiscovery interface {
+	Services() []*Service
+	GetService(hostname host.Name) *Service
+}
+
+func (s *ServiceAttributes) DeepCopy() ServiceAttributes {
+	// AddressMap contains a mutex, which is safe to copy in this case.
+	// nolint: govet
+	out := *s
+
+	out.Labels = maps.Clone(s.Labels)
+	if s.ExportTo != nil {
+		out.ExportTo = s.ExportTo.Copy()
+	}
+
+	out.LabelSelectors = maps.Clone(s.LabelSelectors)
+	out.ClusterExternalAddresses = s.ClusterExternalAddresses.DeepCopy()
+
+	if s.ClusterExternalPorts != nil {
+		out.ClusterExternalPorts = make(map[cluster.ID]map[uint32]uint32, len(s.ClusterExternalPorts))
+		for k, m := range s.ClusterExternalPorts {
+			out.ClusterExternalPorts[k] = maps.Clone(m)
+		}
+	}
+
+	out.Aliases = slices.Clone(s.Aliases)
+	out.PassthroughTargetPorts = maps.Clone(out.PassthroughTargetPorts)
+
+	// AddressMap contains a mutex, which is safe to return a copy in this case.
+	// nolint: govet
+	return out
 }
diff --git a/sail/pkg/serviceregistry/aggregate/controller.go b/sail/pkg/serviceregistry/aggregate/controller.go
index 0b22075..2179a5b 100644
--- a/sail/pkg/serviceregistry/aggregate/controller.go
+++ b/sail/pkg/serviceregistry/aggregate/controller.go
@@ -2,12 +2,20 @@
 
 import (
 	"github.com/apache/dubbo-kubernetes/pkg/cluster"
+	"github.com/apache/dubbo-kubernetes/pkg/config/host"
 	"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+	"github.com/apache/dubbo-kubernetes/sail/pkg/model"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry"
+	"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
 	"k8s.io/klog/v2"
 	"sync"
 )
 
+var (
+	_ model.ServiceDiscovery    = &Controller{}
+	_ model.AggregateController = &Controller{}
+)
+
 type Controller struct {
 	registries      []*registryEntry
 	storeLock       sync.RWMutex
@@ -53,3 +61,97 @@
 	<-stop
 	klog.Info("Registry Aggregator terminated")
 }
+
+func (c *Controller) HasSynced() bool {
+	for _, r := range c.GetRegistries() {
+		if !r.HasSynced() {
+			klog.V(2).Infof("registry %s is syncing", r.Cluster())
+			return false
+		}
+	}
+	return true
+}
+
+func (c *Controller) Services() []*model.Service {
+	// smap is a map of hostname (string) to service index, used to identify services that
+	// are installed in multiple clusters.
+	smap := make(map[host.Name]int)
+	index := 0
+	services := make([]*model.Service, 0)
+	// Locking Registries list while walking it to prevent inconsistent results
+	for _, r := range c.GetRegistries() {
+		svcs := r.Services()
+		if r.Provider() != provider.Kubernetes {
+			index += len(svcs)
+			services = append(services, svcs...)
+		} else {
+			for _, s := range svcs {
+				previous, ok := smap[s.Hostname]
+				if !ok {
+					// First time we see a service. The result will have a single service per hostname
+					// The first cluster will be listed first, so the services in the primary cluster
+					// will be used for default settings. If a service appears in multiple clusters,
+					// the order is less clear.
+					smap[s.Hostname] = index
+					index++
+					services = append(services, s)
+				} else {
+					// We must deepcopy before merge, and after merging, the ClusterVips length will be >= 2.
+					// This is an optimization to prevent deepcopy multi-times
+					if services[previous].ClusterVIPs.Len() < 2 {
+						// Deep copy before merging, otherwise there is a case
+						// a service in remote cluster can be deleted, but the ClusterIP left.
+						services[previous] = services[previous].DeepCopy()
+					}
+					// If it is seen second time, that means it is from a different cluster, update cluster VIPs.
+					mergeService(services[previous], s, r)
+				}
+			}
+		}
+	}
+	return services
+}
+
+func (c *Controller) GetService(hostname host.Name) *model.Service {
+	var out *model.Service
+	for _, r := range c.GetRegistries() {
+		service := r.GetService(hostname)
+		if service == nil {
+			continue
+		}
+		if r.Provider() != provider.Kubernetes {
+			return service
+		}
+		if out == nil {
+			out = service.DeepCopy()
+		} else {
+			// If we are seeing the service for the second time, it means it is available in multiple clusters.
+			mergeService(out, service, r)
+		}
+	}
+	return out
+}
+
+func (c *Controller) GetRegistries() []serviceregistry.Instance {
+	c.storeLock.RLock()
+	defer c.storeLock.RUnlock()
+
+	// copy registries to prevent race, no need to deep copy here.
+	out := make([]serviceregistry.Instance, len(c.registries))
+	for i := range c.registries {
+		out[i] = c.registries[i]
+	}
+	return out
+}
+
+func mergeService(dst, src *model.Service, srcRegistry serviceregistry.Instance) {
+	if !src.Ports.Equals(dst.Ports) {
+		klog.V(2).Infof("service %s defined from cluster %s is different from others", src.Hostname, srcRegistry.Cluster())
+	}
+	// Prefer the k8s HostVIPs where possible
+	clusterID := srcRegistry.Cluster()
+	if len(dst.ClusterVIPs.GetAddressesFor(clusterID)) == 0 {
+		newAddresses := src.ClusterVIPs.GetAddressesFor(clusterID)
+		dst.ClusterVIPs.SetAddressesFor(clusterID, newAddresses)
+	}
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/controller.go b/sail/pkg/serviceregistry/kube/controller/controller.go
index dac2cf3..893ec60 100644
--- a/sail/pkg/serviceregistry/kube/controller/controller.go
+++ b/sail/pkg/serviceregistry/kube/controller/controller.go
@@ -19,26 +19,104 @@
 
 import (
 	"github.com/apache/dubbo-kubernetes/pkg/cluster"
+	"github.com/apache/dubbo-kubernetes/pkg/config/host"
 	"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
 	"github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
+	kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
 	"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+	"github.com/apache/dubbo-kubernetes/pkg/queue"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/model"
+	"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
+	"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+	"go.uber.org/atomic"
+	"k8s.io/klog/v2"
+	"sort"
+	"sync"
+	"time"
 )
 
+var (
+	_ serviceregistry.Instance = &Controller{}
+)
+
+type Controller struct {
+	opts   Options
+	client kubelib.Client
+	sync.RWMutex
+	servicesMap         map[host.Name]*model.Service
+	queue               queue.Instance
+	initialSyncTimedout *atomic.Bool
+}
+
 type Options struct {
-	KubernetesAPIQPS   float32
-	KubernetesAPIBurst int
-	DomainSuffix       string
-	// XDSUpdater will push changes to the xDS server.
-	XDSUpdater model.XDSUpdater
-	// MeshNetworksWatcher observes changes to the mesh networks config.
-	MeshNetworksWatcher mesh.NetworksWatcher
-	// MeshWatcher observes changes to the mesh config
+	KubernetesAPIQPS      float32
+	KubernetesAPIBurst    int
+	DomainSuffix          string
+	XDSUpdater            model.XDSUpdater
+	MeshNetworksWatcher   mesh.NetworksWatcher
 	MeshWatcher           meshwatcher.WatcherCollection
 	ClusterID             cluster.ID
 	ClusterAliases        map[string]string
 	SystemNamespace       string
 	MeshServiceController *aggregate.Controller
 	KrtDebugger           *krt.DebugHandler
+	SyncTimeout           time.Duration
+}
+
+func (c *Controller) Services() []*model.Service {
+	c.RLock()
+	out := make([]*model.Service, 0, len(c.servicesMap))
+	for _, svc := range c.servicesMap {
+		out = append(out, svc)
+	}
+	c.RUnlock()
+	sort.Slice(out, func(i, j int) bool { return out[i].Hostname < out[j].Hostname })
+	return out
+}
+
+// GetService implements a service catalog operation by hostname specified.
+func (c *Controller) GetService(hostname host.Name) *model.Service {
+	c.RLock()
+	svc := c.servicesMap[hostname]
+	c.RUnlock()
+	return svc
+}
+
+func (c *Controller) Provider() provider.ID {
+	return provider.Kubernetes
+}
+
+func (c *Controller) Cluster() cluster.ID {
+	return c.opts.ClusterID
+}
+
+func (c *Controller) Run(stop <-chan struct{}) {
+	if c.opts.SyncTimeout != 0 {
+		time.AfterFunc(c.opts.SyncTimeout, func() {
+			if !c.queue.HasSynced() {
+				klog.Warningf("kube controller for %s initial sync timed out", c.opts.ClusterID)
+				c.initialSyncTimedout.Store(true)
+			}
+		})
+	}
+	st := time.Now()
+
+	kubelib.WaitForCacheSync("kube controller", stop, c.informersSynced)
+	klog.Infof("kube controller for %s synced after %v", c.opts.ClusterID, time.Since(st))
+
+	// after the in-order sync we can start processing the queue
+	c.queue.Run(stop)
+	klog.Infof("Controller terminated")
+}
+
+func (c *Controller) HasSynced() bool {
+	if c.initialSyncTimedout.Load() {
+		return true
+	}
+	return c.queue.HasSynced()
+}
+
+func (c *Controller) informersSynced() bool {
+	return false
 }
diff --git a/sail/pkg/trustbundle/trustbundle.go b/sail/pkg/trustbundle/trustbundle.go
index 202e901..33964cf 100644
--- a/sail/pkg/trustbundle/trustbundle.go
+++ b/sail/pkg/trustbundle/trustbundle.go
@@ -21,6 +21,7 @@
 	"crypto/x509"
 	"encoding/pem"
 	"fmt"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"k8s.io/klog/v2"
 	"sort"
 	"strings"
@@ -30,7 +31,6 @@
 	"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
 	"github.com/apache/dubbo-kubernetes/pkg/spiffe"
 	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
 	meshconfig "istio.io/api/mesh/v1alpha1"
 )
 
diff --git a/sail/pkg/xds/ads.go b/sail/pkg/xds/ads.go
index 5a71bd0..5591c2e 100644
--- a/sail/pkg/xds/ads.go
+++ b/sail/pkg/xds/ads.go
@@ -18,13 +18,19 @@
 package xds
 
 import (
+	"context"
 	"github.com/apache/dubbo-kubernetes/pkg/xds"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/model"
 	core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+	"time"
 )
 
-type DeltaDiscoveryStream = discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer
+type (
+	DiscoveryStream      = xds.DiscoveryStream
+	DeltaDiscoveryStream = discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer
+	DeltaDiscoveryClient = discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesClient
+)
 
 type Connection struct {
 	xds.Connection
@@ -36,6 +42,11 @@
 	ids          []string
 }
 
+type Event struct {
+	pushRequest *model.PushRequest
+	done        func()
+}
+
 func (conn *Connection) XdsConnection() *xds.Connection {
 	return &conn.Connection
 }
@@ -43,3 +54,51 @@
 func (conn *Connection) Proxy() *model.Proxy {
 	return conn.proxy
 }
+
+func (s *DiscoveryServer) DeltaAggregatedResources(stream discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
+	return s.StreamDeltas(stream)
+}
+
+func (s *DiscoveryServer) StreamAggregatedResources(stream DiscoveryStream) error {
+	return s.Stream(stream)
+}
+
+func (s *DiscoveryServer) initProxyMetadata(node *core.Node) (*model.Proxy, error) {
+	return nil, nil
+}
+
+func (s *DiscoveryServer) initConnection(node *core.Node, con *Connection, identities []string) error {
+	return nil
+}
+
+func (s *DiscoveryServer) closeConnection(con *Connection) {
+	if con.ID() == "" {
+		return
+	}
+}
+
+func (s *DiscoveryServer) Stream(stream DiscoveryStream) error {
+	return nil
+}
+
+func (s *DiscoveryServer) WaitForRequestLimit(ctx context.Context) error {
+	if s.RequestRateLimit.Limit() == 0 {
+		// Allow opt out when rate limiting is set to 0qps
+		return nil
+	}
+	// Give a bit of time for queue to clear out, but if not fail fast. Client will connect to another
+	// instance in best case, or retry with backoff.
+	wait, cancel := context.WithTimeout(ctx, time.Second)
+	defer cancel()
+	return s.RequestRateLimit.Wait(wait)
+}
+
+func newConnection(peerAddr string, stream DiscoveryStream) *Connection {
+	return &Connection{
+		Connection: xds.NewConnection(peerAddr, stream),
+	}
+}
+
+func (conn *Connection) watchedResourcesByOrder() []*model.WatchedResource {
+	return nil
+}
diff --git a/sail/pkg/xds/auth.go b/sail/pkg/xds/auth.go
new file mode 100644
index 0000000..02780d1
--- /dev/null
+++ b/sail/pkg/xds/auth.go
@@ -0,0 +1,9 @@
+package xds
+
+import (
+	"context"
+)
+
+func (s *DiscoveryServer) authenticate(ctx context.Context) ([]string, error) {
+	return nil, nil
+}
diff --git a/sail/pkg/xds/delta.go b/sail/pkg/xds/delta.go
new file mode 100644
index 0000000..82064a6
--- /dev/null
+++ b/sail/pkg/xds/delta.go
@@ -0,0 +1,198 @@
+package xds
+
+import (
+	"github.com/apache/dubbo-kubernetes/pkg/xds"
+	dubbogrpc "github.com/apache/dubbo-kubernetes/sail/pkg/grpc"
+	"github.com/apache/dubbo-kubernetes/sail/pkg/model"
+	v3 "github.com/apache/dubbo-kubernetes/sail/pkg/xds/v3"
+	discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/peer"
+	"google.golang.org/grpc/status"
+	"k8s.io/klog/v2"
+)
+
+func (s *DiscoveryServer) StreamDeltas(stream DeltaDiscoveryStream) error {
+	ctx := stream.Context()
+	peerAddr := "0.0.0.0"
+	if peerInfo, ok := peer.FromContext(ctx); ok {
+		peerAddr = peerInfo.Addr.String()
+	}
+
+	if err := s.WaitForRequestLimit(stream.Context()); err != nil {
+		klog.Warningf("ADS: %q exceeded rate limit: %v", peerAddr, err)
+		return status.Errorf(codes.ResourceExhausted, "request rate limit exceeded: %v", err)
+	}
+
+	ids, err := s.authenticate(ctx)
+	if err != nil {
+		return status.Error(codes.Unauthenticated, err.Error())
+	}
+	if ids != nil {
+		klog.V(2).Infof("Authenticated XDS: %v with identity %v", peerAddr, ids)
+	} else {
+		klog.V(2).Infof("Unauthenticated XDS: %v", peerAddr)
+	}
+
+	// InitContext returns immediately if the context was already initialized.
+	s.globalPushContext().InitContext(s.Env, nil, nil)
+	con := newDeltaConnection(peerAddr, stream)
+
+	// Do not call: defer close(con.pushChannel). The push channel will be garbage collected
+	// when the connection is no longer used. Closing the channel can cause subtle race conditions
+	// with push. According to the spec: "It's only necessary to close a channel when it is important
+	// to tell the receiving goroutines that all data have been sent."
+
+	// Block until either a request is received or a push is triggered.
+	// We need 2 go routines because 'read' blocks in Recv().
+	go s.receiveDelta(con, ids)
+
+	// Wait for the proxy to be fully initialized before we start serving traffic. Because
+	// initialization doesn't have dependencies that will block, there is no need to add any timeout
+	// here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to
+	// reqChannel and the connection not being enqueued for pushes to pushChannel until the
+	// initialization is complete.
+	<-con.InitializedCh()
+
+	for {
+		// Go select{} statements are not ordered; the same channel can be chosen many times.
+		// For requests, these are higher priority (client may be blocked on startup until these are done)
+		// and often very cheap to handle (simple ACK), so we check it first.
+		select {
+		case req, ok := <-con.deltaReqChan:
+			if ok {
+				if err := s.processDeltaRequest(req, con); err != nil {
+					return err
+				}
+			} else {
+				// Remote side closed connection or error processing the request.
+				return <-con.ErrorCh()
+			}
+		case <-con.StopCh():
+			return nil
+		default:
+		}
+		// If there wasn't already a request, poll for requests and pushes. Note: if we have a huge
+		// amount of incoming requests, we may still send some pushes, as we do not `continue` above;
+		// however, requests will be handled ~2x as much as pushes. This ensures a wave of requests
+		// cannot completely starve pushes. However, this scenario is unlikely.
+		select {
+		case req, ok := <-con.deltaReqChan:
+			if ok {
+				if err := s.processDeltaRequest(req, con); err != nil {
+					return err
+				}
+			} else {
+				// Remote side closed connection or error processing the request.
+				return <-con.ErrorCh()
+			}
+		case ev := <-con.PushCh():
+			pushEv := ev.(*Event)
+			err := s.pushConnectionDelta(con, pushEv)
+			pushEv.done()
+			if err != nil {
+				return err
+			}
+		case <-con.StopCh():
+			return nil
+		}
+	}
+}
+
+func (s *DiscoveryServer) receiveDelta(con *Connection, identities []string) {
+	defer func() {
+		close(con.deltaReqChan)
+		close(con.ErrorCh())
+		// Close the initialized channel, if its not already closed, to prevent blocking the stream
+		select {
+		case <-con.InitializedCh():
+		default:
+			close(con.InitializedCh())
+		}
+	}()
+	firstRequest := true
+	for {
+		req, err := con.deltaStream.Recv()
+		if err != nil {
+			if dubbogrpc.GRPCErrorType(err) != dubbogrpc.UnexpectedError {
+				klog.Infof("ADS: %q %s terminated", con.Peer(), con.ID())
+				return
+			}
+			con.ErrorCh() <- err
+			klog.Errorf("ADS: %q %s terminated with error: %v", con.Peer(), con.ID(), err)
+			return
+		}
+		// This should be only set for the first request. The node id may not be set - for example malicious clients.
+		if firstRequest {
+			// probe happens before envoy sends first xDS request
+			if req.TypeUrl == v3.HealthInfoType {
+				klog.Warningf("ADS: %q %s send health check probe before normal xDS request", con.Peer(), con.ID())
+				continue
+			}
+			firstRequest = false
+			if req.Node == nil || req.Node.Id == "" {
+				con.ErrorCh() <- status.New(codes.InvalidArgument, "missing node information").Err()
+				return
+			}
+			if err := s.initConnection(req.Node, con, identities); err != nil {
+				con.ErrorCh() <- err
+				return
+			}
+			defer s.closeConnection(con)
+			klog.Infof("ADS: new delta connection for node:%s", con.ID())
+		}
+
+		select {
+		case con.deltaReqChan <- req:
+		case <-con.deltaStream.Context().Done():
+			klog.Infof("ADS: %q %s terminated with stream closed", con.Peer(), con.ID())
+			return
+		}
+	}
+}
+
+func (s *DiscoveryServer) pushConnectionDelta(con *Connection, pushEv *Event) error {
+	pushRequest := pushEv.pushRequest
+
+	if pushRequest.Full {
+		// Update Proxy with current information.
+		s.computeProxyState(con.proxy, pushRequest)
+	}
+
+	pushRequest, needsPush := s.ProxyNeedsPush(con.proxy, pushRequest)
+	if !needsPush {
+		klog.V(2).Infof("Skipping push to %v, no updates required", con.ID())
+		return nil
+	}
+
+	// Send pushes to all generators
+	// Each Generator is responsible for determining if the push event requires a push
+	wrl := con.watchedResourcesByOrder()
+	for _, w := range wrl {
+		if err := s.pushDeltaXds(con, w, pushRequest); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (s *DiscoveryServer) processDeltaRequest(req *discovery.DeltaDiscoveryRequest, con *Connection) error {
+	return nil
+}
+
+func (s *DiscoveryServer) computeProxyState(proxy *model.Proxy, request *model.PushRequest) {
+	return
+}
+
+func (s *DiscoveryServer) pushDeltaXds(con *Connection, w *model.WatchedResource, req *model.PushRequest) error {
+	return nil
+}
+
+func newDeltaConnection(peerAddr string, stream DeltaDiscoveryStream) *Connection {
+	return &Connection{
+		Connection:   xds.NewConnection(peerAddr, nil),
+		deltaStream:  stream,
+		deltaReqChan: make(chan *discovery.DeltaDiscoveryRequest, 1),
+	}
+}
diff --git a/sail/pkg/xds/discovery.go b/sail/pkg/xds/discovery.go
index 9b51f9e..a4c58ed 100644
--- a/sail/pkg/xds/discovery.go
+++ b/sail/pkg/xds/discovery.go
@@ -21,7 +21,10 @@
 	"github.com/apache/dubbo-kubernetes/pkg/cluster"
 	"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
 	"github.com/apache/dubbo-kubernetes/sail/pkg/model"
+	discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
 	"go.uber.org/atomic"
+	"golang.org/x/time/rate"
+	"google.golang.org/grpc"
 	"k8s.io/klog/v2"
 	"time"
 )
@@ -36,6 +39,8 @@
 	krtDebugger        *krt.DebugHandler
 	InboundUpdates     *atomic.Int64
 	CommittedUpdates   *atomic.Int64
+	RequestRateLimit   *rate.Limiter
+	ProxyNeedsPush     func(proxy *model.Proxy, req *model.PushRequest) (*model.PushRequest, bool)
 }
 
 func NewDiscoveryServer(env *model.Environment, clusterAliases map[string]string, debugger *krt.DebugHandler) *DiscoveryServer {
@@ -51,6 +56,11 @@
 	return out
 }
 
+func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
+	// Register v3 server
+	discovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s)
+}
+
 func (s *DiscoveryServer) CachesSynced() {
 	klog.Infof("All caches have been synced up in %v, marking server ready", time.Since(s.DiscoveryStartTime))
 	s.serverReady.Store(true)
@@ -59,3 +69,7 @@
 func (s *DiscoveryServer) Shutdown() {
 	s.pushQueue.ShutDown()
 }
+
+func (s *DiscoveryServer) globalPushContext() *model.PushContext {
+	return s.Env.PushContext()
+}
diff --git a/sail/pkg/xds/v3/model.go b/sail/pkg/xds/v3/model.go
index 790de20..38aad5f 100644
--- a/sail/pkg/xds/v3/model.go
+++ b/sail/pkg/xds/v3/model.go
@@ -3,9 +3,14 @@
 import "github.com/apache/dubbo-kubernetes/pkg/model"
 
 const (
-	ClusterType  = model.ClusterType
-	ListenerType = model.ListenerType
-	EndpointType = model.EndpointType
-	RouteType    = model.RouteType
-	DebugType    = model.DebugType
+	ClusterType    = model.ClusterType
+	ListenerType   = model.ListenerType
+	EndpointType   = model.EndpointType
+	RouteType      = model.RouteType
+	DebugType      = model.DebugType
+	HealthInfoType = model.HealthInfoType
 )
+
+func GetShortType(typeURL string) string {
+	return model.GetShortType(typeURL)
+}
diff --git a/security/pkg/pki/ra/common.go b/security/pkg/pki/ra/common.go
index 3ce5b20..41e6da3 100644
--- a/security/pkg/pki/ra/common.go
+++ b/security/pkg/pki/ra/common.go
@@ -22,7 +22,7 @@
 	"crypto/x509"
 	"encoding/asn1"
 	"fmt"
-	"github.com/apache/dubbo-kubernetes/pkg/util/slices"
+	"github.com/apache/dubbo-kubernetes/pkg/slices"
 	"github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
 	raerror "github.com/apache/dubbo-kubernetes/security/pkg/pki/error"
 	"github.com/apache/dubbo-kubernetes/security/pkg/pki/util"