blob: f29688cb8a41515406761d89e88476d9152311e0 [file] [log] [blame]
/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.demos.twitter;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import twitter4j.Status;
import com.datatorrent.contrib.hbase.HBaseOutputOperator;
import com.datatorrent.contrib.hbase.HBaseRowStatePersistence;
import com.datatorrent.contrib.hbase.HBaseStatePersistenceStrategy;
import com.datatorrent.contrib.twitter.TwitterSampleInput;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DAGContext;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
/**
 * An application which connects to the Twitter sample stream and stores all the
 * tweets together with their usernames in an HBase table. Please review the docs
 * for TwitterTopCounterApplication to set up your Twitter credentials.
 *
 * You need to create the HBase table before running this demo. The table name can
 * be configured, but the column family must be 'cf' to keep this demo simple and
 * consistent with the MySQL-based demo:
 * <pre>
 * create 'tablename', 'cf'
 * </pre>
 *
 * @since 1.0.2
 */
@ApplicationAnnotation(name="TwitterDumpHBaseDemo")
public class TwitterDumpHBaseApplication implements StreamingApplication
{
  /**
   * HBase output operator that writes each incoming tweet as one row in the
   * configured table, using column family {@code cf}.
   */
  public static class Status2Hbase extends HBaseOutputOperator<Status>
  {
    /** Column family the demo table must be created with. */
    private static final String COLUMN_FAMILY = "cf";
    /** Explicit charset so tweet text is stored independently of the platform default. */
    private static final String ENCODING = "UTF-8";
    /** Row key = creation time (8-byte long) followed by tweet id (8-byte long). */
    private static final int ROW_KEY_LENGTH = 16;

    /**
     * Returns the persistence strategy used by the base operator.
     *
     * @return a new {@link HBaseRowStatePersistence} instance on every call
     */
    @Override
    public HBaseStatePersistenceStrategy getPersistenceStrategy()
    {
      return new HBaseRowStatePersistence();
    }

    /**
     * Writes one tweet to HBase: its text under {@code cf:text} and the author's
     * screen name under {@code cf:userid}.
     *
     * @param t the tweet to store
     * @throws IOException if the HBase write fails (an unsupported charset would
     *         also surface here as an {@code UnsupportedEncodingException})
     */
    @Override
    public void processTuple(Status t) throws IOException
    {
      // Twitter creation times only have one-second resolution, so a time-only key
      // would make tweets from the same second overwrite each other. Appending the
      // tweet id keeps every key unique while still ordering rows by creation time.
      byte[] rowKey = ByteBuffer.allocate(ROW_KEY_LENGTH)
          .putLong(t.getCreatedAt().getTime())
          .putLong(t.getId())
          .array();
      byte[] family = COLUMN_FAMILY.getBytes(ENCODING);

      Put put = new Put(rowKey);
      put.add(family, "text".getBytes(ENCODING), t.getText().getBytes(ENCODING));
      // Store the author, not the tweet text, in the userid column.
      put.add(family, "userid".getBytes(ENCODING), t.getUser().getScreenName().getBytes(ENCODING));
      getTable().put(put);
    }
  }

  /**
   * Builds the DAG: a Twitter sample input operator whose {@code status} port is
   * streamed container-locally into a {@link Status2Hbase} writer.
   *
   * @param dag  the DAG to populate
   * @param conf the application configuration (not read here)
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // NOTE(review): this name differs from the @ApplicationAnnotation name above;
    // confirm which one the platform is expected to display.
    dag.setAttribute(DAGContext.APPLICATION_NAME, "TweetsDump");

    TwitterSampleInput twitterStream = dag.addOperator("TweetSampler", new TwitterSampleInput());
    Status2Hbase hBaseWriter = dag.addOperator("DatabaseWriter", new Status2Hbase());

    dag.addStream("Statuses", twitterStream.status, hBaseWriter.inputPort).setLocality(Locality.CONTAINER_LOCAL);
  }
}