/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.omid.tso.client;

import com.google.common.base.Charsets;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.omid.proto.TSOProto;
import org.apache.omid.zk.ZKUtils;
import org.apache.statemachine.StateMachine;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

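/**
 * Client used to communicate with the TSO server. Implements {@link TSOProtocol} on top of a Netty channel driven by
 * a finite state machine and, as a {@link NodeCacheListener}, follows changes of the current-TSO ZNode in ZooKeeper.
 */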
public class TSOClient implements TSOProtocol, NodeCacheListener {

    private static final Logger LOG = LoggerFactory.getLogger(TSOClient.class);

    // Basic configuration constants & defaults TODO: Move DEFAULT_ZK_CLUSTER to a conf class???
    public static final String DEFAULT_ZK_CLUSTER = "localhost:2181";

    // Epoch reported by getEpoch() until one has been parsed from the current-TSO ZNode (HA mode only)
    private static final long DEFAULT_EPOCH = -1L;
    // Written by the constructor and by nodeChanged(), read through getEpoch()
    private volatile long epoch = DEFAULT_EPOCH;

    // Attributes
    // ZooKeeper client and ZNode cache; only initialized when the connection type is HA, null otherwise
    private CuratorFramework zkClient;
    private NodeCache currentTSOZNode;

    private ChannelFactory factory;
    private ClientBootstrap bootstrap;
    // Set by Handler.channelConnected() and read by nodeChanged(); these are different callbacks (Netty vs. Curator),
    // so the field is volatile to guarantee the reader sees the latest channel, matching how 'epoch' is handled.
    private volatile Channel currentChannel;
    // Single-threaded executor backing the FSM; also runs the cleanup listener registered in close()
    private final ScheduledExecutorService fsmExecutor;
    StateMachine.Fsm fsm;

    private final int requestTimeoutInMs;
    private final int requestMaxRetries;
    private final int tsoReconnectionDelayInSecs;
    // Accessed only through the synchronized setTSOAddress()/getAddress() pair
    private InetSocketAddress tsoAddr;
    private String zkCurrentTsoPath;

    // Written by HandshakingState when a handshake response is processed and read through isLowLatency();
    // volatile so that callers of isLowLatency() observe the value published by the FSM.
    private volatile boolean lowLatency;

    // ----------------------------------------------------------------------------------------------------------------
    // Construction
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * Creates a new TSO client from the given configuration.
     *
     * @param tsoClientConf client configuration passed straight to the private constructor
     * @return a newly constructed {@link TSOClient}
     * @throws IOException declared by the constructor
     */
    public static TSOClient newInstance(OmidClientConfiguration tsoClientConf) throws IOException {
        return new TSOClient(tsoClientConf);
    }

    /**
     * Builds the client. Private: instances are obtained through {@link #newInstance(OmidClientConfiguration)}.
     *
     * Creates the Netty channel factory and bootstrap, resolves the TSO address (from the current-TSO ZNode in HA
     * mode, or from the connection string otherwise), creates the FSM in {@code DisconnectedState} and configures the
     * channel pipeline. {@code bootstrap.connect(...)} is not called here; {@code DisconnectedState} connects when it
     * handles a request event.
     *
     * @param omidConf client configuration (threads, timeouts, retries, connection type and connection string)
     * @throws IOException declared for callers; not thrown explicitly by the statements below
     * @throws IllegalStateException in HA mode, if the ZNode watcher cannot be started or the ZNode holds no data
     */
    private TSOClient(OmidClientConfiguration omidConf) throws IOException {

        // Thread count taken from the configuration; passed as the last argument of the channel factory below
        int tsoExecutorThreads = omidConf.getExecutorThreads();

        factory = new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(
                        new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
                Executors.newCachedThreadPool(
                        new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), tsoExecutorThreads);
        // Create the bootstrap
        bootstrap = new ClientBootstrap(factory);

        requestTimeoutInMs = omidConf.getRequestTimeoutInMs();
        requestMaxRetries = omidConf.getRequestMaxRetries();
        tsoReconnectionDelayInSecs = omidConf.getReconnectionDelayInSecs();

        LOG.info("Connecting to TSO...");
        HostAndPort hp;
        // NOTE(review): if the HA branch throws (e.g. no data in the ZNode), the factory created above and the
        // ZK client/cache created in the branch are not released here — confirm whether that matters to callers.
        switch (omidConf.getConnectionType()) {
            case HA:
                zkClient = ZKUtils.initZKClient(omidConf.getConnectionString(),
                                                omidConf.getZkNamespace(),
                                                omidConf.getZkConnectionTimeoutInSecs());
                zkCurrentTsoPath = omidConf.getZkCurrentTsoPath();
                // The cache must be started before its current data can be read on the next line
                configureCurrentTSOServerZNodeCache(zkCurrentTsoPath);
                String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
                // TSO info includes the new TSO host:port address and epoch, separated by '#'
                String[] currentTSOAndEpochArray = tsoInfo.split("#");
                hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
                setTSOAddress(hp.getHostText(), hp.getPort());
                epoch = Long.parseLong(currentTSOAndEpochArray[1]);
                LOG.info("\t* Current TSO host:port found in ZK: {} Epoch {}", hp, getEpoch());
                break;
            case DIRECT:
            default:
                // The connection string is the TSO host:port itself; epoch keeps its default value
                hp = HostAndPort.fromString(omidConf.getConnectionString());
                setTSOAddress(hp.getHostText(), hp.getPort());
                LOG.info("\t* TSO host:port {} will be connected directly", hp);
                break;
        }

        // The FSM runs on its own single-threaded scheduled executor and starts in the disconnected state
        fsmExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("tsofsm-%d").build());
        fsm = new StateMachine.FsmImpl(fsmExecutor);
        fsm.setInitState(new DisconnectedState(fsm));

        // Pipeline: length-prefixed framing + protobuf codec, ending in the handler that feeds events to the FSM
        ChannelPipeline pipeline = bootstrap.getPipeline();
        pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
        pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
        pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
        pipeline.addLast("protobufencoder", new ProtobufEncoder());
        pipeline.addLast("handler", new Handler(fsm));

        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("connectTimeoutMillis", 100);
        // Default until a handshake response from the TSO overrides it (see HandshakingState)
        lowLatency = false;
    }

    // ----------------------------------------------------------------------------------------------------------------
    // TSOProtocol interface
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * Builds a timestamp request, submits it to the FSM as a {@link RequestEvent} carrying the configured number of
     * retries, and returns a future backed by that event.
     *
     * @see TSOProtocol#getNewStartTimestamp()
     */
    @Override
    public TSOFuture<Long> getNewStartTimestamp() {
        TSOProto.Request timestampRequest = TSOProto.Request.newBuilder()
                .setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build())
                .build();
        RequestEvent event = new RequestEvent(timestampRequest, requestMaxRetries);
        fsm.sendEvent(event);
        return new ForwardingTSOFuture<>(event);
    }

    /**
     * Builds a commit request for the given transaction and its cells, submits it to the FSM as a
     * {@link RequestEvent} carrying the configured number of retries, and returns a future backed by that event.
     *
     * @see TSOProtocol#commit(long, Set)
     */
    @Override
    public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
        TSOProto.CommitRequest.Builder commitRequest =
                TSOProto.CommitRequest.newBuilder().setStartTimestamp(transactionId);
        // Only the numeric id of each cell travels in the request
        for (CellId writtenCell : cells) {
            commitRequest.addCellId(writtenCell.getCellId());
        }
        TSOProto.Request wireRequest =
                TSOProto.Request.newBuilder().setCommitRequest(commitRequest.build()).build();
        RequestEvent event = new RequestEvent(wireRequest, requestMaxRetries);
        fsm.sendEvent(event);
        return new ForwardingTSOFuture<>(event);
    }

    /**
     * Sends a {@link CloseEvent} to the FSM and registers a listener, run on the FSM executor once the event
     * completes, that shuts the executor down and closes the ZooKeeper node cache and client when they exist.
     * Failures are reported through the class logger.
     *
     * @return a future that completes when the close event has been resolved by the FSM
     * @see TSOProtocol#close()
     */
    @Override
    public TSOFuture<Void> close() {
        final CloseEvent closeEvent = new CloseEvent();
        fsm.sendEvent(closeEvent);
        closeEvent.addListener(new Runnable() {
            @Override
            public void run() {
                try {
                    // The listener only runs once the future is done, so this surfaces the outcome without blocking
                    closeEvent.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOG.error("Interrupted while waiting for the TSO client to close", e);
                } catch (ExecutionException e) {
                    LOG.error("Error closing the TSO client", e);
                } finally {
                    // Cleanup runs whatever the outcome of the close event was
                    fsmExecutor.shutdown();
                    if (currentTSOZNode != null) {
                        try {
                            currentTSOZNode.close();
                        } catch (IOException e) {
                            LOG.error("Error closing the current TSO ZNode cache", e);
                        }
                    }
                    if (zkClient != null) {
                        zkClient.close();
                    }
                }

            }
        }, fsmExecutor);
        return new ForwardingTSOFuture<>(closeEvent);
    }

    // ----------------------------------------------------------------------------------------------------------------
    // High availability related interface
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * Returns the epoch last parsed from the current-TSO ZNode, or the default value (-1) when none has been read,
     * which is always the case when the TSO is connected directly.
     *
     * @see TSOProtocol#getEpoch()
     */
    @Override
    public long getEpoch() {
        return epoch;
    }

    // ----------------------------------------------------------------------------------------------------------------
    // NodeCacheListener interface
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * Callback for changes of the current-TSO ZNode. Re-reads the ZNode, updates the TSO address and the epoch and,
     * if a channel is currently connected, closes it.
     *
     * @throws Exception if the ZNode content is missing or cannot be parsed
     */
    @Override
    public void nodeChanged() throws Exception {

        // ZNode payload has the form <host:port>#<epoch>
        String[] addressAndEpoch = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath).split("#");
        HostAndPort newTsoHostAndPort = HostAndPort.fromString(addressAndEpoch[0]);
        setTSOAddress(newTsoHostAndPort.getHostText(), newTsoHostAndPort.getPort());
        epoch = Long.parseLong(addressAndEpoch[1]);
        LOG.info("CurrentTSO ZNode changed. New TSO Host & Port {}/Epoch {}", newTsoHostAndPort, getEpoch());

        // Nothing to tear down when there is no live connection to the previous TSO
        if (currentChannel == null || !currentChannel.isConnected()) {
            return;
        }
        LOG.info("\tClosing channel with previous TSO {}", currentChannel);
        currentChannel.close();

    }

    /**
     * Returns the low-latency flag taken from the last handshake response processed by the FSM; {@code false} until
     * a handshake response has been processed.
     */
    @Override
    public boolean isLowLatency() {
        return lowLatency;
    }

    // ****************************************** Finite State Machine ************************************************

    // ----------------------------------------------------------------------------------------------------------------
    // FSM: Events
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * FSM event that carries a single immutable payload.
     *
     * @param <T> type of the payload
     */
    private static class ParamEvent<T> implements StateMachine.Event {

        final T param;

        ParamEvent(T payload) {
            param = payload;
        }

        /** @return the payload given at construction time */
        T getParam() {
            return this.param;
        }
    }

    /** Event carrying a {@link Throwable}; sent on channel disconnection, channel exceptions and failed writes. */
    private static class ErrorEvent extends ParamEvent<Throwable> {

        ErrorEvent(Throwable t) {
            super(t);
        }
    }

    /** Event carrying the {@link Channel} that has just been connected. */
    private static class ConnectedEvent extends ParamEvent<Channel> {

        ConnectedEvent(Channel c) {
            super(c);
        }
    }

    /**
     * Event originated by a user call. It is also the future through which the outcome of that call is published:
     * {@link #success(Object)} completes it with a value and {@link #error(Throwable)} fails it.
     *
     * @param <T> type of the result
     */
    private static class UserEvent<T> extends AbstractFuture<T>
            implements StateMachine.DeferrableEvent {

        /** Completes the future with {@code result}. */
        void success(T result) {
            set(result);
        }

        /** Fails the future with {@code cause}. */
        @Override
        public void error(Throwable cause) {
            setException(cause);
        }
    }

    /** User event created by {@code close()}; completes with {@code null} once the FSM has handled it. */
    private static class CloseEvent extends UserEvent<Void> {

    }

    /** Event sent by the channel handler when the channel has been closed; carries a {@link Throwable}. */
    private static class ChannelClosedEvent extends ParamEvent<Throwable> {

        ChannelClosedEvent(Throwable t) {
            super(t);
        }
    }

    /** Event sent by the timer of {@code ConnectionFailedState} once the reconnection delay has elapsed. */
    private static class ReconnectEvent implements StateMachine.Event {

    }

    /** Event sent by the timer of {@code HandshakingState} when the handshake timeout fires. */
    private static class HandshakeTimeoutEvent implements StateMachine.Event {

    }

    /** Event sent when the timeout armed for a timestamp request in {@code ConnectedState} fires. */
    private static class TimestampRequestTimeoutEvent implements StateMachine.Event {

    }

    /**
     * Event sent when the timeout armed for a commit request in {@code ConnectedState} fires. The start timestamp
     * identifies which pending commit request the timeout belongs to.
     */
    private static class CommitRequestTimeoutEvent implements StateMachine.Event {

        final long startTimestamp;

        CommitRequestTimeoutEvent(long commitStartTimestamp) {
            startTimestamp = commitStartTimestamp;
        }

        /** @return start timestamp of the commit request that timed out */
        public long getStartTimestamp() {
            return this.startTimestamp;
        }
    }

    /**
     * User event wrapping a protobuf request for the TSO together with the number of retries it still has left.
     * The wrapped request can be swapped, which {@code ConnectedState} does to flag a commit request as a retry.
     */
    private static class RequestEvent extends UserEvent<Long> {

        TSOProto.Request req;
        int retriesLeft;

        RequestEvent(TSOProto.Request initialRequest, int maxRetries) {
            req = initialRequest;
            retriesLeft = maxRetries;
        }

        /** @return the request that will be written to the channel */
        TSOProto.Request getRequest() {
            return this.req;
        }

        /** Replaces the wrapped request. */
        void setRequest(TSOProto.Request request) {
            req = request;
        }

        /** @return how many more times this request may be retried */
        int getRetriesLeft() {
            return this.retriesLeft;
        }

        /** Consumes one retry. */
        void decrementRetries() {
            retriesLeft--;
        }

    }

    /** Event carrying a {@code TSOProto.Response} received on the channel. */
    private static class ResponseEvent extends ParamEvent<TSOProto.Response> {

        ResponseEvent(TSOProto.Response r) {
            super(r);
        }
    }

    // ----------------------------------------------------------------------------------------------------------------
    // FSM: States
    // ----------------------------------------------------------------------------------------------------------------

    class BaseState extends StateMachine.State {

        BaseState(StateMachine.Fsm fsm) {
            super(fsm);
        }

        public StateMachine.State handleEvent(StateMachine.Event e) {
            LOG.error("Unhandled event {} while in state {}", e, this.getClass().getName());
            return this;
        }
    }

    class DisconnectedState extends BaseState {

        DisconnectedState(StateMachine.Fsm fsm) {
            super(fsm);
            LOG.debug("NEW STATE: DISCONNECTED");
        }

        public StateMachine.State handleEvent(RequestEvent e) {
            fsm.deferEvent(e);
            return tryToConnectToTSOServer();
        }

        public StateMachine.State handleEvent(CloseEvent e) {
            factory.releaseExternalResources();
            e.success(null);
            return this;
        }

        private StateMachine.State tryToConnectToTSOServer() {
            final InetSocketAddress tsoAddress = getAddress();
            LOG.info("Trying to connect to TSO [{}]", tsoAddress);
            ChannelFuture channelFuture = bootstrap.connect(tsoAddress);
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isSuccess()) {
                        LOG.info("Connection to TSO [{}] established. Channel {}",
                                 tsoAddress, channelFuture.getChannel());
                    } else {
                        LOG.error("Failed connection attempt to TSO [{}] failed. Channel {}",
                                  tsoAddress, channelFuture.getChannel());
                    }
                }
            });
            return new ConnectingState(fsm);
        }
    }

    private class ConnectingState extends BaseState {

        ConnectingState(StateMachine.Fsm fsm) {
            super(fsm);
            LOG.debug("NEW STATE: CONNECTING");
        }

        public StateMachine.State handleEvent(UserEvent e) {
            fsm.deferEvent(e);
            return this;
        }

        public StateMachine.State handleEvent(ConnectedEvent e) {
            return new HandshakingState(fsm, e.getParam());
        }

        public StateMachine.State handleEvent(ChannelClosedEvent e) {
            return new ConnectionFailedState(fsm, e.getParam());
        }

        public StateMachine.State handleEvent(ErrorEvent e) {
            return new ConnectionFailedState(fsm, e.getParam());
        }

    }

    private static class RequestAndTimeout {

        final RequestEvent event;
        final Timeout timeout;

        RequestAndTimeout(RequestEvent event, Timeout timeout) {
            this.event = event;
            this.timeout = timeout;
        }

        RequestEvent getRequest() {
            return event;
        }

        Timeout getTimeout() {
            return timeout;
        }

        public String toString() {
            String info = "Request type ";
            if (event.getRequest().hasTimestampRequest()) {
                info += "[Timestamp]";
            } else if (event.getRequest().hasCommitRequest()) {
                info += "[Commit] Start TS ->" + event.getRequest().getCommitRequest().getStartTimestamp();
            } else {
                info += "NONE";
            }
            return info;
        }
    }

    private class HandshakingState extends BaseState {

        final Channel channel;

        final HashedWheelTimer timeoutExecutor = new HashedWheelTimer(
                new ThreadFactoryBuilder().setNameFormat("tso-client-timeout").build());
        final Timeout timeout;

        HandshakingState(StateMachine.Fsm fsm, Channel channel) {
            super(fsm);
            LOG.debug("NEW STATE: HANDSHAKING");
            this.channel = channel;
            TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
            // Add the required handshake capabilities when necessary
            handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
            channel.write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
            timeout = newTimeout();
        }

        private Timeout newTimeout() {
            if (requestTimeoutInMs > 0) {
                return timeoutExecutor.newTimeout(new TimerTask() {
                    @Override
                    public void run(Timeout timeout) {
                        fsm.sendEvent(new HandshakeTimeoutEvent());
                    }
                }, 30, TimeUnit.SECONDS);
            } else {
                return null;
            }
        }

        public StateMachine.State handleEvent(UserEvent e) {
            fsm.deferEvent(e);
            return this;
        }

        public StateMachine.State handleEvent(ResponseEvent e) {
            lowLatency = e.getParam().getHandshakeResponse().getLowLatency();
            if (e.getParam().hasHandshakeResponse() && e.getParam().getHandshakeResponse().getClientCompatible()) {
                if (timeout != null) {
                    timeout.cancel();
                }
                return new ConnectedState(fsm, channel, timeoutExecutor);
            } else {
                cleanupState();
                LOG.error("Client incompatible with server");
                return new HandshakeFailedState(fsm, new HandshakeFailedException());
            }
        }

        public StateMachine.State handleEvent(HandshakeTimeoutEvent e) {
            cleanupState();
            return new ClosingState(fsm);
        }

        public StateMachine.State handleEvent(ErrorEvent e) {
            cleanupState();
            Throwable exception = e.getParam();
            LOG.error("Error during handshake", exception);
            return new HandshakeFailedState(fsm, exception);
        }

        private void cleanupState() {
            timeoutExecutor.stop();
            channel.close();
            if (timeout != null) {
                timeout.cancel();
            }
        }

    }

    /**
     * Back-off state entered after a failed connection or handshake. User events are failed with the exception that
     * caused the failure. The state is left, towards {@code DisconnectedState}, when the reconnection delay elapses
     * or when a channel-closed event arrives. The timer created for the delay is stopped on the way out, so that its
     * thread is released and no stale {@link ReconnectEvent} is sent after the state has been left.
     */
    class ConnectionFailedState extends BaseState {

        final HashedWheelTimer reconnectionTimeoutExecutor = new HashedWheelTimer(
                new ThreadFactoryBuilder().setNameFormat("tso-client-backoff-timeout").build());

        Throwable exception;

        ConnectionFailedState(final StateMachine.Fsm fsm, final Throwable exception) {
            super(fsm);
            LOG.debug("NEW STATE: CONNECTION FAILED [RE-CONNECTION BACKOFF]");
            this.exception = exception;
            reconnectionTimeoutExecutor.newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) {
                    fsm.sendEvent(new ReconnectEvent());
                }
            }, tsoReconnectionDelayInSecs, TimeUnit.SECONDS);
        }

        /** Fails the user event with the exception that led to this state. */
        public StateMachine.State handleEvent(UserEvent e) {
            e.error(exception);
            return this;
        }

        /** Further errors are ignored while backing off. */
        public StateMachine.State handleEvent(ErrorEvent e) {
            return this;
        }

        /** The channel has been closed: leave the back-off state. */
        public StateMachine.State handleEvent(ChannelClosedEvent e) {
            stopReconnectionTimer();
            return new DisconnectedState(fsm);
        }

        /** The reconnection delay has elapsed: leave the back-off state. */
        public StateMachine.State handleEvent(ReconnectEvent e) {
            stopReconnectionTimer();
            return new DisconnectedState(fsm);
        }

        /**
         * Stops the back-off timer. This runs from an event handler, in the same way HandshakingState and
         * ConnectedState stop their timer in response to events that were sent from timer tasks.
         */
        private void stopReconnectionTimer() {
            reconnectionTimeoutExecutor.stop();
        }

    }

    /**
     * State entered when the handshake fails. It adds no handlers of its own: all event handling is inherited from
     * {@link ConnectionFailedState}; only an extra debug line is logged on construction.
     */
    private class HandshakeFailedState extends ConnectionFailedState {

        HandshakeFailedState(StateMachine.Fsm fsm, Throwable exception) {
            super(fsm, exception);
            LOG.debug("STATE: HANDSHAKING FAILED");
        }

    }

    /**
     * State in which the handshake has completed and requests are exchanged with the TSO. Pending timestamp
     * requests are kept in FIFO order; pending commit requests are indexed by their start timestamp. Each pending
     * request may have a timeout armed on the timer inherited from the handshaking state.
     */
    class ConnectedState extends BaseState {

        final Queue<RequestAndTimeout> timestampRequests;
        final Map<Long, RequestAndTimeout> commitRequests;
        final Channel channel;

        final HashedWheelTimer timeoutExecutor;

        ConnectedState(StateMachine.Fsm fsm, Channel channel, HashedWheelTimer timeoutExecutor) {
            super(fsm);
            LOG.debug("NEW STATE: CONNECTED");
            this.timestampRequests = new ArrayDeque<>();
            this.commitRequests = new HashMap<>();
            this.channel = channel;
            this.timeoutExecutor = timeoutExecutor;
        }

        /**
         * Arms a request timeout that sends {@code timeoutEvent} to the FSM when it fires.
         *
         * @return the armed timeout, or {@code null} when {@code requestTimeoutInMs} is not positive
         */
        private Timeout newTimeout(final StateMachine.Event timeoutEvent) {
            if (requestTimeoutInMs <= 0) {
                return null;
            }
            TimerTask notifyTimeout = new TimerTask() {
                @Override
                public void run(Timeout firedTimeout) {
                    fsm.sendEvent(timeoutEvent);
                }
            };
            return timeoutExecutor.newTimeout(notifyTimeout, requestTimeoutInMs, TimeUnit.MILLISECONDS);
        }

        /** Cancels the timeout attached to a pending request, if there is one. */
        private void cancelTimeout(RequestAndTimeout pending) {
            Timeout armed = pending.getTimeout();
            if (armed != null) {
                armed.cancel();
            }
        }

        /**
         * Registers the request as pending (with its timeout) and writes it to the channel. A request that is
         * neither a timestamp nor a commit request is failed immediately. A failed write is reported to the FSM
         * as an error event.
         */
        private void sendRequest(final StateMachine.Fsm fsm, RequestEvent request) {
            TSOProto.Request req = request.getRequest();

            if (!req.hasTimestampRequest() && !req.hasCommitRequest()) {
                request.error(new IllegalArgumentException("Unknown request type"));
                return;
            }

            if (req.hasTimestampRequest()) {
                Timeout armed = newTimeout(new TimestampRequestTimeoutEvent());
                timestampRequests.add(new RequestAndTimeout(request, armed));
            } else {
                long startTimestamp = req.getCommitRequest().getStartTimestamp();
                Timeout armed = newTimeout(new CommitRequestTimeoutEvent(startTimestamp));
                commitRequests.put(startTimestamp, new RequestAndTimeout(request, armed));
            }

            channel.write(req).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture writeResult) {
                    if (!writeResult.isSuccess()) {
                        fsm.sendEvent(new ErrorEvent(writeResult.getCause()));
                    }
                }
            });
        }

        /** Matches a response with its pending request and completes that request. */
        private void handleResponse(ResponseEvent response) {
            TSOProto.Response resp = response.getParam();
            if (resp.hasTimestampResponse()) {
                completeTimestampRequest(resp);
            } else if (resp.hasCommitResponse()) {
                completeCommitRequest(resp);
            }
        }

        /** Completes the oldest pending timestamp request with the timestamp carried by the response. */
        private void completeTimestampRequest(TSOProto.Response resp) {
            if (timestampRequests.isEmpty()) {
                LOG.debug("Received timestamp response when no requests outstanding");
                return;
            }
            RequestAndTimeout pending = timestampRequests.remove();
            pending.getRequest().success(resp.getTimestampResponse().getStartTimestamp());
            cancelTimeout(pending);
        }

        /** Completes (or aborts) the pending commit request identified by the response's start timestamp. */
        private void completeCommitRequest(TSOProto.Response resp) {
            long startTimestamp = resp.getCommitResponse().getStartTimestamp();
            RequestAndTimeout pending = commitRequests.remove(startTimestamp);
            if (pending == null) {
                LOG.debug("Received commit response for request that doesn't exist. Start TS: {}", startTimestamp);
                return;
            }
            cancelTimeout(pending);
            if (resp.getCommitResponse().getAborted()) {
                pending.getRequest().error(new AbortException());
            } else {
                pending.getRequest().success(resp.getCommitResponse().getCommitTimestamp());
            }
        }

        /** A timestamp request timed out: the oldest pending one is retried or failed. */
        public StateMachine.State handleEvent(TimestampRequestTimeoutEvent e) {
            RequestAndTimeout oldest = timestampRequests.poll();
            if (oldest != null) {
                cancelTimeout(oldest);
                queueRetryOrError(fsm, oldest.getRequest());
            }
            return this;
        }

        /** A commit request timed out: if it is still pending, it is retried or failed. */
        public StateMachine.State handleEvent(CommitRequestTimeoutEvent e) {
            RequestAndTimeout timedOut = commitRequests.remove(e.getStartTimestamp());
            if (timedOut != null) {
                cancelTimeout(timedOut);
                queueRetryOrError(fsm, timedOut.getRequest());
            }
            return this;
        }

        /** Close requested: stop the timer, fail everything pending and wait for the channel to close. */
        public StateMachine.State handleEvent(CloseEvent e) {
            LOG.debug("CONNECTED STATE: CloseEvent");
            timeoutExecutor.stop();
            closeChannelAndErrorRequests();
            fsm.deferEvent(e);
            return new ClosingState(fsm);
        }

        public StateMachine.State handleEvent(RequestEvent e) {
            sendRequest(fsm, e);
            return this;
        }

        public StateMachine.State handleEvent(ResponseEvent e) {
            handleResponse(e);
            return this;
        }

        /** Connection error: stop the timer, retry or fail everything pending and wait for the channel to close. */
        public StateMachine.State handleEvent(ErrorEvent e) {
            LOG.debug("CONNECTED STATE: ErrorEvent");
            timeoutExecutor.stop();
            handleError(fsm);
            return new ClosingState(fsm);
        }

        /** Drains both sets of pending requests, retrying or failing each one, and then closes the channel. */
        private void handleError(StateMachine.Fsm fsm) {
            LOG.debug("CONNECTED STATE: Cancelling Timeouts in handleError");
            while (!timestampRequests.isEmpty()) {
                RequestAndTimeout pending = timestampRequests.remove();
                cancelTimeout(pending);
                queueRetryOrError(fsm, pending.getRequest());
            }
            for (Iterator<Map.Entry<Long, RequestAndTimeout>> it = commitRequests.entrySet().iterator();
                 it.hasNext(); ) {
                RequestAndTimeout pending = it.next().getValue();
                cancelTimeout(pending);
                queueRetryOrError(fsm, pending.getRequest());
                it.remove();
            }
            channel.close();
        }

        /**
         * Re-submits the request to the FSM if it has retries left, consuming one of them; otherwise fails it.
         * A commit request that is re-submitted is rebuilt once with its retry flag set.
         */
        private void queueRetryOrError(StateMachine.Fsm fsm, RequestEvent e) {
            if (e.getRetriesLeft() <= 0) {
                e.error(
                        new ServiceUnavailableException("Number of retries exceeded. This API request failed permanently"));
                return;
            }

            e.decrementRetries();
            if (e.getRequest().hasCommitRequest()) {
                TSOProto.CommitRequest original = e.getRequest().getCommitRequest();
                if (!original.getIsRetry()) { // Create a new retry for the commit request
                    TSOProto.CommitRequest.Builder retryBuilder = TSOProto.CommitRequest.newBuilder();
                    retryBuilder.mergeFrom(original);
                    retryBuilder.setIsRetry(true);
                    TSOProto.Request.Builder requestBuilder = TSOProto.Request.newBuilder();
                    requestBuilder.setCommitRequest(retryBuilder.build());
                    e.setRequest(requestBuilder.build());
                }
            }
            fsm.sendEvent(e);
        }

        /** Closes the channel and fails every pending request with a {@code ClosingException}. */
        private void closeChannelAndErrorRequests() {
            channel.close();
            failAllWithClosingException(timestampRequests);
            failAllWithClosingException(commitRequests.values());
        }

        /** Cancels the timeout of each pending request and fails it; the collection itself is left untouched. */
        private void failAllWithClosingException(Iterable<RequestAndTimeout> pendingRequests) {
            for (RequestAndTimeout pending : pendingRequests) {
                cancelTimeout(pending);
                pending.getRequest().error(new ClosingException());
            }
        }
    }

    private class ClosingState extends BaseState {

        ClosingState(StateMachine.Fsm fsm) {
            super(fsm);
            LOG.debug("NEW STATE: CLOSING");
        }

        public StateMachine.State handleEvent(TimestampRequestTimeoutEvent e) {
            // Ignored. They will be retried or errored
            return this;
        }

        public StateMachine.State handleEvent(CommitRequestTimeoutEvent e) {
            // Ignored. They will be retried or errored
            return this;
        }

        public StateMachine.State handleEvent(ErrorEvent e) {
            // Ignored. They will be retried or errored
            return this;
        }

        public StateMachine.State handleEvent(ResponseEvent e) {
            // Ignored. They will be retried or errored
            return this;
        }

        public StateMachine.State handleEvent(UserEvent e) {
            fsm.deferEvent(e);
            return this;
        }

        public StateMachine.State handleEvent(ChannelClosedEvent e) {
            return new DisconnectedState(fsm);
        }

        public StateMachine.State handleEvent(HandshakeTimeoutEvent e) {
            return this;
        }

    }

    // ----------------------------------------------------------------------------------------------------------------
    // Helper classes & methods
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * Last handler of the channel pipeline. Translates Netty channel callbacks into FSM events; it holds no
     * protocol logic of its own.
     */
    private class Handler extends SimpleChannelHandler {

        private final StateMachine.Fsm fsm;

        Handler(StateMachine.Fsm fsm) {
            this.fsm = fsm;
        }

        /** Records the connected channel in the enclosing client and notifies the FSM. */
        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
            currentChannel = e.getChannel();
            LOG.debug("HANDLER (CHANNEL CONNECTED): Connection {}. Sending connected event to FSM", e);
            fsm.sendEvent(new ConnectedEvent(e.getChannel()));
        }

        /** Reports a disconnection to the FSM as an error event. */
        @Override
        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            LOG.debug("HANDLER (CHANNEL DISCONNECTED): Connection {}. Sending error event to FSM", e);
            fsm.sendEvent(new ErrorEvent(new ConnectionException()));
        }

        /** Reports the closing of the channel to the FSM. */
        @Override
        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            LOG.debug("HANDLER (CHANNEL CLOSED): Connection {}. Sending channel closed event to FSM", e);
            fsm.sendEvent(new ChannelClosedEvent(new ConnectionException()));
        }

        /** Forwards TSO responses to the FSM; anything else is logged and dropped. */
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            if (e.getMessage() instanceof TSOProto.Response) {
                fsm.sendEvent(new ResponseEvent((TSOProto.Response) e.getMessage()));
            } else {
                // The placeholder is required for the unexpected message to actually appear in the log
                LOG.warn("Received unknown message {}", e.getMessage());
            }
        }

        /** Logs the exception and reports it to the FSM as an error event. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
            fsm.sendEvent(new ErrorEvent(e.getCause()));
        }
    }

    /** Replaces the TSO address used for subsequent connection attempts; guarded by this instance's monitor. */
    private void setTSOAddress(String host, int port) {
        synchronized (this) {
            tsoAddr = new InetSocketAddress(host, port);
        }
    }

    /** Returns the TSO address most recently set; guarded by this instance's monitor. */
    private InetSocketAddress getAddress() {
        synchronized (this) {
            return tsoAddr;
        }
    }

    /**
     * Creates the cache for the current-TSO ZNode, registers this client as its listener and starts it.
     *
     * @param currentTsoPath path of the ZNode that holds the current TSO information
     * @throws IllegalStateException if the cache cannot be created or started; the original failure is kept as
     *                               the cause
     */
    private void configureCurrentTSOServerZNodeCache(String currentTsoPath) {
        try {
            currentTSOZNode = new NodeCache(zkClient, currentTsoPath);
            currentTSOZNode.getListenable().addListener(this);
            currentTSOZNode.start(true);
        } catch (Exception e) {
            // Chain the cause so that the stack trace of the underlying failure is not lost
            throw new IllegalStateException(
                    "Cannot start watcher on current TSO Server ZNode: " + e.getMessage(), e);
        }
    }

    /**
     * Reads the content currently held by the ZNode cache and decodes it as UTF-8 text.
     *
     * @param currentTsoPath ZNode path, used only in the error messages
     * @return the ZNode content as a string
     * @throws IllegalStateException if the cache holds no node data or the node has no payload
     */
    private String getCurrentTSOInfoFoundInZK(String currentTsoPath) {
        ChildData znodeSnapshot = currentTSOZNode.getCurrentData();
        if (znodeSnapshot == null) {
            throw new IllegalStateException("No data found in ZKNode " + currentTsoPath);
        }

        byte[] payload = znodeSnapshot.getData();
        if (payload == null) {
            throw new IllegalStateException("No data found for current TSO in ZKNode " + currentTsoPath);
        }

        return new String(payload, Charsets.UTF_8);
    }

}
