/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.s4.comm.topology;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;

/**
 * <p>
 * Monitors streams available in the S4 cluster.
 * </p>
 * <p>
 * Maintains a data structure reflecting the currently published streams with their consumers and publishers.
 * </p>
 * <p>
 * Provides methods to publish producers and consumers of streams
 * </p>
 * 
 */
@Singleton
public class RemoteStreams implements IZkStateListener, IZkChildListener, RemoteStreamsManager {

    private static final Logger logger = LoggerFactory.getLogger(ClustersFromZK.class);
    private KeeperState state;
    private final ZkClient zkClient;
    private final Lock lock;
    private final static String STREAMS_PATH = "/s4/streams";
    // by stream name, then "producer"|"consumer" then
    private Map<String, Map<String, Set<StreamConsumer>>> streams = new HashMap<String, Map<String, Set<StreamConsumer>>>();

    public enum StreamType {
        PRODUCER, CONSUMER;

        public String getPath(String streamName) {
            switch (this) {
                case PRODUCER:
                    return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
                case CONSUMER:
                    return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
                default:
                    throw new RuntimeException("Invalid path in enum StreamType");
            }
        }

        public String getCollectionName() {
            switch (this) {
                case PRODUCER:
                    return "producers";
                case CONSUMER:
                    return "consumers";
                default:
                    throw new RuntimeException("Invalid path in enum StreamType");
            }
        }
    }

    @Inject
    public RemoteStreams(@Named("s4.cluster.zk_address") String zookeeperAddress,
            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {

        lock = new ReentrantLock();
        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
        ZkSerializer serializer = new ZNRecordSerializer();
        zkClient.setZkSerializer(serializer);
        zkClient.subscribeStateChanges(this);
        zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
        // bug in zkClient, it does not invoke handleNewSession the first time
        // it connects
        this.handleStateChanged(KeeperState.SyncConnected);

        this.handleNewSession();

    }

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.s4.comm.topology.RemoteStreamsManager#getConsumers(java.lang.String)
     */
    @Override
    public Set<StreamConsumer> getConsumers(String streamName) {
        if (!streams.containsKey(streamName)) {
            return Collections.emptySet();
        } else {
            return streams.get(streamName).get("consumers");
        }
    }

    /**
     * One method to do any processing if there is a change in ZK, all callbacks will be processed sequentially
     */
    private void doProcess() {
        lock.lock();
        try {
            refreshStreams();
        } catch (Exception e) {
            logger.warn("Exception in tryToAcquireTask", e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void handleStateChanged(KeeperState state) throws Exception {
        this.state = state;
        if (state.equals(KeeperState.Expired)) {
            logger.error("Zookeeper session expired, possibly due to a network partition. This node is considered as dead by Zookeeper. Proceeding to stop this node.");
            System.exit(1);
        }

    }

    @Override
    public void handleNewSession() throws Exception {
        logger.info("New session:" + zkClient.getSessionId());
        zkClient.subscribeChildChanges(STREAMS_PATH, this);

        doProcess();
    }

    @Override
    public void handleChildChange(String paramString, List<String> paramList) throws Exception {
        doProcess();
    }

    private void refreshStreams() {
        List<String> children = zkClient.getChildren(STREAMS_PATH);
        for (String streamName : children) {
            if (!streams.containsKey(streamName)) {
                logger.info("Detected new stream [{}]", streamName);
                streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
                zkClient.subscribeChildChanges(StreamType.PRODUCER.getPath(streamName), this);
                zkClient.subscribeChildChanges(StreamType.CONSUMER.getPath(streamName), this);
                streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
            }

            update(streamName, StreamType.PRODUCER);
            update(streamName, StreamType.CONSUMER);
        }
    }

    private void update(String streamName, StreamType type) {
        List<String> elements = zkClient.getChildren(type.getPath(streamName));
        Set<StreamConsumer> consumers = new HashSet<StreamConsumer>();
        for (String element : elements) {
            ZNRecord producerData = zkClient.readData(type.getPath(streamName) + "/" + element, true);
            if (producerData != null) {
                StreamConsumer consumer = new StreamConsumer(Integer.valueOf(producerData.getSimpleField("appId")),
                        producerData.getSimpleField("clusterName"));
                consumers.add(consumer);
            }
        }
        streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(consumers));
    }

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.s4.comm.topology.RemoteStreamsManager#addOutputStream(java.lang.String, java.lang.String,
     * java.lang.String)
     */
    @Override
    public void addOutputStream(String appId, String clusterName, String streamName) {
        lock.lock();
        try {
            logger.debug("Adding output stream [{}] for app [{}] in cluster [{}]", new String[] { streamName, appId,
                    clusterName });
            createStreamPaths(streamName);
            ZNRecord producer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
            producer.putSimpleField("appId", appId);
            producer.putSimpleField("clusterName", clusterName);
            try {
                zkClient.createEphemeralSequential(StreamType.PRODUCER.getPath(streamName) + "/producer-", producer);
            } catch (Throwable e) {
                logger.error("Exception trying to create producer stream [{}] for app [{}] and cluster [{}] : [{}] :",
                        new String[] { streamName, appId, clusterName, e.getMessage() });
            }
            refreshStreams();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates (it they don't exist yet) persistent znodes for producers and consumers of a stream.
     */
    private void createStreamPaths(String streamName) {
        zkClient.createPersistent(StreamType.PRODUCER.getPath(streamName), true);
        zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
    }

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.s4.comm.topology.RemoteStreamsManager#addInputStream(int, java.lang.String, java.lang.String)
     */
    @Override
    public void addInputStream(int appId, String clusterName, String streamName) {
        lock.lock();
        try {
            logger.debug("Adding input stream [{}] for app [{}] in cluster [{}]",
                    new String[] { streamName, String.valueOf(appId), clusterName });
            createStreamPaths(streamName);
            ZNRecord consumer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
            consumer.putSimpleField("appId", String.valueOf(appId));
            consumer.putSimpleField("clusterName", clusterName);
            try {
                // NOTE: We create 1 sequential znode per consumer node instance
                zkClient.createEphemeralSequential(StreamType.CONSUMER.getPath(streamName) + "/consumer-", consumer);
            } catch (Throwable e) {
                logger.error("Exception trying to create consumer stream [{}] for app [{}] and cluster [{}] : [{}] :",
                        new String[] { streamName, String.valueOf(appId), clusterName, e.getMessage() });
            }
            refreshStreams();
        } finally {
            lock.unlock();
        }
    }
}
