/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.stream.twitter;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.HttpConstants;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.DefaultStreamingEndpoint;
import com.twitter.hbc.core.endpoint.SitestreamEndpoint;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import com.twitter.hbc.core.endpoint.UserstreamEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.stream.StreamAdapter;

/**
 * Streamer that consumes from a Twitter Streaming API and feeds transformed key-value pairs,
 * by default <tweetId, text>, into an {@link IgniteDataStreamer} instance.
 * <p>
 * This streamer uses https://dev.twitter.com/streaming API and supports Public API, User Streams,
 * Site Streams and Firehose.
 * <p>
 * This Streamer features:
 * <ul>
 *     <li>Supports OAuth1 authentication scheme.
 *     <br/> BasicAuth not supported by Streaming API https://dev.twitter.com/streaming/overview/connecting</li>
 *     <li>Provide all params in apiParams map. https://dev.twitter.com/streaming/overview/request-parameters</li>
 * </ul>
 */
public class TwitterStreamer<K, V> extends StreamAdapter<String, K, V> {
    /** Logger. */
    protected IgniteLogger log;

    /** Threads count used to transform tweets. */
    private int threadsCount = 1;

    /** Twitter Streaming API params. See https://dev.twitter.com/streaming/overview/request-parameters */
    private Map<String, String> apiParams;

    /** Twitter streaming API endpoint example, '/statuses/filter.json' or '/statuses/firehose.json' */
    private String endpointUrl;

    /** OAuth params holder */
    private OAuthSettings oAuthSettings;

    /** shared variable to communicate/signal that streamer is already running or can be started */
    private final AtomicInteger running = new AtomicInteger();

    /**
     * Size of buffer for streaming, as for some tracking terms traffic can be low and for others high, this is
     * configurable
     */
    private Integer bufferCapacity = 100000;

    /** Twitter streaming client (Twitter HBC) to interact with stream */
    private Client client;

    /** Process stream asynchronously */
    private ExecutorService tweetStreamProcessor;

    /** Param key name constant for Site streaming */
    private final String SITE_USER_ID_KEY = "follow";

    /**
     * @param oAuthSettings OAuth Settings
     */
    public TwitterStreamer(OAuthSettings oAuthSettings) {
        this.oAuthSettings = oAuthSettings;
    }

    /**
     * Starts streamer.
     */
    public void start() {
        if (!running.compareAndSet(0, 1))
            throw new IgniteException("Attempted to start an already started Twitter Streamer");

        validateConfig();

        log = getIgnite().log();

        final BlockingQueue<String> tweetQueue = new LinkedBlockingQueue<>(bufferCapacity);

        client = getClient(tweetQueue);

        client.connect();

        tweetStreamProcessor = Executors.newFixedThreadPool(threadsCount);

        for (int i = 0; i < threadsCount; i++) {
            Callable<Boolean> task = new Callable<Boolean>() {

                @Override public Boolean call() {
                    while (true) {
                        try {
                            String tweet = tweetQueue.take();

                            addMessage(tweet);
                        }
                        catch (InterruptedException e) {
                            U.warn(log, "Tweets transformation was interrupted", e);

                            return true;
                        }
                    }
                }
            };

            tweetStreamProcessor.submit(task);
        }
    }

    /**
     * Stops streamer.
     */
    public void stop() {
        if (running.get() == 0)
            throw new IgniteException("Attempted to stop an already stopped Twitter Streamer");

        tweetStreamProcessor.shutdownNow();

        client.stop();

        running.compareAndSet(1, 0);
    }

    /**
     * Validates config at start.
     */
    protected void validateConfig() {
        A.notNull(getStreamer(), "Streamer");
        A.notNull(getIgnite(), "Ignite");
        A.notNull(endpointUrl, "Twitter Streaming API endpoint");

        A.ensure(getSingleTupleExtractor() != null || getMultipleTupleExtractor() != null, "Twitter extractor");

        String followParam = apiParams.get(SITE_USER_ID_KEY);

        A.ensure(followParam != null && followParam.matches("^(\\d+,? ?)+$"),
            "Site streaming endpoint must provide 'follow' param with value as comma separated numbers");
    }

    /**
     * @param tweetQueue Tweet queue.
     * @return Client.
     */
    protected Client getClient(BlockingQueue<String> tweetQueue) {
        StreamingEndpoint endpoint;

        HttpHosts hosts;

        switch (endpointUrl.toLowerCase()) {
            case StatusesFilterEndpoint.PATH:
                endpoint = new StatusesFilterEndpoint();

                hosts = HttpHosts.STREAM_HOST;

                break;
            case StatusesFirehoseEndpoint.PATH:
                endpoint = new StatusesFirehoseEndpoint();

                hosts = HttpHosts.STREAM_HOST;

                break;
            case StatusesSampleEndpoint.PATH:
                endpoint = new StatusesSampleEndpoint();

                hosts = HttpHosts.STREAM_HOST;

                break;
            case UserstreamEndpoint.PATH:
                endpoint = new UserstreamEndpoint();

                hosts = HttpHosts.USERSTREAM_HOST;

                break;
            case SitestreamEndpoint.PATH:
                String follow = apiParams.remove(SITE_USER_ID_KEY);

                List<Long> followers = Lists.newArrayList();

                for (String follower : Splitter.on(',').trimResults().omitEmptyStrings().split(follow))
                    followers.add(Long.valueOf(follower));

                endpoint = new SitestreamEndpoint(followers);

                hosts = HttpHosts.SITESTREAM_HOST;

                break;
            default:
                endpoint = new DefaultStreamingEndpoint(endpointUrl, HttpConstants.HTTP_GET, false);

                hosts = HttpHosts.STREAM_HOST;

        }

        for (Map.Entry<String, String> entry : apiParams.entrySet()) {
            endpoint.addPostParameter(entry.getKey(), entry.getValue());
        }

        return buildClient(tweetQueue, hosts, endpoint);
    }

    /**
     * @param tweetQueue tweet Queue.
     * @param hosts Hostes.
     * @param endpoint Endpoint.
     * @return Client.
     */
    protected Client buildClient(BlockingQueue<String> tweetQueue, HttpHosts hosts, StreamingEndpoint endpoint) {
        Authentication authentication = new OAuth1(oAuthSettings.getConsumerKey(), oAuthSettings.getConsumerSecret(),
            oAuthSettings.getAccessToken(), oAuthSettings.getAccessTokenSecret());

        ClientBuilder builder = new ClientBuilder()
            .name("Ignite-Twitter-Client")
            .hosts(hosts)
            .authentication(authentication)
            .endpoint(endpoint)
            .processor(new StringDelimitedProcessor(tweetQueue));

        return builder.build();
    }

    /**
     * Sets API Params.
     *
     * @param apiParams API Params.
     */
    public void setApiParams(Map<String, String> apiParams) {
        this.apiParams = apiParams;
    }

    /**
     * Sets Endpoint URL.
     *
     * @param endpointUrl Endpoint URL.
     */
    public void setEndpointUrl(String endpointUrl) {
        this.endpointUrl = endpointUrl;
    }

    /**
     * Sets Buffer capacity.
     *
     * @param bufferCapacity Buffer capacity.
     */
    public void setBufferCapacity(Integer bufferCapacity) {
        this.bufferCapacity = bufferCapacity;
    }

    /**
     * Sets Threads count.
     *
     * @param threadsCount Threads count.
     */
    public void setThreadsCount(int threadsCount) {
        this.threadsCount = threadsCount;
    }
}
