/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package zookeeper

import (
	"path"
	"strings"
	"sync"
	"time"
)

import (
	"github.com/dubbogo/go-zookeeper/zk"
	perrors "github.com/pkg/errors"
)

import (
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/logger"
)

const (
	// ConnDelay connection delay interval
	ConnDelay = 3
	// MaxFailTimes max fail times
	MaxFailTimes = 15
)

var (
	errNilZkClientConn = perrors.New("zookeeper client{conn} is nil")
	errNilChildren     = perrors.Errorf("has none children")
	errNilNode         = perrors.Errorf("node does not exist")
)

// ZookeeperClient represents zookeeper client Configuration
type ZookeeperClient struct {
	name         string
	ZkAddrs      []string
	sync.RWMutex // for conn
	Conn         *zk.Conn
	Timeout      time.Duration
	exit         chan struct{}
	Wait         sync.WaitGroup

	eventRegistry     map[string][]*chan struct{}
	eventRegistryLock sync.RWMutex
}

// nolint
func StateToString(state zk.State) string {
	switch state {
	case zk.StateDisconnected:
		return "zookeeper disconnected"
	case zk.StateConnecting:
		return "zookeeper connecting"
	case zk.StateAuthFailed:
		return "zookeeper auth failed"
	case zk.StateConnectedReadOnly:
		return "zookeeper connect readonly"
	case zk.StateSaslAuthenticated:
		return "zookeeper sasl authenticated"
	case zk.StateExpired:
		return "zookeeper connection expired"
	case zk.StateConnected:
		return "zookeeper connected"
	case zk.StateHasSession:
		return "zookeeper has session"
	case zk.StateUnknown:
		return "zookeeper unknown state"
	case zk.State(zk.EventNodeDeleted):
		return "zookeeper node deleted"
	case zk.State(zk.EventNodeDataChanged):
		return "zookeeper node data changed"
	default:
		return state.String()
	}
}

// nolint
type Options struct {
	zkName string
	client *ZookeeperClient

	ts *zk.TestCluster
}

// Option will define a function of handling Options
type Option func(*Options)

// WithZkName sets zk client name
func WithZkName(name string) Option {
	return func(opt *Options) {
		opt.zkName = name
	}
}

// ValidateZookeeperClient validates client and sets options
func ValidateZookeeperClient(container ZkClientFacade, opts ...Option) error {
	var (
		err error
	)
	options := &Options{}
	for _, opt := range opts {
		opt(options)
	}
	connected := false

	lock := container.ZkClientLock()
	url := container.GetUrl()

	lock.Lock()
	defer lock.Unlock()

	if container.ZkClient() == nil {
		// in dubbo, every registry only connect one node, so this is []string{r.Address}
		timeout, paramErr := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
		if paramErr != nil {
			logger.Errorf("timeout config %v is invalid, err is %v",
				url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), paramErr.Error())
			return perrors.WithMessagef(paramErr, "newZookeeperClient(address:%+v)", url.Location)
		}
		zkAddresses := strings.Split(url.Location, ",")
		newClient, cltErr := NewZookeeperClient(options.zkName, zkAddresses, timeout)
		if cltErr != nil {
			logger.Warnf("newZookeeperClient(name{%s}, zk address{%v}, timeout{%d}) = error{%v}",
				options.zkName, url.Location, timeout.String(), cltErr)
			return perrors.WithMessagef(cltErr, "newZookeeperClient(address:%+v)", url.Location)
		}
		container.SetZkClient(newClient)
		connected = true
	}

	if container.ZkClient().Conn == nil {
		var event <-chan zk.Event
		container.ZkClient().Conn, event, err = zk.Connect(container.ZkClient().ZkAddrs, container.ZkClient().Timeout)
		if err == nil {
			container.ZkClient().Wait.Add(1)
			connected = true
			go container.ZkClient().HandleZkEvent(event)
		}
	}

	if connected {
		logger.Infof("Connect to zookeeper successfully, name{%s}, zk address{%v}", options.zkName, url.Location)
		container.WaitGroup().Add(1) // zk client start successful, then registry wg +1
	}

	return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL)
}

// nolint
func NewZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*ZookeeperClient, error) {
	var (
		err   error
		event <-chan zk.Event
		z     *ZookeeperClient
	)

	z = &ZookeeperClient{
		name:          name,
		ZkAddrs:       zkAddrs,
		Timeout:       timeout,
		exit:          make(chan struct{}),
		eventRegistry: make(map[string][]*chan struct{}),
	}
	// connect to zookeeper
	z.Conn, event, err = zk.Connect(zkAddrs, timeout)
	if err != nil {
		return nil, perrors.WithMessagef(err, "zk.Connect(zkAddrs:%+v)", zkAddrs)
	}

	z.Wait.Add(1)
	go z.HandleZkEvent(event)

	return z, nil
}

// WithTestCluster sets test cluster for zk client
func WithTestCluster(ts *zk.TestCluster) Option {
	return func(opt *Options) {
		opt.ts = ts
	}
}

// NewMockZookeeperClient returns a mock client instance
func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) {
	var (
		err   error
		event <-chan zk.Event
		z     *ZookeeperClient
		ts    *zk.TestCluster
	)

	z = &ZookeeperClient{
		name:          name,
		ZkAddrs:       []string{},
		Timeout:       timeout,
		exit:          make(chan struct{}),
		eventRegistry: make(map[string][]*chan struct{}),
	}

	options := &Options{}
	for _, opt := range opts {
		opt(options)
	}

	// connect to zookeeper
	if options.ts != nil {
		ts = options.ts
	} else {
		ts, err = zk.StartTestCluster(1, nil, nil)
		if err != nil {
			return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
		}
	}

	z.Conn, event, err = ts.ConnectWithOptions(timeout)
	if err != nil {
		return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
	}

	return ts, z, event, nil
}

// HandleZkEvent handles zookeeper events
func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
	var (
		state int
		event zk.Event
	)

	defer func() {
		z.Wait.Done()
		logger.Infof("zk{path:%v, name:%s} connection goroutine game over.", z.ZkAddrs, z.name)
	}()

	for {
		select {
		case <-z.exit:
			return
		case event = <-session:
			logger.Infof("client{%s} get a zookeeper event{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
				z.name, event.Type, event.Server, event.Path, event.State, StateToString(event.State), event.Err)
			switch (int)(event.State) {
			case (int)(zk.StateDisconnected):
				logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name)
				z.stop()
				z.Lock()
				conn := z.Conn
				z.Conn = nil
				z.Unlock()
				if conn != nil {
					conn.Close()
				}
				return
			case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged):
				logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path)
				z.eventRegistryLock.RLock()
				for p, a := range z.eventRegistry {
					if strings.HasPrefix(p, event.Path) {
						logger.Infof("send event{state:zk.EventNodeDataChange, Path:%s} notify event to path{%s} related listener",
							event.Path, p)
						for _, e := range a {
							*e <- struct{}{}
						}
					}
				}
				z.eventRegistryLock.RUnlock()
			case (int)(zk.StateConnecting), (int)(zk.StateConnected), (int)(zk.StateHasSession):
				if state == (int)(zk.StateHasSession) {
					continue
				}
				z.eventRegistryLock.RLock()
				if a, ok := z.eventRegistry[event.Path]; ok && 0 < len(a) {
					for _, e := range a {
						*e <- struct{}{}
					}
				}
				z.eventRegistryLock.RUnlock()
			}
			state = (int)(event.State)
		}
	}
}

// RegisterEvent registers zookeeper events
func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
	if zkPath == "" || event == nil {
		return
	}

	z.eventRegistryLock.Lock()
	defer z.eventRegistryLock.Unlock()
	a := z.eventRegistry[zkPath]
	a = append(a, event)
	z.eventRegistry[zkPath] = a
	logger.Debugf("zkClient{%s} register event{path:%s, ptr:%p}", z.name, zkPath, event)
}

// UnregisterEvent unregisters zookeeper events
func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
	if zkPath == "" {
		return
	}

	z.eventRegistryLock.Lock()
	defer z.eventRegistryLock.Unlock()
	infoList, ok := z.eventRegistry[zkPath]
	if !ok {
		return
	}
	for i, e := range infoList {
		if e == event {
			infoList = append(infoList[:i], infoList[i+1:]...)
			logger.Infof("zkClient{%s} unregister event{path:%s, event:%p}", z.name, zkPath, event)
		}
	}
	logger.Debugf("after zkClient{%s} unregister event{path:%s, event:%p}, array length %d",
		z.name, zkPath, event, len(infoList))
	if len(infoList) == 0 {
		delete(z.eventRegistry, zkPath)
	} else {
		z.eventRegistry[zkPath] = infoList
	}
}

// nolint
func (z *ZookeeperClient) Done() <-chan struct{} {
	return z.exit
}

func (z *ZookeeperClient) stop() bool {
	select {
	case <-z.exit:
		return true
	default:
		close(z.exit)
	}

	return false
}

// ZkConnValid validates zookeeper connection
func (z *ZookeeperClient) ZkConnValid() bool {
	select {
	case <-z.exit:
		return false
	default:
	}

	z.RLock()
	defer z.RUnlock()
	return z.Conn != nil
}

// nolint
func (z *ZookeeperClient) Close() {
	if z == nil {
		return
	}

	z.stop()
	z.Wait.Wait()
	z.Lock()
	conn := z.Conn
	z.Conn = nil
	z.Unlock()
	if conn != nil {
		logger.Infof("zkClient Conn{name:%s, zk addr:%d} exit now.", z.name, conn.SessionID())
		conn.Close()
	}

	logger.Infof("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs)
}

// Create will create the node recursively, which means that if the parent node is absent,
// it will create parent node first.
// And the value for the basePath is ""
func (z *ZookeeperClient) Create(basePath string) error {
	return z.CreateWithValue(basePath, []byte(""))
}

// CreateWithValue will create the node recursively, which means that if the parent node is absent,
// it will create parent node first.
func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error {
	var (
		err     error
		tmpPath string
	)

	logger.Debugf("zookeeperClient.Create(basePath{%s})", basePath)
	conn := z.getConn()
	err = errNilZkClientConn
	if conn == nil {
		return perrors.WithMessagef(err, "zk.Create(path:%s)", basePath)
	}

	for _, str := range strings.Split(basePath, "/")[1:] {
		tmpPath = path.Join(tmpPath, "/", str)
		_, err = conn.Create(tmpPath, value, 0, zk.WorldACL(zk.PermAll))

		if err != nil {
			if err == zk.ErrNodeExists {
				logger.Debugf("zk.create(\"%s\") exists", tmpPath)
			} else {
				logger.Errorf("zk.create(\"%s\") error(%v)", tmpPath, perrors.WithStack(err))
				return perrors.WithMessagef(err, "zk.Create(path:%s)", basePath)
			}
		}
	}

	return nil
}

// CreateTempWithValue will create the node recursively, which means that if the parent node is absent,
// it will create parent node first，and set value in last child path
// If the path exist, it will update data
func (z *ZookeeperClient) CreateTempWithValue(basePath string, value []byte) error {
	var (
		err     error
		tmpPath string
	)

	logger.Debugf("zookeeperClient.Create(basePath{%s})", basePath)
	conn := z.getConn()
	err = errNilZkClientConn
	if conn == nil {
		return perrors.WithMessagef(err, "zk.Create(path:%s)", basePath)
	}

	pathSlice := strings.Split(basePath, "/")[1:]
	length := len(pathSlice)
	for i, str := range pathSlice {
		tmpPath = path.Join(tmpPath, "/", str)
		// last child need be ephemeral
		if i == length-1 {
			_, err = conn.Create(tmpPath, value, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
			if err == zk.ErrNodeExists {
				return err
			}
		} else {
			_, err = conn.Create(tmpPath, []byte{}, 0, zk.WorldACL(zk.PermAll))
		}
		if err != nil {
			if err == zk.ErrNodeExists {
				logger.Debugf("zk.create(\"%s\") exists", tmpPath)
			} else {
				logger.Errorf("zk.create(\"%s\") error(%v)", tmpPath, perrors.WithStack(err))
				return perrors.WithMessagef(err, "zk.Create(path:%s)", basePath)
			}
		}
	}

	return nil
}

// nolint
func (z *ZookeeperClient) Delete(basePath string) error {
	err := errNilZkClientConn
	conn := z.getConn()
	if conn != nil {
		err = conn.Delete(basePath, -1)
	}

	return perrors.WithMessagef(err, "Delete(basePath:%s)", basePath)
}

// RegisterTemp registers temporary node by @basePath and @node
func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
	var (
		err     error
		zkPath  string
		tmpPath string
	)

	err = errNilZkClientConn
	zkPath = path.Join(basePath) + "/" + node
	conn := z.getConn()
	if conn != nil {
		tmpPath, err = conn.Create(zkPath, []byte(""), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
	}

	if err != nil {
		logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)", zkPath, perrors.WithStack(err))
		return zkPath, perrors.WithStack(err)
	}
	logger.Debugf("zkClient{%s} create a temp zookeeper node:%s", z.name, tmpPath)

	return tmpPath, nil
}

// RegisterTempSeq register temporary sequence node by @basePath and @data
func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) {
	var (
		err     error
		tmpPath string
	)

	err = errNilZkClientConn
	conn := z.getConn()
	if conn != nil {
		tmpPath, err = conn.Create(
			path.Join(basePath)+"/",
			data,
			zk.FlagEphemeral|zk.FlagSequence,
			zk.WorldACL(zk.PermAll),
		)
	}

	logger.Debugf("zookeeperClient.RegisterTempSeq(basePath{%s}) = tempPath{%s}", basePath, tmpPath)
	if err != nil && err != zk.ErrNodeExists {
		logger.Errorf("zkClient{%s} conn.Create(\"%s\", \"%s\", zk.FlagEphemeral|zk.FlagSequence) error(%v)",
			z.name, basePath, string(data), err)
		return "", perrors.WithStack(err)
	}
	logger.Debugf("zkClient{%s} create a temp zookeeper node:%s", z.name, tmpPath)

	return tmpPath, nil
}

// GetChildrenW gets children watch by @path
func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, error) {
	var (
		err      error
		children []string
		stat     *zk.Stat
		watcher  *zk.Watcher
	)

	err = errNilZkClientConn
	conn := z.getConn()
	if conn != nil {
		children, stat, watcher, err = conn.ChildrenW(path)
	}

	if err != nil {
		if err == zk.ErrNoChildrenForEphemerals {
			return nil, nil, errNilChildren
		}
		if err == zk.ErrNoNode {
			return nil, nil, errNilNode
		}
		logger.Errorf("zk.ChildrenW(path{%s}) = error(%v)", path, err)
		return nil, nil, perrors.WithMessagef(err, "zk.ChildrenW(path:%s)", path)
	}
	if stat == nil {
		return nil, nil, perrors.Errorf("path{%s} get stat is nil", path)
	}
	if len(children) == 0 {
		return nil, nil, errNilChildren
	}

	return children, watcher.EvtCh, nil
}

// GetChildren gets children by @path
func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
	var (
		err      error
		children []string
		stat     *zk.Stat
	)

	err = errNilZkClientConn
	conn := z.getConn()
	if conn != nil {
		children, stat, err = conn.Children(path)
	}

	if err != nil {
		if err == zk.ErrNoNode {
			return nil, perrors.Errorf("path{%s} has none children", path)
		}
		logger.Errorf("zk.Children(path{%s}) = error(%v)", path, perrors.WithStack(err))
		return nil, perrors.WithMessagef(err, "zk.Children(path:%s)", path)
	}
	if stat == nil {
		return nil, perrors.Errorf("path{%s} has none children", path)
	}
	if len(children) == 0 {
		return nil, errNilChildren
	}

	return children, nil
}

// ExistW to judge watch whether it exists or not by @zkPath
func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
	var (
		exist   bool
		err     error
		watcher *zk.Watcher
	)

	err = errNilZkClientConn
	conn := z.getConn()
	if conn != nil {
		exist, _, watcher, err = conn.ExistsW(zkPath)
	}

	if err != nil {
		logger.Warnf("zkClient{%s}.ExistsW(path{%s}) = error{%v}.", z.name, zkPath, perrors.WithStack(err))
		return nil, perrors.WithMessagef(err, "zk.ExistsW(path:%s)", zkPath)
	}
	if !exist {
		logger.Warnf("zkClient{%s}'s App zk path{%s} does not exist.", z.name, zkPath)
		return nil, perrors.Errorf("zkClient{%s} App zk path{%s} does not exist.", z.name, zkPath)
	}

	return watcher.EvtCh, nil
}

// GetContent gets content by @zkPath
func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) {
	return z.Conn.Get(zkPath)
}

// nolint
func (z *ZookeeperClient) SetContent(zkPath string, content []byte, version int32) (*zk.Stat, error) {
	return z.Conn.Set(zkPath, content, version)
}

// getConn gets zookeeper connection safely
func (z *ZookeeperClient) getConn() *zk.Conn {
	z.RLock()
	defer z.RUnlock()
	return z.Conn
}
