blob: e95dbd89c788a2cd921113c5b65c9bef0ab46472 [file] [log] [blame]
/*
* Copyright 2009-2011 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.external.dataset.adapter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import twitter4j.Query;
import twitter4j.QueryResult;
import twitter4j.Tweet;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
import edu.uci.ics.asterix.external.data.parser.AbstractStreamDataParser;
import edu.uci.ics.asterix.external.data.parser.IDataParser;
import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
import edu.uci.ics.asterix.external.data.parser.IManagedDataParser;
import edu.uci.ics.asterix.external.data.parser.ManagedDelimitedDataStreamParser;
import edu.uci.ics.asterix.feed.intake.IFeedClient;
import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
import edu.uci.ics.asterix.feed.managed.adapter.IMutableFeedAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
public class PullBasedTwitterAdapter extends AbstractDatasourceAdapter implements IDatasourceReadAdapter,
IManagedFeedAdapter, IMutableFeedAdapter {
private IDataStreamParser parser;
private int parallelism = 1;
private boolean stopRequested = false;
private boolean alterRequested = false;
private Map<String, String> alteredParams = new HashMap<String, String>();
public static final String QUERY = "query";
public static final String INTERVAL = "interval";
@Override
public void configure(Map<String, String> arguments, IAType atype) throws Exception {
configuration = arguments;
this.atype = atype;
partitionConstraint = new AlgebricksCountPartitionConstraint(1);
}
@Override
public AdapterDataFlowType getAdapterDataFlowType() {
return dataFlowType.PULL;
}
@Override
public AdapterType getAdapterType() {
return adapterType.READ;
}
@Override
public void initialize(IHyracksTaskContext ctx) throws Exception {
this.ctx = ctx;
}
@Override
public void beforeSuspend() throws Exception {
// TODO Auto-generated method stub
}
@Override
public void beforeResume() throws Exception {
// TODO Auto-generated method stub
}
@Override
public void beforeStop() throws Exception {
// TODO Auto-generated method stub
}
@Override
public IDataParser getDataParser(int partition) throws Exception {
if (parser == null) {
parser = new ManagedDelimitedDataStreamParser();
((IManagedDataParser) parser).setAdapter(this);
configuration.put(AbstractStreamDataParser.KEY_DELIMITER, "|");
parser.configure(configuration);
parser.initialize((ARecordType) atype, ctx);
TweetClient tweetClient = new TweetClient(ctx.getJobletContext().getApplicationContext().getNodeId(), this);
TweetStream tweetStream = new TweetStream(tweetClient, ctx);
parser.setInputStream(tweetStream);
}
return parser;
}
@Override
public void stop() throws Exception {
stopRequested = true;
}
public boolean isStopRequested() {
return stopRequested;
}
@Override
public void alter(Map<String, String> properties) throws Exception {
alterRequested = true;
this.alteredParams = properties;
}
public boolean isAlterRequested() {
return alterRequested;
}
public Map<String, String> getAlteredParams() {
return alteredParams;
}
public void postAlteration() {
alteredParams = null;
alterRequested = false;
}
}
class TweetStream extends InputStream {
private ByteBuffer buffer;
private int capacity;
private TweetClient tweetClient;
private List<String> tweets = new ArrayList<String>();
public TweetStream(TweetClient tweetClient, IHyracksTaskContext ctx) throws Exception {
capacity = ctx.getFrameSize();
buffer = ByteBuffer.allocate(capacity);
this.tweetClient = tweetClient;
initialize();
}
private void initialize() throws Exception {
boolean hasMore = tweetClient.next(tweets);
if (!hasMore) {
buffer.limit(0);
} else {
buffer.position(0);
buffer.limit(capacity);
for (String tweet : tweets) {
buffer.put(tweet.getBytes());
buffer.put("\n".getBytes());
}
buffer.flip();
}
}
@Override
public int read() throws IOException {
if (!buffer.hasRemaining()) {
boolean hasMore = tweetClient.next(tweets);
if (!hasMore) {
return -1;
}
buffer.position(0);
buffer.limit(capacity);
for (String tweet : tweets) {
buffer.put(tweet.getBytes());
buffer.put("\n".getBytes());
}
buffer.flip();
return buffer.get();
} else {
return buffer.get();
}
}
}
class TweetClient implements IFeedClient {
private String query;
private int timeInterval = 5;
private Character delimiter = '|';
private long id = 0;
private String id_prefix;
private final PullBasedTwitterAdapter adapter;
public TweetClient(String id_prefix, PullBasedTwitterAdapter adapter) {
this.id_prefix = id_prefix;
this.adapter = adapter;
initialize(adapter.getConfiguration());
}
private void initialize(Map<String, String> params) {
this.query = params.get(PullBasedTwitterAdapter.QUERY);
if (params.get(PullBasedTwitterAdapter.INTERVAL) != null) {
this.timeInterval = Integer.parseInt(params.get(PullBasedTwitterAdapter.INTERVAL));
}
}
@Override
public boolean next(List<String> tweets) {
try {
if (adapter.isStopRequested()) {
return false;
}
if (adapter.isAlterRequested()) {
initialize(((PullBasedTwitterAdapter) adapter).getAlteredParams());
adapter.postAlteration();
}
Thread.currentThread().sleep(1000 * timeInterval);
tweets.clear();
Twitter twitter = new TwitterFactory().getInstance();
QueryResult result = twitter.search(new Query(query));
List<Tweet> sourceTweets = result.getTweets();
for (Tweet tweet : sourceTweets) {
String tweetContent = formFeedTuple(tweet);
tweets.add(tweetContent);
System.out.println(tweetContent);
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
public String formFeedTuple(Object tweetObject) {
Tweet tweet = (Tweet) tweetObject;
StringBuilder builder = new StringBuilder();
builder.append(id_prefix + ":" + id);
builder.append(delimiter);
builder.append(tweet.getFromUserId());
builder.append(delimiter);
builder.append("Orange County");
builder.append(delimiter);
builder.append(escapeChars(tweet));
builder.append(delimiter);
builder.append(tweet.getCreatedAt().toString());
id++;
return new String(builder);
}
private String escapeChars(Tweet tweet) {
if (tweet.getText().contains("\n")) {
return tweet.getText().replace("\n", " ");
}
return tweet.getText();
}
}