blob: 901e6aff9a218effe753f1c06d0c705d2ce1f35d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"encoding/json"
"path"
"strings"
"sync"
"time"
)
import (
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
)
import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/springcloud/common"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/springcloud/servicediscovery"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/remote/zookeeper"
)
// Defaults and identifiers used by the Spring Cloud ZooKeeper service discovery.
const (
// ZKRootPath is the base znode path used when the remote config's Root is blank.
ZKRootPath = "/services"
// ZKName is the name given to the ZooKeeper client and to its client facade.
ZKName = "SpringCloud-Zookeeper"
// StatUP is the instance_status metadata value that marks an instance as healthy.
StatUP = "UP"
// MaxFailTimes is not referenced in the visible part of this file.
// NOTE(review): presumably a retry limit used by the listeners — confirm.
MaxFailTimes = 3
// DefaultTimeout is written into the remote config when its Timeout is empty.
DefaultTimeout = "3s"
// ConnDelay is not referenced in the visible part of this file.
// NOTE(review): presumably a reconnect delay — confirm.
ConnDelay = 3 * time.Second
// defaultTTL is not referenced in the visible part of this file.
// NOTE(review): presumably a watch/listener TTL — confirm.
defaultTTL = 3 * time.Second
)
// zookeeperDiscovery holds the state of a ZooKeeper-backed service discovery:
// the root path it queries, a cache of known instances, the registered
// ZooKeeper listeners and the client facade used to reach ZooKeeper.
type zookeeperDiscovery struct {
// basePath is the root znode under which service names are listed.
basePath string
// targetService is stored by the constructor; it is not read in this file.
targetService []string
// listener is notified when instances are added, updated or deleted.
listener servicediscovery.ServiceEventListener
// instanceMapLock is held by the add/update/del instance helpers.
instanceMapLock sync.Mutex
// instanceMap caches known instances keyed by instance ID.
instanceMap map[string]*servicediscovery.ServiceInstance
// zkListener holds registered listeners keyed by the path they watch.
zkListener map[string]zookeeper.Listener
// clientFacade wraps the ZooKeeper client, its lock and its configuration.
clientFacade *BaseZkClientFacade
}
// NewZKServiceDiscovery creates a ZooKeeper client from config and returns a
// service discovery bound to it.
//
// When config.Timeout is empty it is set to DefaultTimeout (this writes to the
// caller's config). The discovery root is config.Root with surrounding
// whitespace trimmed, or ZKRootPath when that is blank. A goroutine running
// zookeeper.HandleClientRestart is started for the client facade before
// returning. An error is returned only when the client cannot be created.
func NewZKServiceDiscovery(targetService []string, config *model.RemoteConfig, listener servicediscovery.ServiceEventListener) (servicediscovery.ServiceDiscovery, error) {
	if config.Timeout == "" {
		config.Timeout = DefaultTimeout
	}

	zkClient, err := zookeeper.NewZkClient(ZKName, config)
	if err != nil {
		return nil, err
	}

	root := strings.TrimSpace(config.Root)
	if root == "" {
		root = ZKRootPath
	}

	// The facade's mutex and wait group are usable as zero values, so only
	// the remaining fields need explicit initialisation.
	facade := &BaseZkClientFacade{
		name:   ZKName,
		client: zkClient,
		conf:   config,
		done:   make(chan struct{}),
	}

	discovery := &zookeeperDiscovery{
		basePath:      root,
		listener:      listener,
		targetService: targetService,
		instanceMap:   make(map[string]*servicediscovery.ServiceInstance),
		zkListener:    make(map[string]zookeeper.Listener),
		clientFacade:  facade,
	}

	go zookeeper.HandleClientRestart(facade)
	return discovery, nil
}
// QueryAllServices lists every service name found under the base path and
// returns the instances of all of them via QueryServicesByName.
//
// The debug line is emitted before the error check, so it is logged even when
// listing the names failed.
func (sd *zookeeperDiscovery) QueryAllServices() ([]servicediscovery.ServiceInstance, error) {
	names, queryErr := sd.queryForNames()
	logger.Debugf("%s get all services by root path %s, services %v", common.ZKLogDiscovery, sd.basePath, names)

	if queryErr != nil {
		logger.Errorf("get all services error: %v", queryErr.Error())
		return nil, queryErr
	}

	return sd.QueryServicesByName(names)
}
// QueryServicesByName returns the instances registered under each of the
// given service names.
//
// A service whose children cannot be listed is logged and skipped (at debug
// level when the error text contains "none children", at error level
// otherwise). A failure to read or decode an individual instance aborts the
// whole query and returns that error.
//
// Every returned instance that is not yet present in the instance cache is
// added to it. The cache is updated under instanceMapLock, like the
// add/update/del helpers, and each cache entry points at its own copy of the
// instance rather than at a shared loop variable or at the returned slice.
func (sd *zookeeperDiscovery) QueryServicesByName(serviceNames []string) ([]servicediscovery.ServiceInstance, error) {
	var instancesAll []servicediscovery.ServiceInstance

	for _, s := range serviceNames {
		pathForName := sd.pathForName(s)
		ids, err := sd.getClient().GetChildren(pathForName)
		logger.Debugf("%s get services %s, services instanceIds %s", common.ZKLogDiscovery, s, ids)
		if err != nil {
			// todo refactor gost zk, make it return the definite err
			if strings.Contains(err.Error(), "none children") {
				logger.Debugf("%s get nodes from zookeeper fail: %s", common.ZKLogDiscovery, err.Error())
			} else {
				logger.Errorf("%s get services [%s] nodes from zookeeper fail: %s", common.ZKLogDiscovery, s, err.Error())
			}
			continue
		}

		for _, id := range ids {
			instance, err := sd.queryForInstance(s, id)
			if err != nil {
				return nil, err
			}
			instancesAll = append(instancesAll, *instance)
		}
	}

	sd.instanceMapLock.Lock()
	defer sd.instanceMapLock.Unlock()
	for i := range instancesAll {
		if sd.instanceMap[instancesAll[i].ID] != nil {
			continue
		}
		// Take a fresh copy per entry: storing the address of a range
		// variable makes every entry alias the same value before Go 1.22,
		// and pointing into instancesAll would alias the caller's slice.
		cached := instancesAll[i]
		sd.instanceMap[cached.ID] = &cached
	}

	return instancesAll, nil
}
// Register is a placeholder: it only logs a debug message and returns nil.
func (sd *zookeeperDiscovery) Register() error {
logger.Debugf("%s Register implement me!!", common.ZKLogDiscovery)
return nil
}
// UnRegister is a placeholder: it only logs a debug message and returns nil.
func (sd *zookeeperDiscovery) UnRegister() error {
logger.Debugf("%s UnRegister implement me!!", common.ZKLogDiscovery)
return nil
}
// getClient validates the ZooKeeper client held by the facade and returns it.
// A validation failure is only logged; the facade's current client is
// returned regardless.
func (sd *zookeeperDiscovery) getClient() *gxzookeeper.ZookeeperClient {
	validateErr := zookeeper.ValidateZookeeperClient(sd.clientFacade, "zka3")
	if validateErr != nil {
		logger.Errorf("%s ValidateZookeeperClient error %s", common.ZKLogDiscovery, validateErr)
	}

	return sd.clientFacade.ZkClient()
}
// Subscribe creates an application listener for the base path, stores it in
// zkListener under that path and starts it with WatchAndHandle. It always
// returns nil.
func (sd *zookeeperDiscovery) Subscribe() error {
	logger.Debugf("%s Subscribe ...", common.ZKLogDiscovery)

	root := sd.basePath
	sd.zkListener[root] = newZkAppListener(sd)
	sd.zkListener[root].WatchAndHandle()

	logger.Debugf("%s Subscribe Success!", common.ZKLogDiscovery)
	return nil
}
// Unsubscribe closes every listener held in zkListener, logging the key of
// each one. The map entries themselves are left in place. It always returns
// nil.
func (sd *zookeeperDiscovery) Unsubscribe() error {
	logger.Debugf("%s Unsubscribe ...", common.ZKLogDiscovery)

	for watchedPath, zkl := range sd.zkListener {
		logger.Infof("Unsubscribe listener %s", watchedPath)
		zkl.Close()
	}

	logger.Debugf("%s Unsubscribe Success!", common.ZKLogDiscovery)
	return nil
}
// queryForInstance reads the znode of instance id under service name, decodes
// its JSON content as a SpringCloudZKInstance and converts it into a
// ServiceInstance.
//
// The decoded name is used as both ServiceName and CLusterName. Healthy is
// true only when the payload metadata's instance status equals StatUP. An
// error from reading the node or from JSON decoding is returned as is.
func (sd *zookeeperDiscovery) queryForInstance(name string, id string) (*servicediscovery.ServiceInstance, error) {
	raw, _, err := sd.getClient().GetContent(sd.pathForInstance(name, id))
	if err != nil {
		return nil, err
	}

	var node SpringCloudZKInstance
	if err = json.Unmarshal(raw, &node); err != nil {
		return nil, err
	}

	return &servicediscovery.ServiceInstance{
		ID:          node.ID,
		ServiceName: node.Name,
		CLusterName: node.Name,
		Host:        node.Address,
		Port:        node.Port,
		Healthy:     node.Payload.Metadata.InstanceStatus == StatUP,
	}, nil
}
// getServiceMap groups the cached instances by their ServiceName. The
// returned map is freshly built, but its slices hold the same instance
// pointers as the cache.
//
// This method reads instanceMap without taking instanceMapLock.
// NOTE(review): presumably callers are expected to synchronise — confirm.
func (sd *zookeeperDiscovery) getServiceMap() map[string][]*servicediscovery.ServiceInstance {
	grouped := make(map[string][]*servicediscovery.ServiceInstance)
	for _, cached := range sd.instanceMap {
		serviceName := cached.ServiceName
		// Appending to the missing (nil) entry creates the slice on first use.
		grouped[serviceName] = append(grouped[serviceName], cached)
	}
	return grouped
}
// delServiceInstance removes instance from the cache if its ID is present,
// notifying the listener via OnDeleteServiceInstance before the removal.
// The listener is called while instanceMapLock is held. A nil or unknown
// instance is ignored. The result is always (true, nil).
func (sd *zookeeperDiscovery) delServiceInstance(instance *servicediscovery.ServiceInstance) (bool, error) {
	if instance == nil {
		return true, nil
	}

	sd.instanceMapLock.Lock()
	defer sd.instanceMapLock.Unlock()

	if known := sd.instanceMap[instance.ID]; known == nil {
		return true, nil
	}

	sd.listener.OnDeleteServiceInstance(instance)
	delete(sd.instanceMap, instance.ID)
	return true, nil
}
// updateServiceInstance replaces the cached entry for instance.ID with
// instance, but only when that ID is already cached. The listener is notified
// via OnUpdateServiceInstance before the replacement, while instanceMapLock
// is held. A nil or unknown instance is ignored. The result is always
// (true, nil).
func (sd *zookeeperDiscovery) updateServiceInstance(instance *servicediscovery.ServiceInstance) (bool, error) {
	if instance == nil {
		return true, nil
	}

	sd.instanceMapLock.Lock()
	defer sd.instanceMapLock.Unlock()

	if known := sd.instanceMap[instance.ID]; known == nil {
		return true, nil
	}

	sd.listener.OnUpdateServiceInstance(instance)
	sd.instanceMap[instance.ID] = instance
	return true, nil
}
// addServiceInstance caches instance under its ID when that ID is not cached
// yet, notifying the listener via OnAddServiceInstance before storing it.
// The listener is called while instanceMapLock is held. A nil or already
// known instance is ignored. The result is always (true, nil).
func (sd *zookeeperDiscovery) addServiceInstance(instance *servicediscovery.ServiceInstance) (bool, error) {
	if instance == nil {
		return true, nil
	}

	sd.instanceMapLock.Lock()
	defer sd.instanceMapLock.Unlock()

	if known := sd.instanceMap[instance.ID]; known != nil {
		return true, nil
	}

	sd.listener.OnAddServiceInstance(instance)
	sd.instanceMap[instance.ID] = instance
	return true, nil
}
// queryForNames lists the children of the base path, i.e. the service names.
// An error whose text contains "none children" is logged at debug level and
// reported as (nil, nil); any other error is returned together with whatever
// the client returned.
func (sd *zookeeperDiscovery) queryForNames() ([]string, error) {
	names, err := sd.getClient().GetChildren(sd.basePath)

	switch {
	case err == nil:
		return names, nil
	case strings.Contains(err.Error(), "none children"):
		// todo refactor gost zk, make it return the definite err
		logger.Debugf("%s get nodes from zookeeper fail: %s", common.ZKLogDiscovery, err.Error())
		return nil, nil
	default:
		return names, err
	}
}
// pathForInstance returns the znode path of instance id of service name,
// built by joining the base path, name and id with path.Join.
func (sd *zookeeperDiscovery) pathForInstance(name, id string) string {
	elems := []string{sd.basePath, name, id}
	return path.Join(elems...)
}
// pathForName returns the znode path of service name, built by joining the
// base path and name with path.Join.
func (sd *zookeeperDiscovery) pathForName(name string) string {
	elems := []string{sd.basePath, name}
	return path.Join(elems...)
}
// SpringCloudZKInstance is the JSON document that queryForInstance decodes
// from an instance znode. Only Name, ID, Address, Port and
// Payload.Metadata.InstanceStatus are read in this file; the remaining fields
// are decoded but not used here.
type SpringCloudZKInstance struct {
// Name becomes the instance's ServiceName and CLusterName.
Name string `json:"name"`
// ID becomes the instance's ID, which is also the instance cache key.
ID string `json:"id"`
// Address becomes the instance's Host.
Address string `json:"address"`
// Port becomes the instance's Port.
Port int `json:"port"`
SslPort interface{} `json:"sslPort"`
Payload struct {
Class string `json:"@class"`
ID string `json:"id"`
Name string `json:"name"`
Metadata struct {
// InstanceStatus is compared with StatUP to decide whether the instance is healthy.
InstanceStatus string `json:"instance_status"`
} `json:"metadata"`
} `json:"payload"`
RegistrationTimeUTC int64 `json:"registrationTimeUTC"`
ServiceType string `json:"serviceType"`
URISpec struct {
Parts []struct {
Value string `json:"value"`
Variable bool `json:"variable"`
} `json:"parts"`
} `json:"uriSpec"`
}
// BaseZkClientFacade bundles a ZooKeeper client with the lock, wait group,
// done channel and configuration that accompany it. In this file it is passed
// to zookeeper.HandleClientRestart and zookeeper.ValidateZookeeperClient.
type BaseZkClientFacade struct {
// name is the value returned by Name.
name string
// client is the current ZooKeeper client; it can be replaced via SetZkClient.
client *gxzookeeper.ZookeeperClient
// clientLock is exposed through ZkClientLock.
// NOTE(review): presumably guards client replacement — confirm against the zookeeper package.
clientLock sync.Mutex
// wg is exposed through WaitGroup.
wg sync.WaitGroup
// done is exposed through Done.
done chan struct{}
// conf is the remote configuration the client was created from.
conf *model.RemoteConfig
}
// Name returns the facade's name.
func (b *BaseZkClientFacade) Name() string {
return b.name
}
// ZkClient returns the current ZooKeeper client. It does not take the client
// lock itself.
func (b *BaseZkClientFacade) ZkClient() *gxzookeeper.ZookeeperClient {
return b.client
}
// SetZkClient replaces the current ZooKeeper client. It does not take the
// client lock itself.
// NOTE(review): presumably callers hold ZkClientLock while swapping — confirm.
func (b *BaseZkClientFacade) SetZkClient(client *gxzookeeper.ZookeeperClient) {
b.client = client
}
// ZkClientLock returns a pointer to the facade's client mutex.
func (b *BaseZkClientFacade) ZkClientLock() *sync.Mutex {
return &b.clientLock
}
// WaitGroup returns a pointer to the facade's wait group.
func (b *BaseZkClientFacade) WaitGroup() *sync.WaitGroup {
return &b.wg
}
// Done returns the facade's done channel.
func (b *BaseZkClientFacade) Done() chan struct{} {
return b.done
}
// RestartCallBack performs no work and always returns true.
func (b *BaseZkClientFacade) RestartCallBack() bool {
return true
}
// GetConfig returns the remote configuration held by the facade.
func (b *BaseZkClientFacade) GetConfig() *model.RemoteConfig {
return b.conf
}
// Keys returns the keys of m in unspecified (map iteration) order. The result
// is a non-nil slice of length len(m), so it is empty for an empty or nil map.
func Keys(m map[string][]*servicediscovery.ServiceInstance) []string {
	names := make([]string, 0, len(m))
	for name := range m {
		names = append(names, name)
	}
	return names
}
// Diff returns the elements of a that do not occur in b, in their original
// order. Duplicates in a are kept. The result is nil when every element of a
// occurs in b or when a is empty.
func Diff(a, b []string) (diff []string) {
	excluded := make(map[string]struct{}, len(b))
	for _, s := range b {
		excluded[s] = struct{}{}
	}

	for _, s := range a {
		if _, found := excluded[s]; found {
			continue
		}
		diff = append(diff, s)
	}
	return diff
}