| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package springcloud |
| |
| import ( |
| "strings" |
| "sync" |
| "time" |
| ) |
| |
| import ( |
| "github.com/pkg/errors" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go-pixiu/pkg/adapter/springcloud/servicediscovery" |
| "github.com/apache/dubbo-go-pixiu/pkg/adapter/springcloud/servicediscovery/nacos" |
| "github.com/apache/dubbo-go-pixiu/pkg/adapter/springcloud/servicediscovery/zookeeper" |
| "github.com/apache/dubbo-go-pixiu/pkg/common/constant" |
| "github.com/apache/dubbo-go-pixiu/pkg/common/extension/adapter" |
| "github.com/apache/dubbo-go-pixiu/pkg/logger" |
| "github.com/apache/dubbo-go-pixiu/pkg/model" |
| "github.com/apache/dubbo-go-pixiu/pkg/server" |
| ) |
| |
| const ( |
| // Kind is the kind of Adapter Plugin. |
| Kind = constant.SpringCloudAdapter |
| |
| Nacos = "nacos" |
| Zookeeper = "zookeeper" |
| ) |
| |
| func init() { |
| adapter.RegisterAdapterPlugin(&CloudPlugin{}) |
| } |
| |
| type ( |
| // CloudPlugin the adapter plugin for spring cloud |
| CloudPlugin struct { |
| } |
| |
| // CloudAdapter the adapter for spring cloud |
| CloudAdapter struct { |
| cfg *Config |
| sd servicediscovery.ServiceDiscovery |
| currentService map[string]*Service |
| |
| mutex sync.Mutex |
| stopChan chan struct{} |
| } |
| |
| // Config the config for CloudAdapter |
| Config struct { |
| // Mode default 0 start backgroup fresh and watch, 1 only fresh 2 only watch |
| Mode int `yaml:"mode" json:"mode" default:"mode"` |
| Registry *model.RemoteConfig `yaml:"registry" json:"registry" default:"registry"` |
| FreshInterval time.Duration `yaml:"freshInterval" json:"freshInterval" default:"freshInterval"` |
| Services []string `yaml:"services" json:"services" default:"services"` |
| // SubscribePolicy subscribe config, |
| // - adapting : if there is no any Services (App) names, fetch All services from registry center |
| // - definitely : fetch services by the config Services (App) names |
| SubscribePolicy string `yaml:"subscribe-policy" json:"subscribe-policy" default:"adapting"` |
| } |
| |
| Service struct { |
| Name string |
| } |
| |
| SubscribePolicy int |
| ) |
| |
| const ( |
| Adapting SubscribePolicy = iota |
| Definitely |
| ) |
| |
| func (sp SubscribePolicy) String() string { |
| return [...]string{"adapting", "definitely"}[sp] |
| } |
| |
| // Kind return plugin kind |
| func (p *CloudPlugin) Kind() string { |
| return Kind |
| } |
| |
| // CreateAdapter create adapter |
| func (p *CloudPlugin) CreateAdapter(ad *model.Adapter) (adapter.Adapter, error) { |
| return &CloudAdapter{cfg: &Config{}, stopChan: make(chan struct{}), currentService: make(map[string]*Service)}, nil |
| } |
| |
| // Start start the adapter |
| func (a *CloudAdapter) Start() { |
| |
| // do not block the main goroutine |
| // init get all service instance |
| err := a.firstFetch() |
| if err != nil { |
| logger.Errorf("init fetch service fail", err.Error()) |
| } |
| |
| // background sync service instance from remote |
| err = a.backgroundSyncPeriod() |
| if err != nil { |
| logger.Errorf("init periodicity fetch service task fail", err.Error()) |
| } |
| |
| // watch then fetch is more safety for consistent but there is background fresh mechanism |
| err = a.watch() |
| if err != nil { |
| logger.Errorf("init watch the register fail", err.Error()) |
| } |
| } |
| |
| // Stop stop the adapter |
| func (a *CloudAdapter) Stop() { |
| err := a.stop() |
| if err != nil { |
| logger.Errorf("stop the adapter fail", err.Error()) |
| return |
| } |
| } |
| |
| // Apply init |
| func (a *CloudAdapter) Apply() error { |
| //registryUsed := ad.Config["registry"].(map[string]interface{}) |
| var ( |
| sd servicediscovery.ServiceDiscovery |
| err error |
| ) |
| switch strings.ToLower(a.cfg.Registry.Protocol) { |
| case Nacos: |
| sd, err = nacos.NewNacosServiceDiscovery(a.cfg.Services, a.cfg.Registry, a) |
| case Zookeeper: |
| sd, err = zookeeper.NewZKServiceDiscovery(a.cfg.Services, a.cfg.Registry, a) |
| default: |
| return errors.New("adapter init error registry not recognise") |
| } |
| if err != nil { |
| logger.Errorf("Apply NewServiceDiscovery %s ", a.cfg.Registry.Protocol, err.Error()) |
| return err |
| } |
| a.sd = sd |
| return nil |
| } |
| |
| func (a *CloudAdapter) OnAddServiceInstance(instance *servicediscovery.ServiceInstance) { |
| cm := server.GetClusterManager() |
| endpoint := instance.ToEndpoint() |
| |
| a.mutex.Lock() |
| defer a.mutex.Unlock() |
| |
| // endpoint name should equal with cluster name |
| cm.SetEndpoint(endpoint.Name, endpoint) |
| if a.checkHasExistService(endpoint.Name) { |
| return |
| } |
| // new service, so add route and into CurrentService map |
| a.addNewService(instance) |
| // route ID is cluster name, so equal with endpoint name |
| rm := server.GetRouterManager() |
| route := instance.ToRoute() |
| rm.AddRouter(route) |
| } |
| |
| func (a *CloudAdapter) OnDeleteServiceInstance(instance *servicediscovery.ServiceInstance) { |
| cm := server.GetClusterManager() |
| endpoint := instance.ToEndpoint() |
| |
| cm.DeleteEndpoint(endpoint.Name, endpoint.ID) |
| |
| } |
| |
| func (a *CloudAdapter) OnUpdateServiceInstance(instance *servicediscovery.ServiceInstance) { |
| cm := server.GetClusterManager() |
| endpoint := instance.ToEndpoint() |
| a.mutex.Lock() |
| defer a.mutex.Unlock() |
| cm.SetEndpoint(endpoint.Name, endpoint) |
| } |
| |
| func (a *CloudAdapter) GetServiceNames() []string { |
| a.mutex.Lock() |
| defer a.mutex.Unlock() |
| |
| res := make([]string, 0, len(a.currentService)) |
| |
| for k := range a.currentService { |
| res = append(res, k) |
| } |
| |
| return res |
| } |
| |
| // Config get config for Adapter |
| func (a *CloudAdapter) Config() interface{} { |
| return a.cfg |
| } |
| |
| func (a *CloudAdapter) fetchServiceByConfig() ([]servicediscovery.ServiceInstance, error) { |
| var instances []servicediscovery.ServiceInstance |
| var err error |
| // if configure specific services, then fetch those service instance only |
| if a.subscribeServiceDefinitely() { |
| if len(a.cfg.Services) > 0 { |
| instances, err = a.sd.QueryServicesByName(a.cfg.Services) |
| } else { |
| logger.Warnf("No any Service(App) need Subscribe, config the Service(App) Names or make the `subscribe-policy: adapting` pls.") |
| } |
| } else { |
| if len(a.cfg.Services) > 0 { |
| instances, err = a.sd.QueryServicesByName(a.cfg.Services) |
| } else { |
| instances, err = a.sd.QueryAllServices() |
| } |
| } |
| |
| if err != nil { |
| logger.Errorf("fetchServiceByConfig error ", err.Error()) |
| return instances, err |
| } |
| return instances, nil |
| } |
| |
| func (a *CloudAdapter) firstFetch() error { |
| |
| instances, err := a.fetchServiceByConfig() |
| |
| if err != nil { |
| logger.Errorf("start query all service error ", err.Error()) |
| return err |
| } |
| // manage cluster and route |
| cm := server.GetClusterManager() |
| rm := server.GetRouterManager() |
| for _, instance := range instances { |
| endpoint := instance.ToEndpoint() |
| // todo: maybe instance service name not equal with cluster name ? |
| cm.SetEndpoint(endpoint.Name, endpoint) |
| // route ID is cluster name, so equal with endpoint name |
| route := instance.ToRoute() |
| rm.AddRouter(route) |
| } |
| a.clearAndSetCurrentService(instances) |
| return nil |
| } |
| |
| func (a *CloudAdapter) clearAndSetCurrentService(instances []servicediscovery.ServiceInstance) { |
| a.currentService = make(map[string]*Service) |
| |
| for _, instance := range instances { |
| if _, ok := a.currentService[instance.ServiceName]; ok { |
| continue |
| } |
| a.currentService[instance.ServiceName] = &Service{Name: instance.ServiceName} |
| } |
| } |
| |
| func (a *CloudAdapter) checkHasExistService(name string) bool { |
| _, ok := a.currentService[name] |
| return ok |
| } |
| |
| func (a *CloudAdapter) addNewService(instance *servicediscovery.ServiceInstance) { |
| a.currentService[instance.ServiceName] = &Service{Name: instance.ServiceName} |
| } |
| |
| func (a *CloudAdapter) fetchCompareAndSet() { |
| instances, err := a.fetchServiceByConfig() |
| if err != nil { |
| logger.Warnf("fetchCompareAndSet all service error ", err.Error()) |
| return |
| } |
| _ = a.watch() |
| // manage cluster and route |
| cm := server.GetClusterManager() |
| rm := server.GetRouterManager() |
| |
| a.mutex.Lock() |
| defer a.mutex.Unlock() |
| |
| oldStore, err := cm.CloneStore() |
| |
| if err != nil { |
| logger.Warnf("fetchCompareAndSet clone store error ", err.Error()) |
| } |
| |
| newStore := cm.NewStore(oldStore.Version) |
| |
| for _, instance := range instances { |
| endpoint := instance.ToEndpoint() |
| // endpoint name should equal with cluster name |
| newStore.SetEndpoint(endpoint.Name, endpoint) |
| } |
| |
| // maximize reduction the interval of down state |
| // first remove the router for removed cluster |
| for _, c := range oldStore.Config { |
| if !newStore.HasCluster(c.Name) { |
| rm.DeleteRouter(&model.Router{ID: c.Name}) |
| } |
| } |
| // second set cluster |
| ret := cm.CompareAndSetStore(newStore) |
| |
| if !ret { |
| // fast fail the delete route at first phase shouldn't recover |
| return |
| } |
| |
| // third add new router |
| for _, c := range newStore.Config { |
| if !oldStore.HasCluster(c.Name) { |
| match := model.NewRouterMatchPrefix(c.Name) |
| route := model.RouteAction{Cluster: c.Name} |
| added := &model.Router{ID: c.Name, Match: match, Route: route} |
| rm.AddRouter(added) |
| } |
| } |
| } |
| |
| func (a *CloudAdapter) backgroundSyncPeriod() error { |
| if a.cfg.FreshInterval <= 0 { |
| return nil |
| } |
| timer := time.NewTicker(a.cfg.FreshInterval) |
| go func() { |
| defer timer.Stop() |
| for { |
| select { |
| case <-timer.C: |
| a.fetchCompareAndSet() |
| break |
| case <-a.stopChan: |
| logger.Info("stop the adapter") |
| return |
| } |
| } |
| }() |
| return nil |
| } |
| |
| func (a *CloudAdapter) watch() error { |
| return a.sd.Subscribe() |
| } |
| |
| func (a *CloudAdapter) stop() error { |
| err := a.sd.Unsubscribe() |
| if err != nil { |
| logger.Errorf("unsubscribe registry fail ", err.Error()) |
| } |
| close(a.stopChan) |
| return nil |
| } |
| |
| func (a *CloudAdapter) subscribeServiceDefinitely() bool { |
| return strings.EqualFold(a.cfg.SubscribePolicy, Definitely.String()) |
| } |