| // Copyright Istio Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package bootstrap |
| |
| import ( |
| "fmt" |
| ) |
| |
| import ( |
| "istio.io/pkg/log" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/aggregate" |
| kubecontroller "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube/controller" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/serviceentry" |
| ) |
| |
| func (s *Server) ServiceController() *aggregate.Controller { |
| return s.environment.ServiceDiscovery.(*aggregate.Controller) |
| } |
| |
| // initServiceControllers creates and initializes the service controllers |
| func (s *Server) initServiceControllers(args *PilotArgs) error { |
| serviceControllers := s.ServiceController() |
| |
| s.serviceEntryController = serviceentry.NewController( |
| s.configController, s.environment.ConfigStore, s.XDSServer, |
| serviceentry.WithClusterID(s.clusterID), |
| ) |
| serviceControllers.AddRegistry(s.serviceEntryController) |
| |
| registered := make(map[provider.ID]bool) |
| for _, r := range args.RegistryOptions.Registries { |
| serviceRegistry := provider.ID(r) |
| if _, exists := registered[serviceRegistry]; exists { |
| log.Warnf("%s registry specified multiple times.", r) |
| continue |
| } |
| registered[serviceRegistry] = true |
| log.Infof("Adding %s registry adapter", serviceRegistry) |
| switch serviceRegistry { |
| case provider.Kubernetes: |
| if err := s.initKubeRegistry(args); err != nil { |
| return err |
| } |
| default: |
| return fmt.Errorf("service registry %s is not supported", r) |
| } |
| } |
| |
| // Defer running of the service controllers. |
| s.addStartFunc(func(stop <-chan struct{}) error { |
| go serviceControllers.Run(stop) |
| return nil |
| }) |
| |
| return nil |
| } |
| |
| // initKubeRegistry creates all the k8s service controllers under this pilot |
| func (s *Server) initKubeRegistry(args *PilotArgs) (err error) { |
| args.RegistryOptions.KubeOptions.ClusterID = s.clusterID |
| args.RegistryOptions.KubeOptions.Metrics = s.environment |
| args.RegistryOptions.KubeOptions.XDSUpdater = s.XDSServer |
| args.RegistryOptions.KubeOptions.NetworksWatcher = s.environment.NetworksWatcher |
| args.RegistryOptions.KubeOptions.MeshWatcher = s.environment.Watcher |
| args.RegistryOptions.KubeOptions.SystemNamespace = args.Namespace |
| args.RegistryOptions.KubeOptions.MeshServiceController = s.ServiceController() |
| |
| s.multiclusterController.AddHandler(kubecontroller.NewMulticluster(args.PodName, |
| s.kubeClient, |
| args.RegistryOptions.ClusterRegistriesNamespace, |
| args.RegistryOptions.KubeOptions, |
| s.serviceEntryController, |
| s.istiodCertBundleWatcher, |
| args.Revision, |
| s.shouldStartNsController(), |
| s.environment.ClusterLocal(), |
| s.server)) |
| |
| return |
| } |