basic implementation (untested)
diff --git a/local/pom.xml b/local/pom.xml
index b75e138..30e0206 100644
--- a/local/pom.xml
+++ b/local/pom.xml
@@ -37,6 +37,8 @@
</properties>
<modules>
+
+ <module>twitter-userstream-elasticsearch</module>
</modules>
</project>
diff --git a/local/twitter-userstream-elasticsearch/README.md b/local/twitter-userstream-elasticsearch/README.md
new file mode 100644
index 0000000..e2b4a2d
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/README.md
@@ -0,0 +1,72 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+twitter-userstream-elasticsearch
+==============================
+
+Requirements:
+-------------
+ - Authorized Twitter API credentials
+ - A running ElasticSearch 1.0.0+ instance
+
+Description:
+------------
+This example connects to an active twitter account and stores the userstream as activities in Elasticsearch
+
+Specification:
+-----------------
+
+[TwitterUserstreamElasticsearch.dot](src/main/resources/TwitterUserstreamElasticsearch.dot "TwitterUserstreamElasticsearch.dot" )
+
+Diagram:
+-----------------
+
+![TwitterUserstreamElasticsearch.png](./TwitterUserstreamElasticsearch.png?raw=true)
+
+Example Configuration:
+----------------------
+
+ twitter {
+ endpoint = "userstream"
+ oauth {
+ consumerKey = "bcg14JThZEGoZ3MZOoT2HnJS7"
+ consumerSecret = "S4dwxnZni58CIJaoupGnUrO4HRHmbBGOb28W6IqOJBx36LPw2z"
+ accessToken = ""
+ accessTokenSecret = ""
+ }
+ }
+ elasticsearch {
+ hosts = [
+ localhost
+ ]
+ port = 9300
+ clusterName = elasticsearch
+ index = userstream_activity
+ type = activity
+ batchSize = 1
+ }
+
+The consumerKey and consumerSecret are set for our streams-example application
+The accessToken and accessTokenSecret can be obtained by navigating to:
+ https://api.twitter.com/oauth/authenticate?oauth_token=UIJ0AUxCJatpKDUyFt0OTSEP4asZgqxRwUCT0AMSwc&oauth_callback=http%3A%2F%2Foauth.streamstutorial.w2odata.com%3A8080%2Fsocialauthdemo%2FsocialAuthSuccessAction.do
+
+Build:
+---------
+
+`mvn clean package verify`
+
+Run:
+--------
+
+`java -cp target/twitter-userstream-elasticsearch-0.2-incubating-SNAPSHOT.jar -Dconfig.file=src/main/resources/application.json org.apache.streams.example.twitter.TwitterUserstreamElasticsearch`
+
+Deploy:
+--------
+`mvn -Pdocker clean package docker:build`
+
+`docker tag twitter-userstream-elasticsearch:0.2-incubating-SNAPSHOT <dockerregistry>:twitter-userstream-elasticsearch:0.2-incubating-SNAPSHOT`
+
+`docker push <dockerregistry>:twitter-userstream-elasticsearch:0.2-incubating-SNAPSHOT`
+
+`docker run <dockerregistry>:twitter-userstream-elasticsearch:0.2-incubating-SNAPSHOT.jar java -cp twitter-userstream-elasticsearch-0.2-incubating-SNAPSHOT.jar -Dconfig.file=http://<location_of_config_file>.json org.apache.streams.example.twitter.TwitterUserstreamElasticsearch`
diff --git a/local/twitter-userstream-elasticsearch/pom.xml b/local/twitter-userstream-elasticsearch/pom.xml
new file mode 100644
index 0000000..1a3074c
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/pom.xml
@@ -0,0 +1,243 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-examples-local</artifactId>
+ <version>0.2-incubating-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>twitter-userstream-elasticsearch</artifactId>
+
+ <properties>
+ <elasticsearch.version>1.1.0</elasticsearch.version>
+ <lucene.version>4.7.2</lucene.version>
+ </properties>
+
+ <dependencies>
+ <!-- Test includes -->
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-test-framework</artifactId>
+ <version>${lucene.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-codecs</artifactId>
+ <version>${lucene.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${elasticsearch.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-runtime-local</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-provider-twitter</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-persist-elasticsearch</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>data</directory>
+ <followSymlinks>false</followSymlinks>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.build.finalName}</finalName>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.streams.example.twitter.TwitterUserstreamElasticsearch</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <version>0.4.1</version>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.example.elasticsearch</targetPackage>
+ <useJodaDates>false</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>resource-dependencies</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+ <configuration>
+ <includeArtifactIds>streams-pojo</includeArtifactIds>
+ <includes>**/*.json</includes>
+ <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.12.4</version>
+ <executions>
+ <execution>
+ <id>integration-tests</id>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
new file mode 100644
index 0000000..f24b1c4
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.example;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Delete;
+import org.apache.streams.pojo.json.Follow;
+import org.apache.streams.pojo.json.Page;
+import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.pojo.FriendList;
+import org.apache.streams.twitter.pojo.UserstreamEvent;
+import org.apache.streams.twitter.processor.TwitterTypeConverter;
+import org.apache.streams.twitter.provider.TwitterConfigurator;
+import org.apache.streams.twitter.provider.TwitterStreamProvider;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+import org.apache.streams.twitter.serializer.util.TwitterActivityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Example stream that populates elasticsearch with activities from twitter userstream in real-time
+ */
+public class TwitterUserstreamElasticsearch implements Runnable {
+
+ public final static String STREAMS_ID = "TwitterUserstreamElasticsearch";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
+
+ TwitterUserstreamElasticsearchConfiguration config;
+
+ public TwitterUserstreamElasticsearch() {
+ this(new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+ }
+
+ public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration config) {
+ this.config = config;
+ }
+
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
+
+ TwitterUserstreamElasticsearch userstream = new TwitterUserstreamElasticsearch();
+ new Thread(userstream).start();
+
+ }
+
+ @Override
+ public void run() {
+
+ TwitterStreamConfiguration twitterStreamConfiguration = config.getTwitter();
+ ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch();
+
+ TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration);
+ TwitterTypeConverter converter = new TwitterTypeConverter(ObjectNode.class, Activity.class);
+ ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration);
+ DeleteOnlyProcessor deleteOnlyProcessor = new DeleteOnlyProcessor();
+ NoDeletesProcessor noDeletesProcessor = new NoDeletesProcessor();
+ ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration);
+
+ Map<String, Object> streamConfig = Maps.newHashMap();
+ streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 12 * 60 * 1000);
+ StreamBuilder builder = new LocalStreamBuilder(25);
+
+ builder.newPerpetualStream(TwitterStreamProvider.STREAMS_ID, stream);
+ builder.addStreamsProcessor("converter", converter, 2, TwitterStreamProvider.STREAMS_ID);
+ builder.addStreamsProcessor("NoDeletesProcessor", noDeletesProcessor, 1, "converter");
+ builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, writer, 1, "NoDeletesProcessor");
+ builder.addStreamsProcessor("DeleteOnlyProcessor", deleteOnlyProcessor, 1, "converter");
+ builder.addStreamsPersistWriter(ElasticsearchPersistDeleter.STREAMS_ID, deleter, 1, "DeleteOnlyProcessor");
+
+ builder.start();
+
+ }
+
+ private class DeleteOnlyProcessor implements StreamsProcessor
+ {
+ String delete = new Delete().getVerb();
+
+ @Override
+ public void prepare(Object configurationObject) {}
+
+ @Override
+ public void cleanUp() {}
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+ Activity activity = (Activity) entry.getDocument();
+ if( activity.getVerb().equals(delete))
+ return Lists.newArrayList(entry);
+ else
+ return Lists.newArrayList();
+ }
+ }
+
+ private class NoDeletesProcessor implements StreamsProcessor
+ {
+ String delete = new Delete().getVerb();
+
+ @Override
+ public void prepare(Object configurationObject) {}
+
+ @Override
+ public void cleanUp() {}
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+ Activity activity = (Activity) entry.getDocument();
+ if( activity.getVerb().equals(delete))
+ return Lists.newArrayList();
+ else
+ return Lists.newArrayList(entry);
+ }
+ }
+
+}
diff --git a/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearch.json b/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearch.json
new file mode 100644
index 0000000..d2167a8
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearch.json
@@ -0,0 +1,14 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.openwebfoundation.org/legal/the-owf-1-0-agreements/owfa-1-0",
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "twitter": { "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration", "type": "object", "required": true },
+ "elasticsearch": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration", "type": "object", "required": true }
+ }
+}
\ No newline at end of file
diff --git a/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot b/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
new file mode 100644
index 0000000..5302bf9
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
@@ -0,0 +1,26 @@
+digraph g {
+
+ //providers
+ TwitterStreamProvider [label="TwitterStreamProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java"];
+
+ //processors
+ TwitterTypeConverter [label="TwitterTypeConverter",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java"];
+ DeleteOnlyProcessor [label="DeleteOnlyProcessor ",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java"];
+ NoDeletesProcessor [label="NoDeletes",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java"];
+
+ //persisters
+ ElasticsearchPersistWriter [label="ElasticsearchPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java"];
+ ElasticsearchPersistDeleter [label="ElasticsearchPersistDeleter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java"];
+
+ //data
+ es [label="es://{index}/{type}",shape=box];
+
+ //stream
+ TwitterStreamProvider -> TwitterTypeConverter [label="ObjectNode"];
+ TwitterTypeConverter -> DeleteOnlyProcessor [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+ TwitterTypeConverter -> NoDeletesProcessor [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+ DeleteOnlyProcessor -> ElasticsearchPersistDeleter [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+ NoDeletesProcessor -> ElasticsearchPersistWriter [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+ ElasticsearchPersistWriter -> es [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+ ElasticsearchPersistDeleter -> es [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+}
\ No newline at end of file
diff --git a/local/twitter-userstream-elasticsearch/src/main/resources/application.conf b/local/twitter-userstream-elasticsearch/src/main/resources/application.conf
new file mode 100644
index 0000000..50d48b5
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/src/main/resources/application.conf
@@ -0,0 +1,22 @@
+twitter {
+ endpoint = "userstream"
+ oauth {
+ consumerKey = ""
+ consumerSecret = ""
+ accessToken = ""
+ accessTokenSecret = ""
+ }
+ follow = [
+
+ ]
+}
+elasticsearch {
+ hosts = [
+ localhost
+ ]
+ port = 9300
+ clusterName = elasticsearch
+ index = userstream_activity
+ type = activity
+ batchSize = 1
+}
diff --git a/local/twitter-userstream-elasticsearch/twitter-userstream-elasticsearch.png b/local/twitter-userstream-elasticsearch/twitter-userstream-elasticsearch.png
new file mode 100644
index 0000000..de2b99f
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/twitter-userstream-elasticsearch.png
Binary files differ