/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.partition;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.linkedin.util.clock.Clock;
import org.linkedin.util.clock.SystemClock;
import org.linkedin.util.clock.Timespan;
import org.linkedin.util.concurrent.ConcurrentUtils;
import org.linkedin.util.io.PathUtils;
import org.linkedin.zookeeper.client.*;
import org.slf4j.Logger;

import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ZKClient  extends org.linkedin.zookeeper.client.AbstractZKClient implements Watcher {

    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ZKClient.class);

    private Map<String, String> acls;
    private String password;

    public void start() throws Exception {
        // Grab the lock to make sure that the registration of the ManagedService
        // won't be updated immediately but that the initial update will happen first
        synchronized (_lock) {
            _stateChangeDispatcher.setDaemon(true);
            _stateChangeDispatcher.start();
            doStart();
        }
    }

    public void setACLs(Map<String, String> acls) {
        this.acls = acls;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    protected void doStart() throws UnsupportedEncodingException {
        connect();
    }

    @Override
    public void close() {
        if (_stateChangeDispatcher != null) {
            _stateChangeDispatcher.end();
            try {
                _stateChangeDispatcher.join(1000);
            } catch (Exception e) {
                LOG.debug("ignored exception", e);
            }
        }
        synchronized(_lock) {
            if (_zk != null) {
                try {
                    changeState(State.NONE);
                    _zk.close();
                    Thread th = getSendThread();
                    if (th != null) {
                        th.join(1000);
                    }
                    _zk = null;
                } catch (Exception e) {
                    LOG.debug("ignored exception", e);
                }
            }
        }
    }

    protected Thread getSendThread() {
        try {
            return (Thread) getField(_zk, "_zk", "cnxn", "sendThread");
        } catch (Throwable e) {
            return null;
        }
    }

    protected Object getField(Object obj, String... names) throws Exception {
        for (String name : names) {
            obj = getField(obj, name);
        }
        return obj;
    }

    protected Object getField(Object obj, String name) throws Exception {
        Class clazz = obj.getClass();
        while (clazz != null) {
            for (Field f : clazz.getDeclaredFields()) {
                if (f.getName().equals(name)) {
                    f.setAccessible(true);
                    return f.get(obj);
                }
            }
        }
        throw new NoSuchFieldError(name);
    }

    protected void changeState(State newState) {
        synchronized (_lock) {
            State oldState = _state;
            if (oldState != newState) {
                _stateChangeDispatcher.addEvent(oldState, newState);
                _state = newState;
                _lock.notifyAll();
            }
        }
    }

    public void testGenerateConnectionLoss() throws Exception {
        waitForConnected();
        Object clientCnxnSocket = getField(_zk, "_zk", "cnxn", "sendThread", "clientCnxnSocket");
        callMethod(clientCnxnSocket, "testableCloseSocket");
    }

    protected Object callMethod(Object obj, String name, Object... args) throws Exception {
        Class clazz = obj.getClass();
        while (clazz != null) {
            for (Method m : clazz.getDeclaredMethods()) {
                if (m.getName().equals(name)) {
                    m.setAccessible(true);
                    return m.invoke(obj, args);
                }
            }
        }
        throw new NoSuchMethodError(name);
    }

    protected void tryConnect() {
        synchronized (_lock) {
            try {
                connect();
            } catch (Throwable e) {
                LOG.warn("Error while restarting:", e);
                if (_expiredSessionRecovery == null) {
                    _expiredSessionRecovery = new ExpiredSessionRecovery();
                    _expiredSessionRecovery.setDaemon(true);
                    _expiredSessionRecovery.start();
                }
            }
        }
    }

    public void connect() throws UnsupportedEncodingException {
        synchronized (_lock) {
            changeState(State.CONNECTING);
            _zk = _factory.createZooKeeper(this);
            if (password != null) {
                _zk.addAuthInfo("digest", ("fabric:" + password).getBytes("UTF-8"));
            }
        }
    }

    public void process(WatchedEvent event) {
        if (event.getState() != null) {
            LOG.debug("event: {}", event.getState());
            synchronized (_lock) {
                switch(event.getState()) {
                    case SyncConnected:
                        changeState(State.CONNECTED);
                        break;
                    case Disconnected:
                        if (_state != State.NONE) {
                            changeState(State.RECONNECTING);
                        }
                        break;
                    case Expired:
                        // when expired, the zookeeper object is invalid and we need to recreate a new one
                        _zk = null;
                        LOG.warn("Expiration detected: trying to restart...");
                        tryConnect();
                        break;
                    default:
                        LOG.warn("Unsupported event state: {}", event.getState());
                }
            }
        }
    }

    @Override
    protected IZooKeeper getZk() {
        State state = _state;
        if (state == State.NONE) {
            throw new IllegalStateException("ZooKeeper client has not been configured yet. You need to either create an ensemble or join one.");
        } else if (state != State.CONNECTING) {
            try {
                waitForConnected();
            } catch (Exception e) {
                throw new IllegalStateException("Error waiting for ZooKeeper connection", e);
            }
        }
        IZooKeeper zk = _zk;
        if (zk == null) {
            throw new IllegalStateException("No ZooKeeper connection available");
        }
        return zk;
    }

    public void waitForConnected(Timespan timeout) throws InterruptedException, TimeoutException {
        waitForState(State.CONNECTED, timeout);
    }

    public void waitForConnected() throws InterruptedException, TimeoutException {
        waitForConnected(null);
    }

    public void waitForState(State state, Timespan timeout) throws TimeoutException, InterruptedException {
        long endTime = (timeout == null ? sessionTimeout : timeout).futureTimeMillis(_clock);
        if (_state != state) {
            synchronized (_lock) {
                while (_state != state) {
                    ConcurrentUtils.awaitUntil(_clock, _lock, endTime);
                }
            }
        }
    }

    @Override
    public void registerListener(LifecycleListener listener) {
        if (listener == null) {
            throw new IllegalStateException("listener is null");
        }
        if (!_listeners.contains(listener)) {
            _listeners.add(listener);
        }
        if (_state == State.CONNECTED) {
            listener.onConnected();
            //_stateChangeDispatcher.addEvent(null, State.CONNECTED);
        }
    }

    @Override
    public void removeListener(LifecycleListener listener) {
        if (listener == null) {
            throw new IllegalStateException("listener is null");
        }
        _listeners.remove(listener);
    }

    @Override
    public org.linkedin.zookeeper.client.IZKClient chroot(String path) {
        return new ChrootedZKClient(this, adjustPath(path));
    }

    @Override
    public boolean isConnected() {
        return _state == State.CONNECTED;
    }

    public boolean isConfigured() {
        return _state != State.NONE;
    }

    @Override
    public String getConnectString() {
        return _factory.getConnectString();
    }

    public static enum State {
        NONE,
        CONNECTING,
        CONNECTED,
        RECONNECTING
    }

    private final static String CHARSET = "UTF-8";

    private final Clock _clock = SystemClock.instance();
    private final List<LifecycleListener> _listeners = new CopyOnWriteArrayList<>();

    protected final Object _lock = new Object();
    protected volatile State _state = State.NONE;

    private final StateChangeDispatcher _stateChangeDispatcher = new StateChangeDispatcher();

    protected IZooKeeperFactory _factory;
    protected IZooKeeper _zk;
    protected Timespan _reconnectTimeout = Timespan.parse("20s");
    protected Timespan sessionTimeout = new Timespan(30, Timespan.TimeUnit.SECOND);

    private ExpiredSessionRecovery _expiredSessionRecovery = null;

    private class StateChangeDispatcher extends Thread {
        private final AtomicBoolean _running = new AtomicBoolean(true);
        private final BlockingQueue<Boolean> _events = new LinkedBlockingQueue<>();

        private StateChangeDispatcher() {
            super("ZooKeeper state change dispatcher thread");
        }

        @Override
        public void run() {
            Map<Object, Boolean> history = new IdentityHashMap<>();
            LOG.info("Starting StateChangeDispatcher");
            while (_running.get()) {
                Boolean isConnectedEvent;
                try {
                    isConnectedEvent = _events.take();
                } catch (InterruptedException e) {
                    continue;
                }
                if (!_running.get() || isConnectedEvent == null) {
                    continue;
                }
                Map<Object, Boolean> newHistory = callListeners(history, isConnectedEvent);
                // we save which event each listener has seen last
                // we don't update the map in place because we need to get rid of unregistered listeners
                history = newHistory;
            }
            LOG.info("StateChangeDispatcher terminated.");
        }

        public void end() {
            _running.set(false);
            _events.add(false);
        }

        public void addEvent(ZKClient.State oldState, ZKClient.State newState) {
            LOG.debug("addEvent: {} => {}", oldState, newState);
            if (newState == ZKClient.State.CONNECTED) {
                _events.add(true);
            } else if (oldState == ZKClient.State.CONNECTED) {
                _events.add(false);
            }
        }
    }

    protected Map<Object, Boolean> callListeners(Map<Object, Boolean> history, Boolean connectedEvent) {
        Map<Object, Boolean> newHistory = new IdentityHashMap<>();
        for (LifecycleListener listener : _listeners) {
            Boolean previousEvent = history.get(listener);
            // we propagate the event only if it was not already sent
            if (previousEvent == null || previousEvent != connectedEvent) {
                try {
                    if (connectedEvent) {
                        listener.onConnected();
                    } else {
                        listener.onDisconnected();
                    }
                } catch (Throwable e) {
                    LOG.warn("Exception while executing listener (ignored)", e);
                }
            }
            newHistory.put(listener, connectedEvent);
        }
        return newHistory;
    }

    private class ExpiredSessionRecovery extends Thread {

        private ExpiredSessionRecovery() {
            super("ZooKeeper expired session recovery thread");
        }

        @Override
        public void run() {
            LOG.info("Entering recovery mode");
            synchronized (_lock) {
                try {
                    int count = 0;
                    while (_state == ZKClient.State.NONE) {
                        try {
                            count++;
                            LOG.warn("Recovery mode: trying to reconnect to zookeeper [{}]", count);
                            ZKClient.this.connect();
                        } catch (Throwable e) {
                            LOG.warn("Recovery mode: reconnect attempt failed [{}]... waiting for {}", count, _reconnectTimeout, e);
                            try {
                                _lock.wait(_reconnectTimeout.getDurationInMilliseconds());
                            } catch (InterruptedException e1) {
                                throw new RuntimeException("Recovery mode: wait interrupted... bailing out", e1);
                            }
                        }
                    }
                } finally {
                    _expiredSessionRecovery = null;
                    LOG.info("Exiting recovery mode.");
                }
            }
        }

    }

    public ZKClient(String connectString, Timespan sessionTimeout, Watcher watcher) {
        this(new ZooKeeperFactory(connectString, sessionTimeout, watcher));
    }

    public ZKClient(IZooKeeperFactory factory) {
        this(factory, null);
    }

    public ZKClient(IZooKeeperFactory factory, String chroot) {
        super(chroot);
        _factory = factory;
        Map<String, String> acls = new HashMap<>();
        acls.put("/", "world:anyone:acdrw");
        setACLs(acls);
    }

    static private int getPermFromString(String permString) {
        int perm = 0;
        for (int i = 0; i < permString.length(); i++) {
            switch (permString.charAt(i)) {
                case 'r':
                    perm |= ZooDefs.Perms.READ;
                    break;
                case 'w':
                    perm |= ZooDefs.Perms.WRITE;
                    break;
                case 'c':
                    perm |= ZooDefs.Perms.CREATE;
                    break;
                case 'd':
                    perm |= ZooDefs.Perms.DELETE;
                    break;
                case 'a':
                    perm |= ZooDefs.Perms.ADMIN;
                    break;
                default:
                    System.err.println("Unknown perm type:" + permString.charAt(i));
            }
        }
        return perm;
    }

    private static List<ACL> parseACLs(String aclString) {
        List<ACL>  acl;
        String acls[] = aclString.split(",");
        acl = new ArrayList<>();
        for (String a : acls) {
            int firstColon = a.indexOf(':');
            int lastColon = a.lastIndexOf(':');
            if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) {
                System.err.println(a + " does not have the form scheme:id:perm");
                continue;
            }
            ACL newAcl = new ACL();
            newAcl.setId(new Id(a.substring(0, firstColon), a.substring(firstColon + 1, lastColon)));
            newAcl.setPerms(getPermFromString(a.substring(lastColon + 1)));
            acl.add(newAcl);
        }
        return acl;
    }

    public Stat createOrSetByteWithParents(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws InterruptedException, KeeperException {
        if (exists(path) != null) {
            return setByteData(path, data);
        }
        try {
            createBytesNodeWithParents(path, data, acl, createMode);
            return null;
        } catch (KeeperException.NodeExistsException e) {
            // this should not happen very often (race condition)
            return setByteData(path, data);
        }
    }

    public String create(String path, CreateMode createMode) throws InterruptedException, KeeperException {
        return create(path, (byte[]) null, createMode);
    }

    public String create(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
        return create(path, toByteData(data), createMode);
    }

    public String create(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
        return getZk().create(adjustPath(path), data, getNodeACLs(path), createMode);
    }

    public String createWithParents(String path, CreateMode createMode) throws InterruptedException, KeeperException {
        return createWithParents(path, (byte[]) null, createMode);
    }

    public String createWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
        return createWithParents(path, toByteData(data), createMode);
    }

    public String createWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
        createParents(path);
        return create(path, data, createMode);
    }

    public Stat createOrSetWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
        return createOrSetWithParents(path, toByteData(data), createMode);
    }

    public Stat createOrSetWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
        if (exists(path) != null) {
            return setByteData(path, data);
        }
        try {
            createWithParents(path, data, createMode);
            return null;
        } catch (KeeperException.NodeExistsException e) {
            // this should not happen very often (race condition)
            return setByteData(path, data);
        }
    }

    public void fixACLs(String path, boolean recursive) throws InterruptedException, KeeperException {
        if (exists(path) != null) {
            doFixACLs(path, recursive);
        }
    }

    private void doFixACLs(String path, boolean recursive) throws KeeperException, InterruptedException {
        setACL(path, getNodeACLs(path), -1);
        if (recursive) {
            for (String child : getChildren(path)) {
                doFixACLs(path.equals("/") ? "/" + child : path + "/" + child, recursive);
            }
        }
    }

    private List<ACL> getNodeACLs(String path) {
        String acl = doGetNodeACLs(adjustPath(path));
        if (acl == null) {
            throw new IllegalStateException("Could not find matching ACLs for " + path);
        }
        return parseACLs(acl);
    }

    protected String doGetNodeACLs(String path) {
        String longestPath = "";
        for (String acl : acls.keySet()) {
            if (acl.length() > longestPath.length() && path.startsWith(acl)) {
                longestPath = acl;
            }
        }
        return acls.get(longestPath);
    }

    private void createParents(String path) throws InterruptedException, KeeperException {
        path = PathUtils.getParentPath(adjustPath(path));
        path = PathUtils.removeTrailingSlash(path);
        List<String> paths = new ArrayList<>();
        while (!path.equals("") && getZk().exists(path, false) == null) {
            paths.add(path);
            path = PathUtils.getParentPath(path);
            path = PathUtils.removeTrailingSlash(path);
        }
        Collections.reverse(paths);
        for (String p : paths) {
            try {
                getZk().create(p,
                        null,
                        getNodeACLs(p),
                        CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException e) {
                // ok we continue...
                if (LOG.isDebugEnabled()) {
                    LOG.debug("parent already exists " + p);
                }
            }
        }
    }

    private byte[] toByteData(String data) {
        if (data == null) {
            return null;
        } else {
            try {
                return data.getBytes(CHARSET);
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
            }
        }
    }

}
