blob: 77ce2ef5d47093a360a2c87d8c4f746a73db2c72 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.stream.twitter;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.HttpConstants;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.DefaultStreamingEndpoint;
import com.twitter.hbc.core.endpoint.SitestreamEndpoint;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import com.twitter.hbc.core.endpoint.UserstreamEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.stream.StreamAdapter;
/**
* Streamer that consumes from a Twitter Streaming API and feeds transformed key-value pairs,
* by default {@code <tweetId, text>}, into an {@link IgniteDataStreamer} instance.
* <p>
* This streamer uses https://dev.twitter.com/streaming API and supports Public API, User Streams,
* Site Streams and Firehose.
* <p>
* This Streamer features:
* <ul>
* <li>Supports OAuth1 authentication scheme.
* <br/> BasicAuth not supported by Streaming API https://dev.twitter.com/streaming/overview/connecting</li>
* <li>Provide all params in apiParams map. https://dev.twitter.com/streaming/overview/request-parameters</li>
* </ul>
*/
public class TwitterStreamer<K, V> extends StreamAdapter<String, K, V> {
    /** Param key name constant for Site streaming. */
    private static final String SITE_USER_ID_KEY = "follow";

    /** Logger. Assigned from the Ignite instance when the streamer is started. */
    protected IgniteLogger log;

    /** Threads count used to transform tweets. */
    private int threadsCount = 1;

    /** Twitter Streaming API params. See https://dev.twitter.com/streaming/overview/request-parameters */
    private Map<String, String> apiParams;

    /** Twitter streaming API endpoint example, '/statuses/filter.json' or '/statuses/firehose.json' */
    private String endpointUrl;

    /** OAuth params holder. */
    private OAuthSettings oAuthSettings;

    /** Shared flag signalling whether the streamer is running (1) or can be started (0). */
    private final AtomicInteger running = new AtomicInteger();

    /**
     * Size of buffer for streaming, as for some tracking terms traffic can be low and for others high, this is
     * configurable.
     */
    private Integer bufferCapacity = 100000;

    /** Twitter streaming client (Twitter HBC) to interact with stream. */
    private Client client;

    /** Executor that processes the stream asynchronously. */
    private ExecutorService tweetStreamProcessor;

    /**
     * @param oAuthSettings OAuth Settings
     */
    public TwitterStreamer(OAuthSettings oAuthSettings) {
        this.oAuthSettings = oAuthSettings;
    }

    /**
     * Starts streamer.
     * <p>
     * Validates the configuration, connects the Twitter client and submits {@code threadsCount} workers that
     * take raw tweets from the buffer queue and pass them to {@code addMessage}. If any of these steps fails,
     * everything created so far is released and the streamer is returned to the "not running" state, so that
     * {@code start()} can be attempted again.
     *
     * @throws IgniteException If the streamer is already started.
     */
    public void start() {
        if (!running.compareAndSet(0, 1))
            throw new IgniteException("Attempted to start an already started Twitter Streamer");

        Client connectedClient = null;
        ExecutorService processor = null;
        boolean started = false;

        try {
            validateConfig();

            log = getIgnite().log();

            final BlockingQueue<String> tweetQueue = new LinkedBlockingQueue<>(bufferCapacity);

            Client candidate = getClient(tweetQueue);

            candidate.connect();

            // Remember the client only once it is connected: from here on it has to be stopped on failure.
            connectedClient = candidate;

            processor = Executors.newFixedThreadPool(threadsCount);

            for (int i = 0; i < threadsCount; i++) {
                processor.submit(new Callable<Boolean>() {
                    @Override public Boolean call() {
                        while (true) {
                            try {
                                addMessage(tweetQueue.take());
                            }
                            catch (InterruptedException e) {
                                U.warn(log, "Tweets transformation was interrupted", e);

                                // Restore the interrupt status for the code that owns this thread.
                                Thread.currentThread().interrupt();

                                return true;
                            }
                        }
                    }
                });
            }

            client = connectedClient;
            tweetStreamProcessor = processor;

            started = true;
        }
        finally {
            if (!started)
                abortStart(processor, connectedClient);
        }
    }

    /**
     * Stops streamer.
     * <p>
     * The running flag is reset even if shutting down the executor or the client throws.
     *
     * @throws IgniteException If the streamer is not running.
     */
    public void stop() {
        if (running.get() == 0)
            throw new IgniteException("Attempted to stop an already stopped Twitter Streamer");

        try {
            tweetStreamProcessor.shutdownNow();
        }
        finally {
            try {
                client.stop();
            }
            finally {
                running.set(0);
            }
        }
    }

    /**
     * Releases whatever an unsuccessful {@link #start()} managed to create and resets the running flag.
     * Failures during the release are only logged so that they do not hide the original start failure.
     *
     * @param processor Executor created by the failed start, or {@code null} if none was created.
     * @param connectedClient Client connected by the failed start, or {@code null} if none was connected.
     */
    private void abortStart(ExecutorService processor, Client connectedClient) {
        try {
            if (processor != null)
                processor.shutdownNow();

            if (connectedClient != null)
                connectedClient.stop();
        }
        catch (RuntimeException e) {
            U.warn(log, "Failed to release Twitter streamer resources after unsuccessful start", e);
        }
        finally {
            running.set(0);
        }
    }

    /**
     * Validates config at start.
     * <p>
     * Checks that the data streamer, Ignite instance, endpoint URL and a tuple extractor are set, and that the
     * threads count and buffer capacity are positive. The 'follow' parameter is required (as comma separated
     * numbers) only when the configured endpoint is the site stream endpoint, which is the only endpoint
     * that is built from it.
     */
    protected void validateConfig() {
        A.notNull(getStreamer(), "Streamer");
        A.notNull(getIgnite(), "Ignite");
        A.notNull(endpointUrl, "Twitter Streaming API endpoint");

        A.ensure(getSingleTupleExtractor() != null || getMultipleTupleExtractor() != null, "Twitter extractor");
        A.ensure(threadsCount > 0, "Threads count must be positive");
        A.ensure(bufferCapacity != null && bufferCapacity > 0, "Buffer capacity must be positive");

        if (SitestreamEndpoint.PATH.equals(endpointUrl.toLowerCase(Locale.ROOT))) {
            String followParam = apiParams == null ? null : apiParams.get(SITE_USER_ID_KEY);

            A.ensure(followParam != null && followParam.matches("^(\\d+,? ?)+$"),
                "Site streaming endpoint must provide 'follow' param with value as comma separated numbers");
        }
    }

    /**
     * Creates the Twitter client for the configured endpoint URL. Known endpoint paths are mapped to their
     * dedicated endpoint classes and hosts; any other URL is used as a plain GET streaming endpoint on the
     * default stream host. All API params are added to the endpoint as POST parameters, except for the
     * 'follow' param of the site stream endpoint, which is passed to the endpoint constructor instead.
     *
     * @param tweetQueue Tweet queue.
     * @return Client.
     */
    protected Client getClient(BlockingQueue<String> tweetQueue) {
        // Work on a copy: the map belongs to the caller and must stay intact for subsequent restarts.
        Map<String, String> params = new LinkedHashMap<>();

        if (apiParams != null)
            params.putAll(apiParams);

        StreamingEndpoint endpoint;
        HttpHosts hosts;

        // Locale.ROOT keeps the case mapping independent of the JVM default locale.
        switch (endpointUrl.toLowerCase(Locale.ROOT)) {
            case StatusesFilterEndpoint.PATH:
                endpoint = new StatusesFilterEndpoint();
                hosts = HttpHosts.STREAM_HOST;

                break;

            case StatusesFirehoseEndpoint.PATH:
                endpoint = new StatusesFirehoseEndpoint();
                hosts = HttpHosts.STREAM_HOST;

                break;

            case StatusesSampleEndpoint.PATH:
                endpoint = new StatusesSampleEndpoint();
                hosts = HttpHosts.STREAM_HOST;

                break;

            case UserstreamEndpoint.PATH:
                endpoint = new UserstreamEndpoint();
                hosts = HttpHosts.USERSTREAM_HOST;

                break;

            case SitestreamEndpoint.PATH:
                String follow = params.remove(SITE_USER_ID_KEY);

                List<Long> followers = Lists.newArrayList();

                for (String follower : Splitter.on(',').trimResults().omitEmptyStrings().split(follow))
                    followers.add(Long.valueOf(follower));

                endpoint = new SitestreamEndpoint(followers);
                hosts = HttpHosts.SITESTREAM_HOST;

                break;

            default:
                endpoint = new DefaultStreamingEndpoint(endpointUrl, HttpConstants.HTTP_GET, false);
                hosts = HttpHosts.STREAM_HOST;
        }

        for (Map.Entry<String, String> entry : params.entrySet())
            endpoint.addPostParameter(entry.getKey(), entry.getValue());

        return buildClient(tweetQueue, hosts, endpoint);
    }

    /**
     * Builds the HBC client that authenticates with the configured OAuth settings and writes the received
     * messages into the given queue.
     *
     * @param tweetQueue Tweet queue.
     * @param hosts Hosts.
     * @param endpoint Endpoint.
     * @return Client.
     */
    protected Client buildClient(BlockingQueue<String> tweetQueue, HttpHosts hosts, StreamingEndpoint endpoint) {
        A.notNull(oAuthSettings, "OAuth settings");

        Authentication authentication = new OAuth1(oAuthSettings.getConsumerKey(), oAuthSettings.getConsumerSecret(),
            oAuthSettings.getAccessToken(), oAuthSettings.getAccessTokenSecret());

        ClientBuilder builder = new ClientBuilder()
            .name("Ignite-Twitter-Client")
            .hosts(hosts)
            .authentication(authentication)
            .endpoint(endpoint)
            .processor(new StringDelimitedProcessor(tweetQueue));

        return builder.build();
    }

    /**
     * Sets API Params.
     *
     * @param apiParams API Params.
     */
    public void setApiParams(Map<String, String> apiParams) {
        this.apiParams = apiParams;
    }

    /**
     * Sets Endpoint URL.
     *
     * @param endpointUrl Endpoint URL.
     */
    public void setEndpointUrl(String endpointUrl) {
        this.endpointUrl = endpointUrl;
    }

    /**
     * Sets Buffer capacity.
     *
     * @param bufferCapacity Buffer capacity.
     */
    public void setBufferCapacity(Integer bufferCapacity) {
        this.bufferCapacity = bufferCapacity;
    }

    /**
     * Sets Threads count.
     *
     * @param threadsCount Threads count.
     */
    public void setThreadsCount(int threadsCount) {
        this.threadsCount = threadsCount;
    }
}