Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-streams-examples into STREAMS-295

Conflicts:
	local/pom.xml
diff --git a/local/elasticsearch-hdfs/ElasticsearchHdfs.md b/local/elasticsearch-hdfs/ElasticsearchHdfs.md
new file mode 100644
index 0000000..b42d1a8
--- /dev/null
+++ b/local/elasticsearch-hdfs/ElasticsearchHdfs.md
@@ -0,0 +1,47 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+elasticsearch-hdfs
+==============================
+
+Description:
+-----------------
+
+Copies documents from elasticsearch to hdfs.
+
+Specification:
+-----------------
+
+[ElasticsearchHdfs.dot](src/main/resources/ElasticsearchHdfs.dot "ElasticsearchHdfs.dot" )
+
+Diagram:
+-----------------
+
+![ElasticsearchHdfs.png](./ElasticsearchHdfs.png?raw=true)
+
+Example Configuration:
+----------------------
+
+    {
+        "source": {
+            "hosts": [
+                "localhost"
+            ],
+            "port": 9300,
+            "clusterName": "elasticsearch",
+            "indexes": [ "source" ],
+            "types": [ "activity" ]
+        },
+        "destination": {
+            "scheme": "file",
+            "host": "localhost",
+            "path": "/tmp",
+            "writerPath": "activity"
+        }
+    }
+
+Run:
+--------
+
+`docker run elasticsearch-hdfs:0.2-incubating-SNAPSHOT java -cp elasticsearch-hdfs-0.2-incubating-SNAPSHOT.jar -Dconfig.file=http://<location_of_config_file>.json org.apache.streams.elasticsearch.example.ElasticsearchHdfs`
diff --git a/local/elasticsearch-hdfs/HdfsElasticsearch.md b/local/elasticsearch-hdfs/HdfsElasticsearch.md
new file mode 100644
index 0000000..c4ca261
--- /dev/null
+++ b/local/elasticsearch-hdfs/HdfsElasticsearch.md
@@ -0,0 +1,50 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+hdfs-elasticsearch
+==============================
+
+Description:
+-----------------
+
+Copies documents from hdfs to elasticsearch.
+
+Specification:
+-----------------
+
+[HdfsElasticsearch.dot](src/main/resources/HdfsElasticsearch.dot "HdfsElasticsearch.dot" )
+
+Diagram:
+-----------------
+
+![HdfsElasticsearch.png](./HdfsElasticsearch.png?raw=true)
+
+Example Configuration:
+----------------------
+
+    {
+        "source": {
+            "scheme": "file",
+            "host": "localhost",
+            "user": "cloudera",
+            "path": "/tmp",
+            "writerPath": "activity"
+        },
+        "destination": {
+            "hosts": [
+                "localhost"
+            ],
+            "port": 9300,
+            "clusterName": "elasticsearch",
+            "index": "activity2",
+            "type": "activity",
+            "forceUseConfig": true
+        }
+    }
+
+
+Run:
+--------
+
+`docker run elasticsearch-hdfs:0.2-incubating-SNAPSHOT.jar java -cp elasticsearch-hdfs-0.2-incubating-SNAPSHOT.jar -Dconfig.file=file://<location_of_config_file>.json org.apache.streams.elasticsearch.example.HdfsElasticsearch`
diff --git a/local/elasticsearch-hdfs/README.md b/local/elasticsearch-hdfs/README.md
new file mode 100644
index 0000000..2dd778d
--- /dev/null
+++ b/local/elasticsearch-hdfs/README.md
@@ -0,0 +1,32 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+elasticsearch-hdfs
+==============================
+
+Requirements:
+-------------
+ - A running Elasticsearch 1.0.0+ instance
+
+Description:
+------------
+Copies documents between elasticsearch and file system.
+
+Streams:
+--------
+
+[ElasticsearchHdfs](ElasticsearchHdfs.md "ElasticsearchHdfs" )
+
+[HdfsElasticsearch](HdfsElasticsearch.md "HdfsElasticsearch" )
+
+Build:
+---------
+
+`mvn clean package`
+
+Deploy:
+--------
+
+`mvn -Pdocker clean package docker:build`
+
diff --git a/local/elasticsearch-hdfs/pom.xml b/local/elasticsearch-hdfs/pom.xml
new file mode 100644
index 0000000..0a81b75
--- /dev/null
+++ b/local/elasticsearch-hdfs/pom.xml
@@ -0,0 +1,344 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-examples-local</artifactId>
+        <version>0.2-incubating-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>elasticsearch-hdfs</artifactId>
+    <version>0.2-incubating-SNAPSHOT</version>
+
+    <properties>
+        <elasticsearch.version>1.1.0</elasticsearch.version>
+        <lucene.version>4.7.2</lucene.version>
+    </properties>
+
+    <dependencies>
+    <!-- Test includes -->
+    <dependency>
+        <groupId>org.apache.lucene</groupId>
+        <artifactId>lucene-test-framework</artifactId>
+        <version>${lucene.version}</version>
+        <scope>test</scope>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.lucene</groupId>
+        <artifactId>lucene-codecs</artifactId>
+        <version>${lucene.version}</version>
+        <scope>test</scope>
+    </dependency>
+    <dependency>
+        <groupId>org.elasticsearch</groupId>
+        <artifactId>elasticsearch</artifactId>
+        <version>${elasticsearch.version}</version>
+        <type>test-jar</type>
+    </dependency>
+    <dependency>
+        <groupId>org.hamcrest</groupId>
+        <artifactId>hamcrest-all</artifactId>
+        <version>1.3</version>
+        <scope>test</scope>
+    </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-runtime-local</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-elasticsearch</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-hdfs</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-clean-plugin</artifactId>
+                <configuration>
+                    <filesets>
+                        <fileset>
+                            <directory>data</directory>
+                            <followSymlinks>false</followSymlinks>
+                        </fileset>
+                    </filesets>
+                </configuration>
+            </plugin>
+            <!-- This binary runs with logback -->
+            <!-- Keep log4j out -->
+            <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-enforcer-plugin</artifactId>
+            <version>1.3.1</version>
+            <executions>
+                <execution>
+                    <id>enforce-banned-dependencies</id>
+                    <goals>
+                        <goal>enforce</goal>
+                    </goals>
+                    <configuration>
+                        <rules>
+                            <bannedDependencies>
+                                <excludes>
+                                    <exclude>org.slf4j:slf4j-log4j12</exclude>
+                                    <exclude>org.slf4j:slf4j-jcl</exclude>
+                                    <exclude>org.slf4j:slf4j-jdk14</exclude>
+                                    <exclude>org.log4j:log4j</exclude>
+                                    <exclude>commons-logging:commons-logging</exclude>
+                                </excludes>
+                            </bannedDependencies>
+                        </rules>
+                        <fail>true</fail>
+                    </configuration>
+                </execution>
+            </executions>
+        </plugin>
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <executions>
+                <execution>
+                    <phase>package</phase>
+                    <goals>
+                        <goal>shade</goal>
+                    </goals>
+                    <configuration>
+                        <finalName>${project.build.finalName}</finalName>
+                        <filters>
+                            <filter>
+                                <artifact>*:*</artifact>
+                                <excludes>
+                                    <exclude>META-INF/*.SF</exclude>
+                                    <exclude>META-INF/*.DSA</exclude>
+                                    <exclude>META-INF/*.RSA</exclude>
+                                    <exclude>**/logback.xml</exclude>
+                                    <exclude>**/log4j.properties</exclude>
+                                </excludes>
+                            </filter>
+                        </filters>
+                        <transformers>
+                            <transformer
+                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                            <transformer
+                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                <mainClass>org.apache.streams.elasticsearch.example.ElasticsearchHdfs</mainClass>
+                            </transformer>
+                        </transformers>
+                    </configuration>
+                </execution>
+            </executions>
+        </plugin>
+        <plugin>
+            <groupId>org.jsonschema2pojo</groupId>
+            <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+            <version>0.4.1</version>
+            <configuration>
+                <addCompileSourceRoot>true</addCompileSourceRoot>
+                <generateBuilders>true</generateBuilders>
+                <sourcePaths>
+                    <sourcePath>src/main/jsonschema</sourcePath>
+                </sourcePaths>
+                <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                <targetPackage>org.apache.streams.elasticsearch.example</targetPackage>
+                <useJodaDates>false</useJodaDates>
+            </configuration>
+            <executions>
+                <execution>
+                    <goals>
+                        <goal>generate</goal>
+                    </goals>
+                </execution>
+            </executions>
+        </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.4</version>
+                <executions>
+                    <execution>
+                        <id>resource-dependencies</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>unpack-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <includeArtifactIds>streams-pojo</includeArtifactIds>
+                            <includes>**/*.json</includes>
+                            <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <version>2.12.4</version>
+                <executions>
+                    <execution>
+                        <id>integration-tests</id>
+                        <goals>
+                            <goal>integration-test</goal>
+                            <goal>verify</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>docker</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <!-- The Docker Maven plugin is used to create docker image with the fat jar -->
+                        <groupId>org.jolokia</groupId>
+                        <artifactId>docker-maven-plugin</artifactId>
+                        <version>0.11.0</version>
+                        <configuration>
+                            <images>
+
+                                <image>
+                                    <alias>${project.artifactId}</alias>
+                                    <name>${project.artifactId}:${project.version}</name>
+                                    <build>
+                                        <from>dockerfile/java:oracle-java8</from>
+                                        <assembly>
+                                            <basedir>/</basedir>
+                                            <descriptorRef>artifact</descriptorRef>
+                                        </assembly>
+                                        <!-- Default command for the build image -->
+                                    </build>
+
+                                </image>
+
+                            </images>
+                        </configuration>
+
+                    </plugin>
+
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java
new file mode 100644
index 0000000..da0acbd
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.example;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
+import org.apache.streams.hdfs.WebHdfsPersistWriter;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * Copies documents from elasticsearch to hdfs.
+ */
+public class ElasticsearchHdfs implements Runnable {
+
+    public final static String STREAMS_ID = "ElasticsearchHdfs";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class);
+
+    ElasticsearchHdfsConfiguration config;
+
+    public ElasticsearchHdfs() {
+       this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+    }
+
+    public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) {
+        this.config = reindex;
+    }
+
+    public static void main(String[] args)
+    {
+        LOGGER.info(StreamsConfigurator.config.toString());
+
+        ElasticsearchHdfs backup = new ElasticsearchHdfs();
+
+        new Thread(backup).start();
+
+    }
+
+    @Override
+    public void run() {
+
+        ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+
+        WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination());
+
+        Map<String, Object> streamConfig = Maps.newHashMap();
+        streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
+        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
+        StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+
+        builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader);
+        builder.addStreamsPersistWriter(WebHdfsPersistWriter.STREAMS_ID, hdfsPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID);
+        builder.start();
+    }
+}
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java
new file mode 100644
index 0000000..0a65479
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.example;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.hdfs.WebHdfsPersistReader;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * Copies documents from hdfs to elasticsearch.
+ */
+public class HdfsElasticsearch implements Runnable {
+
+    public final static String STREAMS_ID = "HdfsElasticsearch";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class);
+
+    HdfsElasticsearchConfiguration config;
+
+    public HdfsElasticsearch() {
+       this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+    }
+
+    public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) {
+        this.config = reindex;
+    }
+
+    public static void main(String[] args)
+    {
+        LOGGER.info(StreamsConfigurator.config.toString());
+
+        HdfsElasticsearch restore = new HdfsElasticsearch();
+
+        new Thread(restore).start();
+
+    }
+
+    @Override
+    public void run() {
+
+        WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource());
+
+        ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+
+        Map<String, Object> streamConfig = Maps.newHashMap();
+        streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
+        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000 * 1000);
+        StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+
+        builder.newPerpetualStream(WebHdfsPersistReader.STREAMS_ID, webHdfsPersistReader);
+        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, WebHdfsPersistReader.STREAMS_ID);
+        builder.start();
+    }
+}
diff --git a/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json b/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json
new file mode 100644
index 0000000..91324dd
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json
@@ -0,0 +1,10 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "type": "object",
+  "javaType" : "org.apache.streams.elasticsearch.example.ElasticsearchHdfsConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "source": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration", "type": "object", "required": true },
+    "destination": { "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration", "type": "object", "required": true }
+  }
+}
\ No newline at end of file
diff --git a/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json b/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json
new file mode 100644
index 0000000..c8072ba
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json
@@ -0,0 +1,10 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "type": "object",
+  "javaType" : "org.apache.streams.elasticsearch.example.HdfsElasticsearchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "source": { "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration", "type": "object", "required": true },
+    "destination": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration", "type": "object", "required": true }
+  }
+}
\ No newline at end of file
diff --git a/local/elasticsearch-hdfs/src/main/resources/ElasticsearchHdfs.dot b/local/elasticsearch-hdfs/src/main/resources/ElasticsearchHdfs.dot
new file mode 100644
index 0000000..c953522
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/resources/ElasticsearchHdfs.dot
@@ -0,0 +1,17 @@
+digraph g {
+
+  //providers
+  ElasticsearchPersistReader [label="ElasticsearchPersistReader",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java"];
+
+  //persisters
+  WebHdfsPersistWriter [label="WebHdfsPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java"];
+
+  //data
+  source [label="es://{indexes}/{types}",shape=box];
+  destination [label="hdfs://{path}/{writerPath}",shape=box];
+
+  //stream
+  source -> ElasticsearchPersistReader
+  ElasticsearchPersistReader -> WebHdfsPersistWriter [label="String"];
+  WebHdfsPersistWriter -> destination
+}
\ No newline at end of file
diff --git a/local/elasticsearch-hdfs/src/main/resources/HdfsElasticsearch.dot b/local/elasticsearch-hdfs/src/main/resources/HdfsElasticsearch.dot
new file mode 100644
index 0000000..f9a0efa
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/resources/HdfsElasticsearch.dot
@@ -0,0 +1,17 @@
+digraph g {
+
+  //providers
+  WebHdfsPersistReader [label="WebHdfsPersistReader",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java"];
+
+  //persisters
+  ElasticsearchPersistWriter [label="ElasticsearchPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java"];
+
+  //data
+  source [label="hdfs://{path}/{readerPath}",shape=box];
+  destination [label="es://{index}/{type}",shape=box];
+
+  //stream
+  source -> WebHdfsPersistReader
+  WebHdfsPersistReader -> ElasticsearchPersistWriter [label="String"];
+  ElasticsearchPersistWriter -> destination
+}
\ No newline at end of file
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java
new file mode 100644
index 0000000..ad6970c
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.elasticsearch.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.hdfs.HdfsConfiguration;
+import org.apache.streams.hdfs.WebHdfsPersistWriter;
+import org.apache.streams.hdfs.HdfsWriterConfiguration;
+import org.apache.streams.hdfs.WebHdfsPersistReader;
+import org.apache.streams.hdfs.HdfsReaderConfiguration;
+import org.apache.streams.elasticsearch.example.ElasticsearchHdfs;
+import org.apache.streams.elasticsearch.example.ElasticsearchHdfsConfiguration;
+import org.apache.streams.elasticsearch.example.HdfsElasticsearch;
+import org.apache.streams.elasticsearch.example.HdfsElasticsearchConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.File;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test copying documents between hdfs and elasticsearch
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST, numNodes=1)
+public class ElasticsearchHdfsIT extends ElasticsearchIntegrationTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class);
+
+    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    ElasticsearchConfiguration testConfiguration = new ElasticsearchConfiguration();
+
+    @Before
+    public void prepareTest() throws Exception {
+
+        testConfiguration = new ElasticsearchConfiguration();
+        testConfiguration.setHosts(Lists.newArrayList("localhost"));
+        testConfiguration.setClusterName(cluster().getClusterName());
+
+        ElasticsearchWriterConfiguration setupWriterConfiguration = MAPPER.convertValue(testConfiguration, ElasticsearchWriterConfiguration.class);
+        setupWriterConfiguration.setIndex("source");
+        setupWriterConfiguration.setType("activity");
+        setupWriterConfiguration.setBatchSize(5l);
+
+        ElasticsearchPersistWriter setupWriter = new ElasticsearchPersistWriter(setupWriterConfiguration);
+        setupWriter.prepare(null);
+
+        InputStream testActivityFolderStream = ElasticsearchHdfsIT.class.getClassLoader()
+                .getResourceAsStream("activities");
+        List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+        for( String file : files) {
+            LOGGER.info("File: " + file );
+            InputStream testActivityFileStream = ElasticsearchHdfsIT.class.getClassLoader()
+                    .getResourceAsStream("activities/" + file);
+            Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+            StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+            setupWriter.write( datum );
+            LOGGER.info("Wrote: " + activity.getVerb() );
+        }
+
+        setupWriter.cleanUp();
+
+        flushAndRefresh();
+
+    }
+
+    @Test
+    public void testElasticsearchHdfs() throws Exception {
+
+        ElasticsearchHdfsConfiguration backupConfiguration = MAPPER.readValue(
+                ElasticsearchHdfsIT.class.getResourceAsStream("/testBackup.json"), ElasticsearchHdfsConfiguration.class);
+
+        backupConfiguration.getSource().setClusterName(cluster().getClusterName());
+
+        // backupConfiguration.getDestination().setClusterName(cluster().getClusterName());
+
+        assert(indexExists("source"));
+        long srcCount = client().count(client().prepareCount("source").request()).get().getCount();
+        assert srcCount > 0;
+
+        ElasticsearchHdfs backup = new ElasticsearchHdfs(backupConfiguration);
+
+        Thread backupThread = new Thread(backup);
+        backupThread.start();
+        backupThread.join();
+
+        HdfsElasticsearchConfiguration restoreConfiguration = MAPPER.readValue(
+                ElasticsearchHdfsIT.class.getResourceAsStream("/testRestore.json"), HdfsElasticsearchConfiguration.class);
+
+        restoreConfiguration.getDestination().setClusterName(cluster().getClusterName());
+
+        assert(!indexExists("destination"));
+
+        HdfsElasticsearch restore = new HdfsElasticsearch(restoreConfiguration);
+
+        Thread restoreThread = new Thread(restore);
+        restoreThread.start();
+
+        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
+
+        restoreThread.join();
+
+        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+
+        flushAndRefresh();
+
+        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+
+        assert(indexExists("destination"));
+
+        long destCount = client().count(client().prepareCount("destination").request()).get().getCount();
+
+        assert srcCount == destCount;
+    }
+
+}
diff --git a/local/elasticsearch-hdfs/src/test/resources/log4j.properties b/local/elasticsearch-hdfs/src/test/resources/log4j.properties
new file mode 100644
index 0000000..71255bb
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/resources/log4j.properties
@@ -0,0 +1,8 @@
+# Root logger option
+log4j.rootLogger=DEBUG, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
\ No newline at end of file
diff --git a/local/elasticsearch-hdfs/src/test/resources/logback.xml b/local/elasticsearch-hdfs/src/test/resources/logback.xml
new file mode 100644
index 0000000..fc47894
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/resources/logback.xml
@@ -0,0 +1,27 @@
+<configuration debug="true" scan="true" scanPeriod="5 seconds">
+
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <append>true</append>
+        <!-- encoders are assigned the type
+             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+        <encoder>
+            <pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+        <file>target/logback.txt</file>
+        <append>true</append>
+        <!-- encoders are assigned the type
+             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+        <encoder>
+            <pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="DEBUG">
+        <appender-ref ref="CONSOLE" />
+        <appender-ref ref="FILE" />
+    </root>
+
+</configuration>
\ No newline at end of file
diff --git a/local/elasticsearch-hdfs/src/test/resources/testBackup.json b/local/elasticsearch-hdfs/src/test/resources/testBackup.json
new file mode 100644
index 0000000..2002cb8
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/resources/testBackup.json
@@ -0,0 +1,24 @@
+{
+    "source": {
+        "hosts": [
+            "localhost"
+        ],
+        "port": 9300,
+        "clusterName": "elasticsearch",
+        "indexes": [
+            "source"
+        ],
+        "types": [
+            "activity"
+        ]
+    },
+    "destination": {
+      "scheme": "file",
+      "host": "localhost",
+      "user": "cloudera",
+      "path": "target",
+      "writerPath": "test",
+      "writerFilePrefix": "activities"
+    }
+
+}
diff --git a/local/elasticsearch-hdfs/src/test/resources/testRestore.json b/local/elasticsearch-hdfs/src/test/resources/testRestore.json
new file mode 100644
index 0000000..63b42c5
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/resources/testRestore.json
@@ -0,0 +1,20 @@
+{
+      "source": {
+        "scheme": "file",
+        "host": "localhost",
+        "user": "cloudera",
+        "path": "target",
+        "readerPath": "test"
+      },
+      "destination": {
+          "hosts": [
+              "localhost"
+          ],
+          "port": 9300,
+          "clusterName": "elasticsearch",
+          "index": "destination",
+          "type": "activity",
+          "forceUseConfig": true
+      }
+
+}
diff --git a/local/pom.xml b/local/pom.xml
index e58c23f..b064a90 100644
--- a/local/pom.xml
+++ b/local/pom.xml
@@ -37,6 +37,7 @@
     </properties>
 
     <modules>
+	<module>elasticsearch-hdfs</module>
 	<module>elasticsearch-reindex</module>
 	<module>mongo-elasticsearch-sync</module>
     </modules>