blob: 852a5564f955e9734be58515cf0e6840e69f2f78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"path"
"strings"
"sync"
"time"
)
import (
"github.com/dubbogo/go-zookeeper/zk"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
)
// Connection tuning constants exported by this package.
const (
// ConnDelay is the connection delay interval.
// NOTE(review): the unit is not visible here (presumably seconds) — confirm against callers.
ConnDelay = 3
// MaxFailTimes is the maximum number of failures tolerated.
// NOTE(review): presumably bounds retry attempts — confirm against callers.
MaxFailTimes = 15
)
// Sentinel errors returned (sometimes wrapped) by ZookeeperClient methods.
var (
// errNilZkClientConn is used when the client has no connection to operate on.
errNilZkClientConn = perrors.New("zookeeper client{conn} is nil")
// errNilChildren is returned when a node turns out to have no children.
errNilChildren = perrors.Errorf("has none children")
// errNilNode is returned by GetChildrenW when zookeeper reports zk.ErrNoNode.
errNilNode = perrors.Errorf("node does not exist")
)
// ZookeeperClient represents zookeeper client Configuration. Besides the
// configuration it carries the live connection, the shutdown signal and the
// registry of listener channels that HandleZkEvent notifies.
type ZookeeperClient struct {
// name identifies this client in log messages.
name string
// ZkAddrs holds the zookeeper server addresses handed to zk.Connect.
ZkAddrs []string
// The embedded RWMutex guards Conn.
sync.RWMutex // for conn
// Conn is the underlying zookeeper connection. It is reset to nil by Close
// and by HandleZkEvent when a disconnected state is reported.
Conn *zk.Conn
// Timeout is the timeout value handed to zk.Connect.
Timeout time.Duration
// exit is closed by stop to signal shutdown; Done exposes it read-only.
exit chan struct{}
// Wait tracks the HandleZkEvent goroutine so that Close can wait for it.
Wait sync.WaitGroup
// eventRegistry maps a zookeeper path to the listener channels registered for it.
eventRegistry map[string][]*chan struct{}
// eventRegistryLock guards eventRegistry.
eventRegistryLock sync.RWMutex
}
// StateToString renders a zookeeper state as a human readable description.
//
// Besides the regular zk.State values it also recognises the numeric values of
// zk.EventNodeDeleted and zk.EventNodeDataChanged converted to zk.State. Any
// other value falls back to the state's own String method.
func StateToString(state zk.State) string {
	var desc string

	switch state {
	case zk.StateUnknown:
		desc = "zookeeper unknown state"
	case zk.StateDisconnected:
		desc = "zookeeper disconnected"
	case zk.StateConnecting:
		desc = "zookeeper connecting"
	case zk.StateConnected:
		desc = "zookeeper connected"
	case zk.StateConnectedReadOnly:
		desc = "zookeeper connect readonly"
	case zk.StateHasSession:
		desc = "zookeeper has session"
	case zk.StateAuthFailed:
		desc = "zookeeper auth failed"
	case zk.StateSaslAuthenticated:
		desc = "zookeeper sasl authenticated"
	case zk.StateExpired:
		desc = "zookeeper connection expired"
	case zk.State(zk.EventNodeDeleted):
		desc = "zookeeper node deleted"
	case zk.State(zk.EventNodeDataChanged):
		desc = "zookeeper node data changed"
	default:
		desc = state.String()
	}

	return desc
}
// Options collects the optional settings that Option functions fill in.
// nolint
type Options struct {
// zkName is the client name; set by WithZkName.
zkName string
// client is neither read nor written in the visible part of this file.
client *ZookeeperClient
// ts is an already running test cluster to reuse; set by WithTestCluster.
ts *zk.TestCluster
}
// Option is a function that mutates an Options value. Any number of them can
// be passed to ValidateZookeeperClient and NewMockZookeeperClient.
type Option func(*Options)
// WithZkName returns an Option that stores name as the zk client name.
func WithZkName(name string) Option {
	setName := func(o *Options) {
		o.zkName = name
	}
	return setName
}
// ValidateZookeeperClient makes sure container holds a zookeeper client with a
// connection, creating whatever is missing.
//
// The whole operation runs under container.ZkClientLock(). If the container has
// no client, a new one is built from the container URL: the timeout comes from
// the REGISTRY_TIMEOUT_KEY parameter and the addresses are the comma separated
// parts of url.Location. If the client exists but its Conn is nil, only the
// connection is re-established and a new HandleZkEvent goroutine is started.
// Whenever a connection was established here, container.WaitGroup() is
// incremented by one.
//
// opts may carry WithZkName to name a newly created client.
//
// It returns nil on success, or the wrapped timeout-parsing / connection error.
func ValidateZookeeperClient(container ZkClientFacade, opts ...Option) error {
	options := &Options{}
	for _, opt := range opts {
		opt(options)
	}

	var err error
	connected := false

	lock := container.ZkClientLock()
	url := container.GetUrl()

	lock.Lock()
	defer lock.Unlock()

	if container.ZkClient() == nil {
		timeoutStr := url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)
		timeout, paramErr := time.ParseDuration(timeoutStr)
		if paramErr != nil {
			logger.Errorf("timeout config %v is invalid, err is %v",
				timeoutStr, paramErr.Error())
			return perrors.WithMessagef(paramErr, "newZookeeperClient(address:%+v)", url.Location)
		}

		// in dubbo, every registry only connect one node, so this is []string{r.Address}
		zkAddresses := strings.Split(url.Location, ",")
		newClient, cltErr := NewZookeeperClient(options.zkName, zkAddresses, timeout)
		if cltErr != nil {
			// timeout.String() is a string, so it must be printed with %s (not %d).
			logger.Warnf("newZookeeperClient(name{%s}, zk address{%v}, timeout{%s}) = error{%v}",
				options.zkName, url.Location, timeout.String(), cltErr)
			return perrors.WithMessagef(cltErr, "newZookeeperClient(address:%+v)", url.Location)
		}
		container.SetZkClient(newClient)
		connected = true
	}

	// Conn is guarded by the client's own RWMutex everywhere else (getConn,
	// Close, HandleZkEvent), so take it here as well before touching Conn.
	client := container.ZkClient()
	client.Lock()
	if client.Conn == nil {
		var event <-chan zk.Event
		client.Conn, event, err = zk.Connect(client.ZkAddrs, client.Timeout)
		if err == nil {
			client.Wait.Add(1)
			connected = true
			go client.HandleZkEvent(event)
		}
	}
	client.Unlock()

	if connected {
		logger.Infof("Connect to zookeeper successfully, name{%s}, zk address{%v}", options.zkName, url.Location)
		container.WaitGroup().Add(1) // zk client start successful, then registry wg +1
	}

	// WithMessagef yields nil when err is nil.
	return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL)
}
// NewZookeeperClient connects to the given zookeeper addresses and returns a
// client named name. On success a HandleZkEvent goroutine is started for the
// session event channel and registered in the client's Wait group.
// On failure the connect error is returned wrapped and no client is returned.
func NewZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*ZookeeperClient, error) {
	// connect to zookeeper
	conn, events, err := zk.Connect(zkAddrs, timeout)
	if err != nil {
		return nil, perrors.WithMessagef(err, "zk.Connect(zkAddrs:%+v)", zkAddrs)
	}

	client := &ZookeeperClient{
		name:          name,
		ZkAddrs:       zkAddrs,
		Timeout:       timeout,
		Conn:          conn,
		exit:          make(chan struct{}),
		eventRegistry: make(map[string][]*chan struct{}),
	}

	client.Wait.Add(1)
	go client.HandleZkEvent(events)

	return client, nil
}
// WithTestCluster returns an Option that makes the zk client use the given,
// already existing test cluster.
func WithTestCluster(ts *zk.TestCluster) Option {
	useCluster := func(o *Options) {
		o.ts = ts
	}
	return useCluster
}
// NewMockZookeeperClient returns a client connected to a zookeeper test cluster.
//
// If opts contains WithTestCluster, that cluster is used; otherwise a new
// single-server test cluster is started. The function returns the cluster, the
// client and the session event channel. Unlike NewZookeeperClient it does not
// start HandleZkEvent; the event channel is handed to the caller instead.
func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) {
	options := &Options{}
	for _, apply := range opts {
		apply(options)
	}

	// pick the supplied cluster, or start a fresh one
	cluster := options.ts
	if cluster == nil {
		started, startErr := zk.StartTestCluster(1, nil, nil)
		if startErr != nil {
			return nil, nil, nil, perrors.WithMessagef(startErr, "zk.Connect")
		}
		cluster = started
	}

	// connect to zookeeper
	conn, events, connErr := cluster.ConnectWithOptions(timeout)
	if connErr != nil {
		return nil, nil, nil, perrors.WithMessagef(connErr, "zk.Connect")
	}

	client := &ZookeeperClient{
		name:          name,
		ZkAddrs:       []string{},
		Timeout:       timeout,
		Conn:          conn,
		exit:          make(chan struct{}),
		eventRegistry: make(map[string][]*chan struct{}),
	}

	return cluster, client, events, nil
}
// HandleZkEvent consumes events from session until the client is stopped or a
// disconnected state is observed.
//
//   - On zk.StateDisconnected the client is stopped, its Conn is detached under
//     the client lock and closed, and the loop ends.
//   - On a state value equal to zk.EventNodeDataChanged or
//     zk.EventNodeChildrenChanged every listener registered under a path that
//     has event.Path as prefix is notified.
//   - On connecting / connected / has-session the listeners registered exactly
//     for event.Path are notified, unless the previous state already was
//     zk.StateHasSession.
//
// Notifications are blocking sends performed while the registry read lock is
// held. The caller must have done z.Wait.Add(1); Done is called on return.
func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
	defer func() {
		z.Wait.Done()
		logger.Infof("zk{path:%v, name:%s} connection goroutine game over.", z.ZkAddrs, z.name)
	}()

	// notifyAll sends one signal to every listener channel in the list.
	notifyAll := func(listeners []*chan struct{}) {
		for _, ch := range listeners {
			*ch <- struct{}{}
		}
	}

	// previous remembers the state of the last fully handled event.
	var previous zk.State

	for {
		select {
		case <-z.exit:
			return

		case evt := <-session:
			logger.Infof("client{%s} get a zookeeper event{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
				z.name, evt.Type, evt.Server, evt.Path, evt.State, StateToString(evt.State), evt.Err)

			switch evt.State {
			case zk.StateDisconnected:
				logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name)
				z.stop()

				// detach the connection under the lock, close it outside
				z.Lock()
				detached := z.Conn
				z.Conn = nil
				z.Unlock()

				if detached != nil {
					detached.Close()
				}
				return

			case zk.State(zk.EventNodeDataChanged), zk.State(zk.EventNodeChildrenChanged):
				logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, evt.Path)

				z.eventRegistryLock.RLock()
				for registered, listeners := range z.eventRegistry {
					if !strings.HasPrefix(registered, evt.Path) {
						continue
					}
					logger.Infof("send event{state:zk.EventNodeDataChange, Path:%s} notify event to path{%s} related listener",
						evt.Path, registered)
					notifyAll(listeners)
				}
				z.eventRegistryLock.RUnlock()

			case zk.StateConnecting, zk.StateConnected, zk.StateHasSession:
				if previous == zk.StateHasSession {
					// skip the notification; previous stays untouched
					continue
				}

				z.eventRegistryLock.RLock()
				notifyAll(z.eventRegistry[evt.Path])
				z.eventRegistryLock.RUnlock()
			}

			previous = evt.State
		}
	}
}
// RegisterEvent adds the listener channel event to the list kept for zkPath.
// Calls with an empty path or a nil channel pointer are ignored.
func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
	if event == nil || zkPath == "" {
		return
	}

	z.eventRegistryLock.Lock()
	defer z.eventRegistryLock.Unlock()

	z.eventRegistry[zkPath] = append(z.eventRegistry[zkPath], event)
	logger.Debugf("zkClient{%s} register event{path:%s, ptr:%p}", z.name, zkPath, event)
}
// UnregisterEvent removes every occurrence of the listener channel event from
// the list registered under zkPath. When the list becomes empty the path entry
// itself is deleted. An empty zkPath or an unknown path is a no-op.
func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
	if zkPath == "" {
		return
	}

	z.eventRegistryLock.Lock()
	defer z.eventRegistryLock.Unlock()

	infoList, ok := z.eventRegistry[zkPath]
	if !ok {
		return
	}

	// Filter in place. Removing with append() while ranging over the same slice
	// skips the element after each removal and can panic with an out-of-range
	// slice when the same channel is registered more than once.
	kept := infoList[:0]
	for _, e := range infoList {
		if e == event {
			logger.Infof("zkClient{%s} unregister event{path:%s, event:%p}", z.name, zkPath, event)
			continue
		}
		kept = append(kept, e)
	}
	// Drop stale pointers left in the tail of the shared backing array.
	for i := len(kept); i < len(infoList); i++ {
		infoList[i] = nil
	}

	logger.Debugf("after zkClient{%s} unregister event{path:%s, event:%p}, array length %d",
		z.name, zkPath, event, len(kept))

	if len(kept) == 0 {
		delete(z.eventRegistry, zkPath)
		return
	}
	z.eventRegistry[zkPath] = kept
}
// Done returns the client's exit channel as receive-only; it is closed once
// the client has been stopped.
func (z *ZookeeperClient) Done() <-chan struct{} {
	var done <-chan struct{} = z.exit
	return done
}
// stop closes the exit channel if it is still open.
//
// It returns true when the client had already been stopped and false when this
// call performed the stop.
//
// Close and HandleZkEvent call stop from different goroutines, and closing an
// already closed channel panics, so the check and the close are done as one
// step under the client mutex. Callers must not hold that mutex.
func (z *ZookeeperClient) stop() bool {
	z.Lock()
	defer z.Unlock()

	select {
	case <-z.exit:
		return true
	default:
		close(z.exit)
	}
	return false
}
// ZkConnValid reports whether the client is usable: it has not been stopped
// and it currently holds a non-nil connection.
func (z *ZookeeperClient) ZkConnValid() bool {
	select {
	case <-z.exit:
		// already stopped
		return false
	default:
	}

	return z.getConn() != nil
}
// Close stops the client, waits for the goroutines tracked by Wait to finish,
// then detaches and closes the connection if there still is one.
// Calling Close on a nil client is a no-op.
func (z *ZookeeperClient) Close() {
	if z == nil {
		return
	}

	z.stop()
	z.Wait.Wait()

	// takeConn detaches the connection from the client under the lock.
	takeConn := func() *zk.Conn {
		z.Lock()
		defer z.Unlock()
		current := z.Conn
		z.Conn = nil
		return current
	}

	if detached := takeConn(); detached != nil {
		logger.Infof("zkClient Conn{name:%s, zk addr:%d} exit now.", z.name, detached.SessionID())
		detached.Close()
	}

	logger.Infof("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs)
}
// Create creates the node at basePath together with any missing parent nodes.
// Every node created this way holds an empty value; see CreateWithValue.
func (z *ZookeeperClient) Create(basePath string) error {
	emptyValue := []byte("")
	return z.CreateWithValue(basePath, emptyValue)
}
// CreateWithValue walks basePath segment by segment and creates each node on
// the way, so missing parents are created before their children. value is
// written to every node that gets created. Nodes that already exist are
// skipped. The text before the first "/" of basePath is ignored.
//
// It returns a wrapped errNilZkClientConn when there is no connection, or the
// wrapped zookeeper error of the first creation that fails for any reason
// other than zk.ErrNodeExists.
func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error {
	logger.Debugf("zookeeperClient.Create(basePath{%s})", basePath)

	conn := z.getConn()
	if conn == nil {
		return perrors.WithMessagef(errNilZkClientConn, "zk.Create(path:%s)", basePath)
	}

	segments := strings.Split(basePath, "/")[1:]
	current := ""
	for _, segment := range segments {
		current = path.Join(current, "/", segment)

		_, err := conn.Create(current, value, 0, zk.WorldACL(zk.PermAll))
		switch err {
		case nil:
			// created
		case zk.ErrNodeExists:
			logger.Debugf("zk.create(\"%s\") exists", current)
		default:
			logger.Errorf("zk.create(\"%s\") error(%v)", current, perrors.WithStack(err))
			return perrors.WithMessagef(err, "zk.Create(path:%s)", basePath)
		}
	}

	return nil
}
// CreateTempWithValue walks basePath segment by segment. Every segment but the
// last is created with empty data and no flags, and is skipped if it already
// exists. The last segment is created with zk.FlagEphemeral and holds value.
//
// If the last node already exists, zk.ErrNodeExists is returned as is (not
// wrapped) and the existing node's data is left untouched. Other failures are
// returned wrapped; a missing connection yields a wrapped errNilZkClientConn.
func (z *ZookeeperClient) CreateTempWithValue(basePath string, value []byte) error {
	logger.Debugf("zookeeperClient.Create(basePath{%s})", basePath)

	conn := z.getConn()
	if conn == nil {
		return perrors.WithMessagef(errNilZkClientConn, "zk.Create(path:%s)", basePath)
	}

	segments := strings.Split(basePath, "/")[1:]
	lastIdx := len(segments) - 1
	current := ""

	for idx, segment := range segments {
		current = path.Join(current, "/", segment)

		var err error
		if idx != lastIdx {
			_, err = conn.Create(current, []byte{}, 0, zk.WorldACL(zk.PermAll))
		} else {
			// last child need be ephemeral
			_, err = conn.Create(current, value, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
			if err == zk.ErrNodeExists {
				return err
			}
		}

		if err == nil {
			continue
		}
		if err != zk.ErrNodeExists {
			logger.Errorf("zk.create(\"%s\") error(%v)", current, perrors.WithStack(err))
			return perrors.WithMessagef(err, "zk.Create(path:%s)", basePath)
		}
		logger.Debugf("zk.create(\"%s\") exists", current)
	}

	return nil
}
// Delete removes the node at basePath regardless of its version (version -1).
// It returns nil on success, otherwise the wrapped zookeeper error or a wrapped
// errNilZkClientConn when there is no connection.
func (z *ZookeeperClient) Delete(basePath string) error {
	var err error
	if conn := z.getConn(); conn == nil {
		err = errNilZkClientConn
	} else {
		err = conn.Delete(basePath, -1)
	}
	// WithMessagef yields nil when err is nil.
	return perrors.WithMessagef(err, "Delete(basePath:%s)", basePath)
}
// RegisterTemp creates an ephemeral node named node below basePath with empty
// data. On success it returns the path reported by zookeeper. On failure
// (including a missing connection) it returns the path it tried to create
// together with the error.
func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
	zkPath := path.Join(basePath) + "/" + node

	var (
		createdPath string
		err         error
	)
	if conn := z.getConn(); conn == nil {
		err = errNilZkClientConn
	} else {
		createdPath, err = conn.Create(zkPath, []byte(""), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
	}

	if err != nil {
		logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)", zkPath, perrors.WithStack(err))
		return zkPath, perrors.WithStack(err)
	}

	logger.Debugf("zkClient{%s} create a temp zookeeper node:%s", z.name, createdPath)
	return createdPath, nil
}
// RegisterTempSeq creates an ephemeral sequential node below basePath holding
// data and returns the path reported by zookeeper. zk.ErrNodeExists is not
// treated as a failure. Any other error (including a missing connection) is
// returned with an empty path.
func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) {
	var (
		createdPath string
		err         error
	)
	if conn := z.getConn(); conn == nil {
		err = errNilZkClientConn
	} else {
		prefix := path.Join(basePath) + "/"
		flags := int32(zk.FlagEphemeral | zk.FlagSequence)
		createdPath, err = conn.Create(prefix, data, flags, zk.WorldACL(zk.PermAll))
	}

	logger.Debugf("zookeeperClient.RegisterTempSeq(basePath{%s}) = tempPath{%s}", basePath, createdPath)

	if failed := err != nil && err != zk.ErrNodeExists; failed {
		logger.Errorf("zkClient{%s} conn.Create(\"%s\", \"%s\", zk.FlagEphemeral|zk.FlagSequence) error(%v)",
			z.name, basePath, string(data), err)
		return "", perrors.WithStack(err)
	}

	logger.Debugf("zkClient{%s} create a temp zookeeper node:%s", z.name, createdPath)
	return createdPath, nil
}
// GetChildrenW returns the children of path together with the event channel
// of a watch set on them.
//
// Errors: errNilChildren when zookeeper reports zk.ErrNoChildrenForEphemerals
// or the child list is empty; errNilNode on zk.ErrNoNode; a wrapped error for
// any other failure, including a missing connection; a dedicated error when
// the returned stat is nil.
func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, error) {
	var (
		names   []string
		stat    *zk.Stat
		watcher *zk.Watcher
		err     error
	)
	if conn := z.getConn(); conn == nil {
		err = errNilZkClientConn
	} else {
		names, stat, watcher, err = conn.ChildrenW(path)
	}

	switch {
	case err == nil:
		// fall through to the result checks below
	case err == zk.ErrNoChildrenForEphemerals:
		return nil, nil, errNilChildren
	case err == zk.ErrNoNode:
		return nil, nil, errNilNode
	default:
		logger.Errorf("zk.ChildrenW(path{%s}) = error(%v)", path, err)
		return nil, nil, perrors.WithMessagef(err, "zk.ChildrenW(path:%s)", path)
	}

	if stat == nil {
		return nil, nil, perrors.Errorf("path{%s} get stat is nil", path)
	}
	if len(names) == 0 {
		return nil, nil, errNilChildren
	}

	return names, watcher.EvtCh, nil
}
// GetChildren returns the children of path without setting a watch.
//
// A missing node (zk.ErrNoNode) and a nil stat are both reported as
// "has none children" errors carrying the path. An empty child list yields
// errNilChildren. Any other failure, including a missing connection, is
// returned wrapped.
func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
	var (
		names []string
		stat  *zk.Stat
		err   error
	)
	if conn := z.getConn(); conn == nil {
		err = errNilZkClientConn
	} else {
		names, stat, err = conn.Children(path)
	}

	switch {
	case err == nil:
		// fall through to the result checks below
	case err == zk.ErrNoNode:
		return nil, perrors.Errorf("path{%s} has none children", path)
	default:
		logger.Errorf("zk.Children(path{%s}) = error(%v)", path, perrors.WithStack(err))
		return nil, perrors.WithMessagef(err, "zk.Children(path:%s)", path)
	}

	if stat == nil {
		return nil, perrors.Errorf("path{%s} has none children", path)
	}
	if len(names) == 0 {
		return nil, errNilChildren
	}

	return names, nil
}
// ExistW checks that zkPath exists and returns the event channel of a watch
// set on it. It returns an error when the lookup fails, when there is no
// connection, or when the node does not exist.
func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
	var (
		found   bool
		watcher *zk.Watcher
		err     error
	)
	if conn := z.getConn(); conn == nil {
		err = errNilZkClientConn
	} else {
		found, _, watcher, err = conn.ExistsW(zkPath)
	}

	if err != nil {
		logger.Warnf("zkClient{%s}.ExistsW(path{%s}) = error{%v}.", z.name, zkPath, perrors.WithStack(err))
		return nil, perrors.WithMessagef(err, "zk.ExistsW(path:%s)", zkPath)
	}

	if found {
		return watcher.EvtCh, nil
	}

	logger.Warnf("zkClient{%s}'s App zk path{%s} does not exist.", z.name, zkPath)
	return nil, perrors.Errorf("zkClient{%s} App zk path{%s} does not exist.", z.name, zkPath)
}
// GetContent returns the data and stat of the node at zkPath.
//
// The connection is fetched through getConn so the read is synchronised with
// Close and HandleZkEvent, which reset Conn to nil. When there is no connection
// a wrapped errNilZkClientConn is returned instead of dereferencing nil.
func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) {
	conn := z.getConn()
	if conn == nil {
		return nil, nil, perrors.WithMessagef(errNilZkClientConn, "zk.Get(path:%s)", zkPath)
	}
	return conn.Get(zkPath)
}
// SetContent writes content to the node at zkPath, passing version through to
// the zookeeper Set call, and returns the resulting stat.
//
// The connection is fetched through getConn so the read is synchronised with
// Close and HandleZkEvent, which reset Conn to nil. When there is no connection
// a wrapped errNilZkClientConn is returned instead of dereferencing nil.
func (z *ZookeeperClient) SetContent(zkPath string, content []byte, version int32) (*zk.Stat, error) {
	conn := z.getConn()
	if conn == nil {
		return nil, perrors.WithMessagef(errNilZkClientConn, "zk.Set(path:%s)", zkPath)
	}
	return conn.Set(zkPath, content, version)
}
// getConn returns the current connection, read under the client's read lock.
// The result is nil when the client has no connection.
func (z *ZookeeperClient) getConn() *zk.Conn {
	z.RLock()
	current := z.Conn
	z.RUnlock()
	return current
}