| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package consul |
| |
| import ( |
| "encoding/base64" |
| "fmt" |
| "strconv" |
| "sync" |
| "time" |
| ) |
| |
| import ( |
| "github.com/dubbogo/gost/container/set" |
| "github.com/dubbogo/gost/hash/page" |
| consul "github.com/hashicorp/consul/api" |
| "github.com/hashicorp/consul/api/watch" |
| perrors "github.com/pkg/errors" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go/common/constant" |
| "github.com/apache/dubbo-go/common/extension" |
| "github.com/apache/dubbo-go/common/logger" |
| "github.com/apache/dubbo-go/config" |
| "github.com/apache/dubbo-go/registry" |
| ) |
| |
| const ( |
| enable = "enable" |
| watch_type = "type" |
| watch_type_service = "service" |
| watch_service = "service" |
| watch_passingonly = "passingonly" |
| watch_passingonly_true = true |
| ) |
| |
| var errConsulClientClosed = perrors.New("consul client is closed") |
| |
| // init will put the service discovery into extension |
| func init() { |
| extension.SetServiceDiscovery(constant.CONSUL_KEY, newConsulServiceDiscovery) |
| } |
| |
| // consulServiceDiscovery is the implementation of service discovery based on consul. |
| type consulServiceDiscovery struct { |
| // descriptor is a short string about the basic information of this instance |
| descriptor string |
| clientLock sync.RWMutex |
| // Consul client. |
| consulClient *consul.Client |
| checkPassInterval int64 |
| tag string |
| address string |
| deregisterCriticalServiceAfter string |
| ttl sync.Map |
| *consul.Config |
| } |
| |
| // newConsulServiceDiscovery will create new service discovery instance |
| // use double-check pattern to reduce race condition |
| func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { |
| sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name) |
| if !ok || len(sdc.RemoteRef) == 0 { |
| return nil, perrors.New("could not init the instance because the config is invalid") |
| } |
| |
| remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef) |
| if !ok { |
| return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef) |
| } |
| |
| descriptor := fmt.Sprintf("consul-service-discovery[%s]", remoteConfig.Address) |
| |
| config := &consul.Config{Address: remoteConfig.Address, Token: remoteConfig.Params[constant.ACL_TOKEN]} |
| client, err := consul.NewClient(config) |
| if err != nil { |
| return nil, perrors.WithMessage(err, "create consul client failed.") |
| } |
| |
| return &consulServiceDiscovery{ |
| address: remoteConfig.Address, |
| descriptor: descriptor, |
| checkPassInterval: getCheckPassInterval(remoteConfig.Params), |
| Config: config, |
| tag: remoteConfig.Params[constant.QUERY_TAG], |
| consulClient: client, |
| deregisterCriticalServiceAfter: getDeregisterAfter(remoteConfig.Params), |
| clientLock: sync.RWMutex{}, |
| }, nil |
| } |
| |
| func (csd *consulServiceDiscovery) String() string { |
| return csd.descriptor |
| } |
| |
| // nolint |
| func (csd *consulServiceDiscovery) getConsulClient() *consul.Client { |
| csd.clientLock.RLock() |
| defer csd.clientLock.RUnlock() |
| return csd.consulClient |
| } |
| |
| // nolint |
| func (csd *consulServiceDiscovery) setConsulClient(consulClient *consul.Client) { |
| csd.clientLock.Lock() |
| defer csd.clientLock.Unlock() |
| csd.consulClient = consulClient |
| } |
| |
| func (csd *consulServiceDiscovery) Destroy() error { |
| csd.setConsulClient(nil) |
| csd.ttl.Range(func(key, t interface{}) bool { |
| close(t.(chan struct{})) |
| csd.ttl.Delete(key) |
| return true |
| }) |
| return nil |
| } |
| |
| func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error { |
| var ( |
| err error |
| consulClient *consul.Client |
| ) |
| ins, _ := csd.buildRegisterInstance(instance) |
| if consulClient = csd.getConsulClient(); consulClient == nil { |
| return errConsulClientClosed |
| } |
| err = consulClient.Agent().ServiceRegister(ins) |
| if err != nil { |
| logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err) |
| return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) |
| } |
| |
| return csd.registerTtl(instance) |
| } |
| |
| func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error { |
| var ( |
| err error |
| consulClient *consul.Client |
| ) |
| |
| checkID := buildID(instance) |
| |
| stopChan := make(chan struct{}) |
| csd.ttl.LoadOrStore(buildID(instance), stopChan) |
| |
| period := time.Duration(csd.checkPassInterval/8) * time.Millisecond |
| timer := time.NewTicker(period) |
| go func() { |
| defer timer.Stop() |
| for { |
| select { |
| case <-timer.C: |
| if consulClient = csd.getConsulClient(); consulClient == nil { |
| logger.Debugf("consul client is closed!") |
| return |
| } |
| err = consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "") |
| if err != nil { |
| logger.Warnf("pass ttl heartbeat fail:%v", err) |
| break |
| } |
| logger.Debugf("passed ttl heartbeat for %s", checkID) |
| break |
| case <-stopChan: |
| logger.Info("ttl %s for service %s is stopped", checkID, instance.GetServiceName()) |
| return |
| } |
| } |
| }() |
| return nil |
| } |
| |
| func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) error { |
| var ( |
| err error |
| consulClient *consul.Client |
| ) |
| ins, _ := csd.buildRegisterInstance(instance) |
| consulClient = csd.getConsulClient() |
| if consulClient == nil { |
| return errConsulClientClosed |
| } |
| err = consulClient.Agent().ServiceDeregister(buildID(instance)) |
| if err != nil { |
| logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err) |
| } |
| return consulClient.Agent().ServiceRegister(ins) |
| } |
| |
| func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error { |
| var ( |
| err error |
| consulClient *consul.Client |
| ) |
| if consulClient = csd.getConsulClient(); consulClient == nil { |
| return errConsulClientClosed |
| } |
| err = consulClient.Agent().ServiceDeregister(buildID(instance)) |
| if err != nil { |
| logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err) |
| return err |
| } |
| stopChanel, ok := csd.ttl.Load(buildID(instance)) |
| if !ok { |
| logger.Warnf("ttl for service instance %s didn't exist", instance.GetId()) |
| return nil |
| } |
| close(stopChanel.(chan struct{})) |
| csd.ttl.Delete(buildID(instance)) |
| return nil |
| } |
| |
| func (csd *consulServiceDiscovery) GetDefaultPageSize() int { |
| return registry.DefaultPageSize |
| } |
| |
| func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { |
| var ( |
| err error |
| consulClient *consul.Client |
| services map[string][]string |
| ) |
| res := gxset.NewSet() |
| if consulClient = csd.getConsulClient(); consulClient == nil { |
| logger.Warnf("consul client is closed!") |
| return res |
| } |
| services, _, err = consulClient.Catalog().Services(nil) |
| if err != nil { |
| logger.Errorf("get services,error: %v", err) |
| return res |
| } |
| |
| for service := range services { |
| res.Add(service) |
| } |
| return res |
| } |
| |
| // encodeConsulMetadata because consul validate key strictly. |
| func encodeConsulMetadata(metadata map[string]string) map[string]string { |
| consulMetadata := make(map[string]string, len(metadata)) |
| encoder := base64.RawStdEncoding |
| for k, v := range metadata { |
| consulMetadata[encoder.EncodeToString([]byte(k))] = v |
| } |
| return consulMetadata |
| } |
| |
| // nolint |
| func decodeConsulMetadata(metadata map[string]string) map[string]string { |
| dubboMetadata := make(map[string]string, len(metadata)) |
| encoder := base64.RawStdEncoding |
| for k, v := range metadata { |
| kBytes, err := encoder.DecodeString(k) |
| if err != nil { |
| logger.Warnf("can not decoded consul metadata key %s", k) |
| continue |
| } |
| dubboMetadata[string(kBytes)] = v |
| } |
| return dubboMetadata |
| } |
| |
| func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { |
| var ( |
| err error |
| consulClient *consul.Client |
| instances []*consul.ServiceEntry |
| ) |
| if consulClient = csd.getConsulClient(); consulClient == nil { |
| logger.Warn("consul client is closed!") |
| return nil |
| } |
| instances, _, err = consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ |
| WaitTime: time.Duration(csd.checkPassInterval), |
| }) |
| |
| if err != nil { |
| logger.Errorf("get instances for service %s,error: %v", serviceName, err) |
| return nil |
| } |
| |
| res := make([]registry.ServiceInstance, 0, len(instances)) |
| for _, ins := range instances { |
| metadata := ins.Service.Meta |
| |
| // enable status |
| enableStr := metadata[enable] |
| delete(metadata, enable) |
| enable, _ := strconv.ParseBool(enableStr) |
| metadata = decodeConsulMetadata(metadata) |
| |
| // health status |
| status := ins.Checks.AggregatedStatus() |
| healthy := false |
| if status == consul.HealthPassing { |
| healthy = true |
| } |
| res = append(res, ®istry.DefaultServiceInstance{ |
| Id: ins.Service.ID, |
| ServiceName: ins.Service.Service, |
| Host: ins.Service.Address, |
| Port: ins.Service.Port, |
| Enable: enable, |
| Healthy: healthy, |
| Metadata: metadata, |
| }) |
| } |
| |
| return res |
| } |
| |
| func (csd *consulServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { |
| all := csd.GetInstances(serviceName) |
| res := make([]interface{}, 0, pageSize) |
| for i := offset; i < len(all) && i < offset+pageSize; i++ { |
| res = append(res, all[i]) |
| } |
| return gxpage.NewPage(offset, pageSize, res, len(all)) |
| } |
| |
| func (csd *consulServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { |
| all := csd.GetInstances(serviceName) |
| res := make([]interface{}, 0, pageSize) |
| // could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance |
| var ( |
| i = offset |
| count = 0 |
| ) |
| for i < len(all) && count < pageSize { |
| ins := all[i] |
| if ins.IsHealthy() == healthy { |
| res = append(res, all[i]) |
| count++ |
| } |
| i++ |
| } |
| return gxpage.NewPage(offset, pageSize, res, len(all)) |
| } |
| |
| func (csd *consulServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { |
| res := make(map[string]gxpage.Pager, len(serviceNames)) |
| for _, name := range serviceNames { |
| res[name] = csd.GetInstancesByPage(name, offset, requestedSize) |
| } |
| return res |
| } |
| |
| func (csd *consulServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { |
| params := make(map[string]interface{}, 8) |
| params[watch_type] = watch_type_service |
| params[watch_service] = listener.ServiceName |
| params[watch_passingonly] = watch_passingonly_true |
| plan, err := watch.Parse(params) |
| if err != nil { |
| logger.Errorf("add listener for service %s,error:%v", listener.ServiceName, err) |
| return err |
| } |
| |
| plan.Handler = func(idx uint64, raw interface{}) { |
| services, ok := raw.([]*consul.ServiceEntry) |
| if !ok { |
| err = perrors.New("handler get non ServiceEntry type parameter") |
| return |
| } |
| instances := make([]registry.ServiceInstance, 0, len(services)) |
| for _, ins := range services { |
| metadata := ins.Service.Meta |
| |
| // enable status |
| enableStr := metadata[enable] |
| delete(metadata, enable) |
| enable, _ := strconv.ParseBool(enableStr) |
| |
| // health status |
| status := ins.Checks.AggregatedStatus() |
| healthy := false |
| if status == consul.HealthPassing { |
| healthy = true |
| } |
| instances = append(instances, ®istry.DefaultServiceInstance{ |
| Id: ins.Service.ID, |
| ServiceName: ins.Service.Service, |
| Host: ins.Service.Address, |
| Port: ins.Service.Port, |
| Enable: enable, |
| Healthy: healthy, |
| Metadata: metadata, |
| }) |
| } |
| e := csd.DispatchEventForInstances(listener.ServiceName, instances) |
| if e != nil { |
| logger.Errorf("Dispatching event got exception, service name: %s, err: %v", listener.ServiceName, err) |
| } |
| } |
| go func() { |
| err = plan.RunWithConfig(csd.Config.Address, csd.Config) |
| if err != nil { |
| logger.Error("consul plan run failure!error:%v", err) |
| } |
| }() |
| return nil |
| } |
| |
| func (csd *consulServiceDiscovery) DispatchEventByServiceName(serviceName string) error { |
| return csd.DispatchEventForInstances(serviceName, csd.GetInstances(serviceName)) |
| } |
| |
| func (csd *consulServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { |
| return csd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances)) |
| } |
| |
| func (csd *consulServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { |
| extension.GetGlobalDispatcher().Dispatch(event) |
| return nil |
| } |
| |
| func (csd *consulServiceDiscovery) buildRegisterInstance(instance registry.ServiceInstance) (*consul.AgentServiceRegistration, error) { |
| metadata := instance.GetMetadata() |
| metadata = encodeConsulMetadata(metadata) |
| metadata[enable] = strconv.FormatBool(instance.IsEnable()) |
| // check |
| check := csd.buildCheck() |
| |
| return &consul.AgentServiceRegistration{ |
| ID: buildID(instance), |
| Name: instance.GetServiceName(), |
| Port: instance.GetPort(), |
| Address: instance.GetHost(), |
| Meta: metadata, |
| Check: &check, |
| }, nil |
| } |
| |
| func (csd *consulServiceDiscovery) buildCheck() consul.AgentServiceCheck { |
| return consul.AgentServiceCheck{ |
| TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s", |
| DeregisterCriticalServiceAfter: csd.deregisterCriticalServiceAfter, |
| } |
| } |
| |
| // nolint |
| func getCheckPassInterval(params map[string]string) int64 { |
| checkPassIntervalStr, ok := params[constant.CHECK_PASS_INTERVAL] |
| if !ok { |
| return constant.DEFAULT_CHECK_PASS_INTERVAL |
| } |
| checkPassInterval, err := strconv.ParseInt(checkPassIntervalStr, 10, 64) |
| if err != nil { |
| logger.Warnf("consul service discovery remote config error:%s", checkPassIntervalStr) |
| return constant.DEFAULT_CHECK_PASS_INTERVAL |
| } |
| return checkPassInterval |
| } |
| |
| // nolint |
| func getDeregisterAfter(metadata map[string]string) string { |
| deregister, ok := metadata[constant.DEREGISTER_AFTER] |
| if !ok || len(deregister) == 0 { |
| deregister = constant.DEFAULT_DEREGISTER_TIME |
| } |
| return deregister |
| } |
| |
| // nolint |
| func buildID(instance registry.ServiceInstance) string { |
| id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d", instance.GetId(), instance.GetServiceName(), instance.GetHost(), instance.GetPort()) |
| return id |
| } |