Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-streams-examples into STREAMS-295
Conflicts:
local/pom.xml
diff --git a/README.md b/README.md
index 6816339..dca889c 100644
--- a/README.md
+++ b/README.md
@@ -40,7 +40,7 @@
unless you choose to checkout a SNAPSHOT branch.
If so needed, incubator-streams can be checked out from:
- http://git-wip-us.apache.org/repos/asf/incubator-streams-examples.git
+ http://git-wip-us.apache.org/repos/asf/incubator-streams.git
After check out, cd into incubator-streams and invoke maven to install it using:
$mvn install
diff --git a/local/mongo-elasticsearch-sync/MongoElasticsearchSync.png b/local/mongo-elasticsearch-sync/MongoElasticsearchSync.png
new file mode 100644
index 0000000..a8fc4d7
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/MongoElasticsearchSync.png
Binary files differ
diff --git a/local/mongo-elasticsearch-sync/README.md b/local/mongo-elasticsearch-sync/README.md
new file mode 100644
index 0000000..72b11db
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/README.md
@@ -0,0 +1,68 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+mongo-elasticsearch-sync
+==============================
+
+Requirements:
+-------------
+ - A running MongoDB 2.4+ instance
+ - A running ElasticSearch 1.0.0+ instance
+
+Description:
+------------
+Copies documents from mongodb to elasticsearch
+
+Specification:
+-----------------
+
+[MongoElasticsearchSync.dot](src/main/resources/MongoElasticsearchSync.dot "MongoElasticsearchSync.dot" )
+
+Diagram:
+-----------------
+
+![MongoElasticsearchSync.png](./MongoElasticsearchSync.png?raw=true)
+
+Example Configuration:
+----------------------
+
+ {
+ "source": {
+ "host": "localhost",
+ "port": 27017,
+ "db": "streams",
+ "collection": "activities"
+ },
+ "destination": {
+ "hosts": [
+ "localhost"
+ ],
+ "port": 9300,
+ "clusterName": "elasticsearch",
+ "index": "destination",
+ "type": "activity"
+ }
+ }
+
+Build:
+---------
+
+`mvn clean package verify`
+
+Run:
+--------
+
+`java -cp target/mongo-elasticsearch-sync-0.2-incubating-SNAPSHOT.jar -Dconfig.file=src/main/resources/application.json org.apache.streams.example.elasticsearch.MongoElasticsearchSync`
+
+Deploy:
+--------
+
+`mvn -Pdocker clean package docker:build`
+
+`docker tag mongo-elasticsearch-sync:0.2-incubating-SNAPSHOT <dockerregistry>/mongo-elasticsearch-sync:0.2-incubating-SNAPSHOT`
+
+`docker push <dockerregistry>/mongo-elasticsearch-sync:0.2-incubating-SNAPSHOT`
+
+`docker run <dockerregistry>/mongo-elasticsearch-sync:0.2-incubating-SNAPSHOT java -cp mongo-elasticsearch-sync-0.2-incubating-SNAPSHOT.jar -Dconfig.file=http://<location_of_config_file>.json org.apache.streams.example.elasticsearch.MongoElasticsearchSync`
+
diff --git a/local/mongo-elasticsearch-sync/pom.xml b/local/mongo-elasticsearch-sync/pom.xml
new file mode 100644
index 0000000..4d271e0
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/pom.xml
@@ -0,0 +1,354 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-examples-local</artifactId>
+ <version>0.2-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>mongo-elasticsearch-sync</artifactId>
+
+ <properties>
+ <elasticsearch.version>1.1.0</elasticsearch.version>
+ <lucene.version>4.7.2</lucene.version>
+ </properties>
+
+ <dependencies>
+ <!-- Test includes -->
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-test-framework</artifactId>
+ <version>${lucene.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-codecs</artifactId>
+ <version>${lucene.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+   <groupId>org.elasticsearch</groupId>
+   <artifactId>elasticsearch</artifactId>
+   <version>${elasticsearch.version}</version>
+   <type>test-jar</type>
+   <!-- Only the integration test uses the Elasticsearch test classes
+        (ElasticsearchIntegrationTest); test scope keeps them off the
+        compile classpath and out of the shaded jar. -->
+   <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-runtime-local</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-persist-elasticsearch</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-persist-mongo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <!-- This binary runs with logback. -->
+ <!-- Keep log4j, commons-logging and competing SLF4J bindings out: the build
+      fails if any of the banned artifacts below is found among the dependencies. -->
+ <plugin>
+   <groupId>org.apache.maven.plugins</groupId>
+   <artifactId>maven-enforcer-plugin</artifactId>
+   <version>1.3.1</version>
+   <executions>
+     <execution>
+       <id>enforce-banned-dependencies</id>
+       <goals>
+         <goal>enforce</goal>
+       </goals>
+       <configuration>
+         <rules>
+           <bannedDependencies>
+             <excludes>
+               <exclude>org.slf4j:slf4j-log4j12</exclude>
+               <exclude>org.slf4j:slf4j-jcl</exclude>
+               <exclude>org.slf4j:slf4j-jdk14</exclude>
+               <!-- log4j 1.x is published under groupId "log4j"; the previous
+                    pattern org.log4j:log4j did not match it.
+                    NOTE(review): if this newly fails the build, exclude log4j
+                    from the dependency that pulls it in. -->
+               <exclude>log4j:log4j</exclude>
+               <exclude>commons-logging:commons-logging</exclude>
+             </excludes>
+           </bannedDependencies>
+         </rules>
+         <fail>true</fail>
+       </configuration>
+     </execution>
+   </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>data</directory>
+ <followSymlinks>false</followSymlinks>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>com.github.joelittlejohn.embedmongo</groupId>
+ <artifactId>embedmongo-maven-plugin</artifactId>
+ <version>0.1.12</version>
+ <executions>
+ <execution>
+ <id>start</id>
+ <goals>
+ <goal>start</goal>
+ </goals>
+ <configuration>
+ <port>37017</port>
+ <version>2.4.0</version>
+ <databaseDirectory>target/mongotest</databaseDirectory>
+ <logging>console</logging>
+ <skip>false</skip>
+ </configuration>
+ </execution>
+ <execution>
+ <id>stop</id>
+ <goals>
+ <goal>stop</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.build.finalName}</finalName>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.streams.example.elasticsearch.MongoElasticsearchSync</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <version>0.4.1</version>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.example.elasticsearch</targetPackage>
+ <useJodaDates>false</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>resource-dependencies</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+ <configuration>
+ <includeArtifactIds>streams-pojo</includeArtifactIds>
+ <includes>**/*.json</includes>
+ <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.12.4</version>
+ <executions>
+ <execution>
+ <id>integration-tests</id>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>docker</id>
+ <build>
+ <plugins>
+ <plugin>
+ <!-- The Docker Maven plugin is used to create docker image with the fat jar -->
+ <groupId>org.jolokia</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>0.11.0</version>
+ <configuration>
+ <images>
+
+ <image>
+ <alias>${project.artifactId}</alias>
+ <name>${project.artifactId}:${project.version}</name>
+ <build>
+ <from>dockerfile/java:oracle-java8</from>
+ <assembly>
+ <basedir>/</basedir>
+ <descriptorRef>artifact</descriptorRef>
+ </assembly>
+ <!-- Default command for the build image -->
+ </build>
+
+ </image>
+
+ </images>
+ </configuration>
+
+ </plugin>
+
+ </plugins>
+ </build>
+ <activation>
+ <activeByDefault/>
+ </activation>
+ </profile>
+ </profiles>
+</project>
\ No newline at end of file
diff --git a/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/elasticsearch/MongoElasticsearchSync.java b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/elasticsearch/MongoElasticsearchSync.java
new file mode 100644
index 0000000..fccbf47
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/elasticsearch/MongoElasticsearchSync.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.*;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.apache.streams.mongo.MongoPersistReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * Example stream that copies documents from a MongoDB collection into an
+ * Elasticsearch index.
+ *
+ * <p>The stream is assembled in {@link #run()} from a {@link MongoPersistReader}
+ * built with {@code config.getSource()} and an {@link ElasticsearchPersistWriter}
+ * built with {@code config.getDestination()}, wired together by a
+ * {@link LocalStreamBuilder}.
+ */
+public class MongoElasticsearchSync implements Runnable {
+
+    /** Identifier registered with the stream builder for this stream. */
+    public final static String STREAMS_ID = "MongoElasticsearchSync";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSync.class);
+
+    /** Source (MongoDB) and destination (Elasticsearch) settings used by {@link #run()}. */
+    MongoElasticsearchSyncConfiguration config;
+
+    /**
+     * Creates a sync whose configuration is detected by a
+     * {@link ComponentConfigurator} from {@code StreamsConfigurator.getConfig()}.
+     */
+    public MongoElasticsearchSync() {
+        this(
+            new ComponentConfigurator<MongoElasticsearchSyncConfiguration>(MongoElasticsearchSyncConfiguration.class)
+                .detectConfiguration(StreamsConfigurator.getConfig()));
+    }
+
+    /**
+     * Creates a sync that uses the supplied configuration.
+     *
+     * @param config source and destination settings for the stream
+     */
+    public MongoElasticsearchSync(MongoElasticsearchSyncConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Command-line entry point: logs the active configuration, then runs a
+     * default-configured sync on a new thread.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        LOGGER.info(StreamsConfigurator.config.toString());
+
+        Thread worker = new Thread(new MongoElasticsearchSync());
+        worker.start();
+    }
+
+    /**
+     * Builds the reader-to-writer stream and starts it.
+     */
+    @Override
+    public void run() {
+        Map<String, Object> settings = Maps.newHashMap();
+        settings.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
+        // NOTE(review): this evaluates to 10,080,000. If TIMEOUT_KEY is expressed in
+        // milliseconds that is about 2.8 hours, not 7 days; a "* 60" factor may be
+        // missing. Value kept as-is; confirm the intended duration.
+        settings.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
+
+        MongoPersistReader reader = new MongoPersistReader(config.getSource());
+        ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config.getDestination());
+
+        StreamBuilder builder = new LocalStreamBuilder(1000, settings);
+        // The writer consumes from the reader: the reader's id is passed as the writer's inbound id.
+        builder.newPerpetualStream(MongoPersistReader.STREAMS_ID, reader);
+        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, writer, 1, MongoPersistReader.STREAMS_ID);
+        builder.start();
+    }
+}
diff --git a/local/mongo-elasticsearch-sync/src/main/jsonschema/MongoElasticsearchSyncConfiguration.json b/local/mongo-elasticsearch-sync/src/main/jsonschema/MongoElasticsearchSyncConfiguration.json
new file mode 100644
index 0000000..a592699
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/src/main/jsonschema/MongoElasticsearchSyncConfiguration.json
@@ -0,0 +1,10 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "type": "object",
+ "javaType" : "org.apache.streams.example.elasticsearch.MongoElasticsearchSyncConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "source": { "javaType": "org.apache.streams.mongo.MongoConfiguration", "type": "object", "required": true },
+ "destination": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration", "type": "object", "required": true }
+ }
+}
\ No newline at end of file
diff --git a/local/mongo-elasticsearch-sync/src/main/resources/MongoElasticsearchSync.dot b/local/mongo-elasticsearch-sync/src/main/resources/MongoElasticsearchSync.dot
new file mode 100644
index 0000000..e3babf5
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/src/main/resources/MongoElasticsearchSync.dot
@@ -0,0 +1,18 @@
+digraph g {
+
+  // providers: the component that reads from the source
+  MongoPersistReader [label="MongoPersistReader",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java"];
+
+  // persisters: the component that writes to the destination
+  ElasticsearchPersistWriter [label="ElasticsearchPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java"];
+
+  // data: external endpoints of the stream (label typo "mongdb" corrected)
+  source [label="mongodb://{db}/{collection}",shape=box];
+  destination [label="es://{index}/{type}",shape=box];
+
+  // stream: edges in processing order, labelled with the type passed along
+  source -> MongoPersistReader [label="ObjectNode"];
+  MongoPersistReader -> ElasticsearchPersistWriter [label="ObjectNode"];
+  ElasticsearchPersistWriter -> destination [label="ObjectNode"];
+
+}
\ No newline at end of file
diff --git a/local/mongo-elasticsearch-sync/src/main/resources/application.json b/local/mongo-elasticsearch-sync/src/main/resources/application.json
new file mode 100644
index 0000000..facb336
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/src/main/resources/application.json
@@ -0,0 +1,17 @@
+{
+ "source": {
+ "host": "localhost",
+ "port": 27017,
+ "db": "streams",
+ "collection": "activities"
+ },
+ "destination": {
+ "hosts": [
+ "localhost"
+ ],
+ "port": 9300,
+ "clusterName": "elasticsearch",
+ "index": "destination",
+ "type": "activity"
+ }
+}
diff --git a/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/elasticsearch/test/MongoElasticsearchSyncIT.java b/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/elasticsearch/test/MongoElasticsearchSyncIT.java
new file mode 100644
index 0000000..df6fa00
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/elasticsearch/test/MongoElasticsearchSyncIT.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.elasticsearch.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.example.elasticsearch.MongoElasticsearchSync;
+import org.apache.streams.example.elasticsearch.MongoElasticsearchSyncConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.mongo.MongoConfiguration;
+import org.apache.streams.mongo.MongoPersistReader;
+import org.apache.streams.mongo.MongoPersistWriter;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * Integration test for {@link MongoElasticsearchSync}.
+ *
+ * <p>Before each test, the activity documents listed under the
+ * {@code activities} classpath folder are written to MongoDB. The test then
+ * runs the sync and checks that the Elasticsearch {@code destination} index
+ * holds as many documents as were written to the source.
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST, numNodes=1)
+public class MongoElasticsearchSyncIT extends ElasticsearchIntegrationTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSyncIT.class);
+
+    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    /** Configuration loaded from {@code /testSync.json} and handed to the sync under test. */
+    MongoElasticsearchSyncConfiguration syncConfiguration;
+
+    /** Number of activities written to MongoDB by {@link #prepareTest()}. */
+    int srcCount = 0;
+
+    /**
+     * Loads the test configuration and seeds MongoDB with one document per
+     * file found under the {@code activities} classpath folder.
+     *
+     * @throws Exception if the configuration or any activity cannot be read or written
+     */
+    @Before
+    public void prepareTest() throws Exception {
+
+        syncConfiguration = MAPPER.readValue(
+                MongoElasticsearchSyncIT.class.getResourceAsStream("/testSync.json"), MongoElasticsearchSyncConfiguration.class);
+
+        // Use the name of the cluster supplied by the test base class instead of the one in testSync.json
+        syncConfiguration.getDestination().setClusterName(cluster().getClusterName());
+
+        MongoPersistWriter setupWriter = new MongoPersistWriter(syncConfiguration.getSource());
+
+        setupWriter.prepare(null);
+
+        // Reading the folder resource yields one file name per line
+        List<String> files;
+        InputStream testActivityFolderStream = MongoElasticsearchSyncIT.class.getClassLoader()
+                .getResourceAsStream("activities");
+        try {
+            files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+        } finally {
+            // readLines does not close the stream it is given
+            IOUtils.closeQuietly(testActivityFolderStream);
+        }
+
+        for( String file : files) {
+            LOGGER.info("File: {}", file);
+            InputStream testActivityFileStream = MongoElasticsearchSyncIT.class.getClassLoader()
+                    .getResourceAsStream("activities/" + file);
+            try {
+                Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+                activity.getAdditionalProperties().remove("$license");
+                StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+                setupWriter.write( datum );
+                LOGGER.info("Wrote: {}", activity.getVerb());
+                srcCount++;
+            } finally {
+                // Release the file stream even when parsing or writing fails
+                IOUtils.closeQuietly(testActivityFileStream);
+            }
+        }
+
+        setupWriter.cleanUp();
+
+    }
+
+    /**
+     * Runs the sync to completion on its own thread and verifies that the
+     * destination index exists and contains {@code srcCount} documents.
+     *
+     * @throws Exception if the sync thread is interrupted or the count request fails
+     */
+    @Test
+    public void testSync() throws Exception {
+
+        assert srcCount > 0;
+
+        MongoElasticsearchSync sync = new MongoElasticsearchSync(syncConfiguration);
+
+        Thread reindexThread = new Thread(sync);
+        reindexThread.start();
+        reindexThread.join();
+
+        // Make the written documents visible to the count request
+        flushAndRefresh();
+
+        assert(indexExists("destination"));
+
+        long destCount = client().count(client().prepareCount("destination").request()).get().getCount();
+        assert srcCount == destCount;
+
+    }
+}
diff --git a/local/mongo-elasticsearch-sync/src/test/resources/testSync.json b/local/mongo-elasticsearch-sync/src/test/resources/testSync.json
new file mode 100644
index 0000000..b678c21
--- /dev/null
+++ b/local/mongo-elasticsearch-sync/src/test/resources/testSync.json
@@ -0,0 +1,18 @@
+{
+ "source": {
+ "host": "localhost",
+ "port": 37017,
+ "db": "local",
+ "collection": "activities"
+ },
+ "destination": {
+ "hosts": [
+ "localhost"
+ ],
+ "port": 9300,
+ "clusterName": "elasticsearch",
+ "index": "destination",
+ "type": "activity",
+ "forceUseConfig": true
+ }
+}
diff --git a/local/pom.xml b/local/pom.xml
index 910c708..b064a90 100644
--- a/local/pom.xml
+++ b/local/pom.xml
@@ -37,8 +37,9 @@
</properties>
<modules>
- <module>elasticsearch-hdfs</module>
- <module>elasticsearch-reindex</module>
+ <module>elasticsearch-hdfs</module>
+ <module>elasticsearch-reindex</module>
+ <module>mongo-elasticsearch-sync</module>
</modules>
</project>