Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-streams-examples into STREAMS-295

Conflicts:
	local/pom.xml
diff --git a/README.md b/README.md
index 6816339..dca889c 100644
--- a/README.md
+++ b/README.md
@@ -40,7 +40,7 @@
     unless you choose to checkout a SNAPSHOT branch.
     
     If so needed, incubator-streams can be checked out from:
-      http://git-wip-us.apache.org/repos/asf/incubator-streams-examples.git
+      http://git-wip-us.apache.org/repos/asf/incubator-streams.git
 
     After check out, cd into incubator-streams and invoke maven to install it using:
       $mvn install
diff --git a/local/mongo-elasticsearch-sync/MongoElasticsearchSync.png b/local/mongo-elasticsearch-sync/MongoElasticsearchSync.png
new file mode 100644
index 0000000..a8fc4d7
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/MongoElasticsearchSync.png
Binary files differ
diff --git a/local/mongo-elasticsearch-sync/README.md b/local/mongo-elasticsearch-sync/README.md
new file mode 100644
index 0000000..72b11db
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/README.md
@@ -0,0 +1,68 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+mongo-elasticsearch-sync
+==============================
+
+Requirements:
+-------------
+ - A running MongoDB 2.4+ instance
+ - A running ElasticSearch 1.0.0+ instance
+
+Description:
+------------
+Copies documents from mongodb to elasticsearch
+
+Specification:
+-----------------
+
+[MongoElasticsearchSync.dot](src/main/resources/MongoElasticsearchSync.dot "MongoElasticsearchSync.dot" )
+
+Diagram:
+-----------------
+
+![MongoElasticsearchSync.png](./MongoElasticsearchSync.png?raw=true)
+
+Example Configuration:
+----------------------
+
+    {
+        "source": {
+            "host": "localhost",
+            "port": 27017,
+            "db": "streams",
+            "collection": "activities"
+        },
+        "destination": {
+            "hosts": [
+                "localhost"
+            ],
+            "port": 9300,
+            "clusterName": "elasticsearch",
+            "index": "destination",
+            "type": "activity"
+        }
+    }
+
+Build:
+---------
+
+`mvn clean package verify`
+
+Run:
+--------
+
+`java -cp target/mongo-elasticsearch-sync-0.1-SNAPSHOT.jar -Dconfig.file=src/main/resources/application.json org.apache.streams.example.elasticsearch.MongoElasticsearchSync`
+
+Deploy:
+--------
+
+`mvn -Pdocker clean package docker:build`
+
+`docker tag mongo-elasticsearch-sync:0.2-incubating-SNAPSHOT <dockerregistry>:mongo-elasticsearch-sync:0.2-incubating-SNAPSHOT`
+
+`docker push <dockerregistry>:mongo-elasticsearch-sync:0.2-incubating-SNAPSHOT`
+
+`docker run <dockerregistry>:mongo-elasticsearch-sync:0.2-incubating-SNAPSHOT java -cp mongo-elasticsearch-sync-0.2-incubating-SNAPSHOT.jar -Dconfig.file=http://<location_of_config_file>.json org.apache.streams.example.elasticsearch.MongoElasticsearchSync`
+
diff --git a/local/mongo-elasticsearch-sync/pom.xml b/local/mongo-elasticsearch-sync/pom.xml
new file mode 100644
index 0000000..4d271e0
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/pom.xml
@@ -0,0 +1,354 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-examples-local</artifactId>
+        <version>0.2-incubating-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>mongo-elasticsearch-sync</artifactId>
+
+    <properties>
+        <elasticsearch.version>1.1.0</elasticsearch.version>
+        <lucene.version>4.7.2</lucene.version>
+    </properties>
+
+    <dependencies>
+        <!-- Test includes -->
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-test-framework</artifactId>
+            <version>${lucene.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-codecs</artifactId>
+            <version>${lucene.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${elasticsearch.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-runtime-local</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-elasticsearch</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-mongo</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <!-- This binary runs with logback -->
+            <!-- Keep log4j out -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <version>1.3.1</version>
+                <executions>
+                    <execution>
+                        <id>enforce-banned-dependencies</id>
+                        <goals>
+                            <goal>enforce</goal>
+                        </goals>
+                        <configuration>
+                            <rules>
+                                <bannedDependencies>
+                                    <excludes>
+                                        <exclude>org.slf4j:slf4j-log4j12</exclude>
+                                        <exclude>org.slf4j:slf4j-jcl</exclude>
+                                        <exclude>org.slf4j:slf4j-jdk14</exclude>
+                                        <exclude>org.log4j:log4j</exclude>
+                                        <exclude>commons-logging:commons-logging</exclude>
+                                    </excludes>
+                                </bannedDependencies>
+                            </rules>
+                            <fail>true</fail>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-clean-plugin</artifactId>
+                <configuration>
+                    <filesets>
+                        <fileset>
+                            <directory>data</directory>
+                            <followSymlinks>false</followSymlinks>
+                        </fileset>
+                    </filesets>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>com.github.joelittlejohn.embedmongo</groupId>
+                <artifactId>embedmongo-maven-plugin</artifactId>
+                <version>0.1.12</version>
+                <executions>
+                    <execution>
+                        <id>start</id>
+                        <goals>
+                            <goal>start</goal>
+                        </goals>
+                        <configuration>
+                            <port>37017</port>
+                            <version>2.4.0</version>
+                            <databaseDirectory>target/mongotest</databaseDirectory>
+                            <logging>console</logging>
+                            <skip>false</skip>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>stop</id>
+                        <goals>
+                            <goal>stop</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <finalName>${project.build.finalName}</finalName>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.apache.streams.example.elasticsearch.MongoElasticsearchSync</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <version>0.4.1</version>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.example.elasticsearch</targetPackage>
+                    <useJodaDates>false</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.4</version>
+                <executions>
+                    <execution>
+                        <id>resource-dependencies</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>unpack-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <includeArtifactIds>streams-pojo</includeArtifactIds>
+                            <includes>**/*.json</includes>
+                            <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <version>2.12.4</version>
+                <executions>
+                    <execution>
+                        <id>integration-tests</id>
+                        <goals>
+                            <goal>integration-test</goal>
+                            <goal>verify</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>docker</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <!-- The Docker Maven plugin is used to create docker image with the fat jar -->
+                        <groupId>org.jolokia</groupId>
+                        <artifactId>docker-maven-plugin</artifactId>
+                        <version>0.11.0</version>
+                        <configuration>
+                            <images>
+
+                                <image>
+                                    <alias>${project.artifactId}</alias>
+                                    <name>${project.artifactId}:${project.version}</name>
+                                    <build>
+                                        <from>dockerfile/java:oracle-java8</from>
+                                        <assembly>
+                                            <basedir>/</basedir>
+                                            <descriptorRef>artifact</descriptorRef>
+                                        </assembly>
+                                        <!-- Default command for the build image -->
+                                    </build>
+
+                                </image>
+
+                            </images>
+                        </configuration>
+
+                    </plugin>
+
+                </plugins>
+            </build>
+            <activation>
+                <activeByDefault/>
+            </activation>
+        </profile>
+    </profiles>
+</project>
\ No newline at end of file
diff --git a/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/elasticsearch/MongoElasticsearchSync.java b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/elasticsearch/MongoElasticsearchSync.java
new file mode 100644
index 0000000..fccbf47
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/elasticsearch/MongoElasticsearchSync.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.*;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.apache.streams.mongo.MongoPersistReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * Copies documents into a new index
+ */
+public class MongoElasticsearchSync implements Runnable {
+
+    public final static String STREAMS_ID = "MongoElasticsearchSync";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSync.class);
+
+    MongoElasticsearchSyncConfiguration config;
+
+    public MongoElasticsearchSync() {
+        this(new ComponentConfigurator<MongoElasticsearchSyncConfiguration>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+    }
+
+    public MongoElasticsearchSync(MongoElasticsearchSyncConfiguration config) {
+        this.config = config;
+    }
+
+    public static void main(String[] args)
+    {
+        LOGGER.info(StreamsConfigurator.config.toString());
+
+        MongoElasticsearchSync sync = new MongoElasticsearchSync();
+
+        new Thread(sync).start();
+
+    }
+
+    @Override
+    public void run() {
+
+        MongoPersistReader mongoPersistReader = new MongoPersistReader(config.getSource());
+
+        ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+
+        Map<String, Object> streamConfig = Maps.newHashMap();
+        streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
+        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
+        StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+
+        builder.newPerpetualStream(MongoPersistReader.STREAMS_ID, mongoPersistReader);
+        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, MongoPersistReader.STREAMS_ID);
+        builder.start();
+    }
+}
diff --git a/local/mongo-elasticsearch-sync/src/main/jsonschema/MongoElasticsearchSyncConfiguration.json b/local/mongo-elasticsearch-sync/src/main/jsonschema/MongoElasticsearchSyncConfiguration.json
new file mode 100644
index 0000000..a592699
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/src/main/jsonschema/MongoElasticsearchSyncConfiguration.json
@@ -0,0 +1,10 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "type": "object",
+  "javaType" : "org.apache.streams.example.elasticsearch.MongoElasticsearchSyncConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "source": { "javaType": "org.apache.streams.mongo.MongoConfiguration", "type": "object", "required": true },
+    "destination": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration", "type": "object", "required": true }
+  }
+}
\ No newline at end of file
diff --git a/local/mongo-elasticsearch-sync/src/main/resources/MongoElasticsearchSync.dot b/local/mongo-elasticsearch-sync/src/main/resources/MongoElasticsearchSync.dot
new file mode 100644
index 0000000..e3babf5
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/src/main/resources/MongoElasticsearchSync.dot
@@ -0,0 +1,18 @@
+digraph g {
+
+  //providers
+  MongoPersistReader [label="MongoPersistReader",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java"];
+
+  //persisters
+  ElasticsearchPersistWriter [label="ElasticsearchPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java"];
+
+  //data
+  source [label="mongdb://{db}/{collection}",shape=box];
+  destination [label="es://{index}/{type}",shape=box];
+ 
+  //stream
+  source -> MongoPersistReader [label="ObjectNode"];
+  MongoPersistReader -> ElasticsearchPersistWriter [label="ObjectNode"];
+  ElasticsearchPersistWriter -> destination [label="ObjectNode"];
+
+}
\ No newline at end of file
diff --git a/local/mongo-elasticsearch-sync/src/main/resources/application.json b/local/mongo-elasticsearch-sync/src/main/resources/application.json
new file mode 100644
index 0000000..facb336
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/src/main/resources/application.json
@@ -0,0 +1,17 @@
+{
+    "source": {
+        "host": "localhost",
+        "port": 27017,
+        "db": "streams",
+        "collection": "activities"
+    },
+    "destination": {
+        "hosts": [
+            "localhost"
+        ],
+        "port": 9300,
+        "clusterName": "elasticsearch",
+        "index": "destination",
+        "type": "activity"
+    }
+}
diff --git a/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/elasticsearch/test/MongoElasticsearchSyncIT.java b/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/elasticsearch/test/MongoElasticsearchSyncIT.java
new file mode 100644
index 0000000..df6fa00
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/elasticsearch/test/MongoElasticsearchSyncIT.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.elasticsearch.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.example.elasticsearch.MongoElasticsearchSync;
+import org.apache.streams.example.elasticsearch.MongoElasticsearchSyncConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.mongo.MongoConfiguration;
+import org.apache.streams.mongo.MongoPersistReader;
+import org.apache.streams.mongo.MongoPersistWriter;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * Test copying documents between two indexes on same cluster
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST, numNodes=1)
+public class MongoElasticsearchSyncIT extends ElasticsearchIntegrationTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSyncIT.class);
+
+    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    MongoElasticsearchSyncConfiguration syncConfiguration;
+
+    int srcCount = 0;
+
+    @Before
+    public void prepareTest() throws Exception {
+
+        syncConfiguration = MAPPER.readValue(
+                MongoElasticsearchSyncIT.class.getResourceAsStream("/testSync.json"), MongoElasticsearchSyncConfiguration.class);
+
+        syncConfiguration.getDestination().setClusterName(cluster().getClusterName());
+
+        MongoPersistWriter setupWriter = new MongoPersistWriter(syncConfiguration.getSource());
+
+        setupWriter.prepare(null);
+
+        InputStream testActivityFolderStream = MongoElasticsearchSyncIT.class.getClassLoader()
+                .getResourceAsStream("activities");
+        List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+        for( String file : files) {
+            LOGGER.info("File: " + file );
+            InputStream testActivityFileStream = MongoElasticsearchSyncIT.class.getClassLoader()
+                    .getResourceAsStream("activities/" + file);
+            Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+            activity.getAdditionalProperties().remove("$license");
+            StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+            setupWriter.write( datum );
+            LOGGER.info("Wrote: " + activity.getVerb() );
+            srcCount++;
+        }
+
+        setupWriter.cleanUp();
+
+    }
+
+    @Test
+    public void testSync() throws Exception {
+
+        assert srcCount > 0;
+
+        MongoElasticsearchSync sync = new MongoElasticsearchSync(syncConfiguration);
+
+        Thread reindexThread = new Thread(sync);
+        reindexThread.start();
+        reindexThread.join();
+
+        flushAndRefresh();
+
+        assert(indexExists("destination"));
+
+        long destCount = client().count(client().prepareCount("destination").request()).get().getCount();
+        assert srcCount == destCount;
+
+    }
+}
diff --git a/local/mongo-elasticsearch-sync/src/test/resources/testSync.json b/local/mongo-elasticsearch-sync/src/test/resources/testSync.json
new file mode 100644
index 0000000..b678c21
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/src/test/resources/testSync.json
@@ -0,0 +1,18 @@
+{
+    "source": {
+        "host": "localhost",
+        "port": 37017,
+        "db": "local",
+        "collection": "activities"
+    },
+    "destination": {
+        "hosts": [
+            "localhost"
+        ],
+        "port": 9300,
+        "clusterName": "elasticsearch",
+        "index": "destination",
+        "type": "activity",
+        "forceUseConfig": true
+    }
+}
diff --git a/local/pom.xml b/local/pom.xml
index 910c708..b064a90 100644
--- a/local/pom.xml
+++ b/local/pom.xml
@@ -37,8 +37,9 @@
     </properties>
 
     <modules>
-        <module>elasticsearch-hdfs</module>
-        <module>elasticsearch-reindex</module>
+	<module>elasticsearch-hdfs</module>
+	<module>elasticsearch-reindex</module>
+	<module>mongo-elasticsearch-sync</module>
     </modules>
 
 </project>