| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package com.datatorrent.contrib.twitter; |
| |
| import java.util.concurrent.ArrayBlockingQueue; |
| |
| import javax.validation.constraints.Min; |
| import javax.validation.constraints.NotNull; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import twitter4j.*; |
| import twitter4j.conf.ConfigurationBuilder; |
| |
| import com.datatorrent.api.Context.OperatorContext; |
| import com.datatorrent.api.DefaultOutputPort; |
| import com.datatorrent.api.InputOperator; |
| import com.datatorrent.api.Operator.ActivationListener; |
| |
| /** |
| * This is an input operator for Twitter. |
| * <p> |
| * This test can only be run from command line using command line interface script. |
| * You need to set twitter authentication credentials in $HOME/.dt/dt-site.xml file in order to run this. |
| * The authentication requires following 4 information. |
| * Your application consumer key, |
| * Your application consumer secret, |
| * Your twitter access token, and |
| * Your twitter access token secret. |
| * </p> |
| * @displayName Twitter Input |
| * @category Web |
| * @tags input operator |
| * @since 0.3.2 |
| */ |
| @org.apache.hadoop.classification.InterfaceStability.Evolving |
| public class TwitterSampleInput implements InputOperator, ActivationListener<OperatorContext>, StatusListener |
| { |
| /** |
| * This is the output port on which the twitter status information is emitted. |
| */ |
| public final transient DefaultOutputPort<Status> status = new DefaultOutputPort<>(); |
| /** |
| * This is the output port on which the twitter text is emitted. |
| */ |
| public final transient DefaultOutputPort<String> text = new DefaultOutputPort<>(); |
| /** |
| * This is the output port on which the twitter url is emitted. |
| */ |
| public final transient DefaultOutputPort<String> url = new DefaultOutputPort<>(); |
| /** |
| * This is the output port on which the twitter hashtags are emitted. |
| */ |
| public final transient DefaultOutputPort<String> hashtag = new DefaultOutputPort<>(); |
| |
| /* the following 3 ports are not implemented so far */ |
| public final transient DefaultOutputPort<?> userMention = null; |
| public final transient DefaultOutputPort<?> media = null; |
| /** |
| * Enable debugging. |
| */ |
| private boolean debug; |
| /** |
| * For tapping into the tweets. |
| */ |
| private transient Thread operatorThread; |
| private transient TwitterStream ts; |
| private transient ArrayBlockingQueue<Status> statuses = new ArrayBlockingQueue<>(1024 * 1024); |
| protected transient int count; |
| /** |
| * The state which we would like to save for this operator. |
| */ |
| private int feedMultiplier = 1; |
| @Min(0) |
| private int feedMultiplierVariance = 0; |
| |
| /* Following twitter access credentials should be set before using this operator. */ |
| @NotNull |
| private String consumerKey; |
| @NotNull |
| private String consumerSecret; |
| @NotNull |
| private String accessToken; |
| @NotNull |
| private String accessTokenSecret; |
| /* If twitter connection breaks then do we need to reconnect or exit */ |
| private boolean reConnect; |
| |
| @Override |
| public void setup(OperatorContext context) |
| { |
| operatorThread = Thread.currentThread(); |
| |
| if (feedMultiplier != 1) { |
| logger.info("Load set to be {}% of the entire twitter feed", feedMultiplier); |
| } |
| |
| ConfigurationBuilder cb = setupConfigurationBuilder(); |
| ts = new TwitterStreamFactory(cb.build()).getInstance(); |
| } |
| |
| @Override |
| public void teardown() |
| { |
| ts = null; |
| } |
| |
| @Override |
| public void onStatus(Status status) |
| { |
| int randomMultiplier = feedMultiplier; |
| |
| if (feedMultiplierVariance > 0) { |
| int min = feedMultiplier - feedMultiplierVariance; |
| if (min < 0) { |
| min = 0; |
| } |
| |
| int max = feedMultiplier + feedMultiplierVariance; |
| randomMultiplier = min + (int)(Math.random() * ((max - min) + 1)); |
| } |
| try { |
| for (int i = randomMultiplier; i-- > 0;) { |
| statuses.put(status); |
| count++; |
| } |
| } |
| catch (InterruptedException ex) { |
| logger.debug("Streaming interrupted; Passing the inerruption to the operator", ex); |
| operatorThread.interrupt(); |
| } |
| } |
| |
| @Override |
| public void endWindow() |
| { |
| if (count % 16 == 0) { |
| logger.debug("processed {} statuses", count); |
| } |
| } |
| |
| @Override |
| public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) |
| { |
| // do nothing |
| } |
| |
| @Override |
| public void onTrackLimitationNotice(int numberOfLimitedStatuses) |
| { |
| // do nothing |
| } |
| |
| @Override |
| public void onScrubGeo(long userId, long upToStatusId) |
| { |
| // do nothing |
| } |
| |
| @Override public void onStallWarning(StallWarning stallWarning) |
| { |
| // do nothing |
| } |
| |
| @Override |
| public void onException(Exception ex) |
| { |
| logger.error("Sampling Error", ex); |
| logger.debug("reconnect: {}", reConnect); |
| ts.shutdown(); |
| if (reConnect) { |
| try { |
| Thread.sleep(1000); |
| } |
| catch (Exception e) { |
| } |
| setUpTwitterConnection(); |
| } |
| else { |
| operatorThread.interrupt(); |
| } |
| } |
| |
| /** |
| * Allow derived classes to customize the configuration |
| */ |
| protected ConfigurationBuilder setupConfigurationBuilder() |
| { |
| ConfigurationBuilder cb = new ConfigurationBuilder(); |
| cb.setDebugEnabled(debug). |
| setOAuthConsumerKey(consumerKey). |
| setOAuthConsumerSecret(consumerSecret). |
| setOAuthAccessToken(accessToken). |
| setOAuthAccessTokenSecret(accessTokenSecret); |
| return cb; |
| } |
| |
| private void setUpTwitterConnection() |
| { |
| ConfigurationBuilder cb = setupConfigurationBuilder(); |
| ts = new TwitterStreamFactory(cb.build()).getInstance(); |
| ts.addListener(TwitterSampleInput.this); |
| // we can only listen to tweets containing links by callng ts.links(). |
| // it seems it requires prior signed agreement with twitter. |
| ts.sample(); |
| } |
| |
| @Override |
| public void beginWindow(long windowId) |
| { |
| } |
| |
| @Override |
| public void activate(OperatorContext context) |
| { |
| ts.addListener(this); |
| // we can only listen to tweets containing links by callng ts.links(). |
| // it seems it requires prior signed agreement with twitter. |
| ts.sample(); |
| } |
| |
| @Override |
| public void deactivate() |
| { |
| ts.shutdown(); |
| } |
| |
| public void setFeedMultiplier(int multiplier) |
| { |
| this.feedMultiplier = multiplier; |
| } |
| |
| public int getFeedMultiplier() |
| { |
| return this.feedMultiplier; |
| } |
| |
| public void setFeedMultiplierVariance(int multiplierVariance) |
| { |
| this.feedMultiplierVariance = multiplierVariance; |
| } |
| |
| public int getFeedMultiplierVariance() |
| { |
| return this.feedMultiplierVariance; |
| } |
| |
| @Override |
| public void emitTuples() |
| { |
| for (int size = statuses.size(); size-- > 0;) { |
| Status s = statuses.poll(); |
| if (status.isConnected()) { |
| status.emit(s); |
| } |
| |
| if (text.isConnected()) { |
| text.emit(s.getText()); |
| } |
| |
| if (url.isConnected()) { |
| URLEntity[] entities = s.getURLEntities(); |
| if (entities != null) { |
| for (URLEntity ue : entities) { |
| url.emit((ue.getExpandedURL() == null ? ue.getURL() : ue.getExpandedURL()).toString()); |
| } |
| } |
| } |
| |
| if (hashtag.isConnected()) { |
| HashtagEntity[] hashtagEntities = s.getHashtagEntities(); |
| if (hashtagEntities != null) { |
| for (HashtagEntity he : hashtagEntities) { |
| hashtag.emit(he.getText()); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param debug the debug to set |
| */ |
| public void setDebug(boolean debug) |
| { |
| this.debug = debug; |
| } |
| |
| /** |
| * @return the consumerKey |
| */ |
| public String getConsumerKey() |
| { |
| return consumerKey; |
| } |
| |
| /** |
| * @param consumerKey the consumerKey to set |
| */ |
| public void setConsumerKey(String consumerKey) |
| { |
| this.consumerKey = consumerKey; |
| } |
| |
| /** |
| * @return the consumerSecret |
| */ |
| public String getConsumerSecret() |
| { |
| return consumerSecret; |
| } |
| |
| /** |
| * @param consumerSecret the consumerSecret to set |
| */ |
| public void setConsumerSecret(String consumerSecret) |
| { |
| this.consumerSecret = consumerSecret; |
| } |
| |
| /** |
| * @return the accessToken |
| */ |
| public String getAccessToken() |
| { |
| return accessToken; |
| } |
| |
| /** |
| * @param accessToken the accessToken to set |
| */ |
| public void setAccessToken(String accessToken) |
| { |
| this.accessToken = accessToken; |
| } |
| |
| /** |
| * @return the accessTokenSecret |
| */ |
| public String getAccessTokenSecret() |
| { |
| return accessTokenSecret; |
| } |
| |
| /** |
| * @param accessTokenSecret the accessTokenSecret to set |
| */ |
| public void setAccessTokenSecret(String accessTokenSecret) |
| { |
| this.accessTokenSecret = accessTokenSecret; |
| } |
| |
| public boolean isReConnect() |
| { |
| return reConnect; |
| } |
| |
| public void setReConnect(boolean reConnect) |
| { |
| this.reConnect = reConnect; |
| } |
| |
| @Override |
| @SuppressWarnings({"ConstantConditions"}) |
| public int hashCode() |
| { |
| int hash = 7; |
| hash = 11 * hash + (this.debug ? 1 : 0); |
| hash = 11 * hash + this.feedMultiplier; |
| hash = 11 * hash + this.feedMultiplierVariance; |
| hash = 11 * hash + (this.consumerKey != null ? this.consumerKey.hashCode() : 0); |
| hash = 11 * hash + (this.consumerSecret != null ? this.consumerSecret.hashCode() : 0); |
| hash = 11 * hash + (this.accessToken != null ? this.accessToken.hashCode() : 0); |
| hash = 11 * hash + (this.accessTokenSecret != null ? this.accessTokenSecret.hashCode() : 0); |
| return hash; |
| } |
| |
| @Override |
| @SuppressWarnings({"ConstantConditions"}) |
| public boolean equals(Object obj) |
| { |
| if (obj == null) { |
| return false; |
| } |
| if (getClass() != obj.getClass()) { |
| return false; |
| } |
| final TwitterSampleInput other = (TwitterSampleInput) obj; |
| if (this.debug != other.debug) { |
| return false; |
| } |
| if (this.feedMultiplier != other.feedMultiplier) { |
| return false; |
| } |
| if (this.feedMultiplierVariance != other.feedMultiplierVariance) { |
| return false; |
| } |
| if ((this.consumerKey == null) ? (other.consumerKey != null) : !this.consumerKey.equals(other.consumerKey)) { |
| return false; |
| } |
| if ((this.consumerSecret == null) ? (other.consumerSecret != null) : !this.consumerSecret.equals(other.consumerSecret)) { |
| return false; |
| } |
| if ((this.accessToken == null) ? (other.accessToken != null) : !this.accessToken.equals(other.accessToken)) { |
| return false; |
| } |
| if ((this.accessTokenSecret == null) ? (other.accessTokenSecret != null) : !this.accessTokenSecret.equals(other.accessTokenSecret)) { |
| return false; |
| } |
| return true; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "TwitterSampleInput{debug=" + debug + ", feedMultiplier=" + feedMultiplier + ", feedMultiplierVariance=" + feedMultiplierVariance + ", consumerKey=" + consumerKey + ", consumerSecret=" + consumerSecret + ", accessToken=" + accessToken + ", accessTokenSecret=" + accessTokenSecret + '}'; |
| } |
| |
| private static final Logger logger = LoggerFactory.getLogger(TwitterSampleInput.class); |
| } |