Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-streams-examples into STREAMS-295
Conflicts:
local/pom.xml
diff --git a/local/elasticsearch-hdfs/ElasticsearchHdfs.md b/local/elasticsearch-hdfs/ElasticsearchHdfs.md
new file mode 100644
index 0000000..b42d1a8
--- /dev/null
+++ b/local/elasticsearch-hdfs/ElasticsearchHdfs.md
@@ -0,0 +1,47 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+elasticsearch-hdfs
+==============================
+
+Description:
+-----------------
+
+Copies documents from elasticsearch to hdfs.
+
+Specification:
+-----------------
+
+[ElasticsearchHdfs.dot](src/main/resources/ElasticsearchHdfs.dot "ElasticsearchHdfs.dot" )
+
+Diagram:
+-----------------
+
+![ElasticsearchHdfs.png](./ElasticsearchHdfs.png?raw=true)
+
+Example Configuration:
+----------------------
+
+    {
+        "source": {
+            "hosts": [
+                "localhost"
+            ],
+            "port": 9300,
+            "clusterName": "elasticsearch",
+            "indexes": [ "source" ],
+            "types": [ "activity" ]
+        },
+        "destination": {
+            "scheme": "file",
+            "host": "localhost",
+            "path": "/tmp",
+            "writerPath": "activity"
+        }
+    }
+
+Run:
+--------
+
+`docker run elasticsearch-hdfs:0.2-incubating-SNAPSHOT java -cp elasticsearch-hdfs-0.2-incubating-SNAPSHOT.jar -Dconfig.file=http://<location_of_config_file>.json org.apache.streams.elasticsearch.example.ElasticsearchHdfs`
diff --git a/local/elasticsearch-hdfs/HdfsElasticsearch.md b/local/elasticsearch-hdfs/HdfsElasticsearch.md
new file mode 100644
index 0000000..c4ca261
--- /dev/null
+++ b/local/elasticsearch-hdfs/HdfsElasticsearch.md
@@ -0,0 +1,50 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+hdfs-elasticsearch
+==============================
+
+Description:
+-----------------
+
+Copies documents from hdfs to elasticsearch.
+
+Specification:
+-----------------
+
+[HdfsElasticsearch.dot](src/main/resources/HdfsElasticsearch.dot "HdfsElasticsearch.dot" )
+
+Diagram:
+-----------------
+
+![HdfsElasticsearch.png](./HdfsElasticsearch.png?raw=true)
+
+Example Configuration:
+----------------------
+
+ {
+ "source": {
+ "scheme": "file",
+ "host": "localhost",
+ "user": "cloudera",
+ "path": "/tmp",
+ "writerPath": "activity"
+ },
+ "destination": {
+ "hosts": [
+ "localhost"
+ ],
+ "port": 9300,
+ "clusterName": "elasticsearch",
+ "index": "activity2",
+ "type": "activity",
+ "forceUseConfig": true
+ }
+ }
+
+
+Run:
+--------
+
+`docker run elasticsearch-hdfs:0.2-incubating-SNAPSHOT java -cp elasticsearch-hdfs-0.2-incubating-SNAPSHOT.jar -Dconfig.file=file://<location_of_config_file>.json org.apache.streams.elasticsearch.example.HdfsElasticsearch`
diff --git a/local/elasticsearch-hdfs/README.md b/local/elasticsearch-hdfs/README.md
new file mode 100644
index 0000000..2dd778d
--- /dev/null
+++ b/local/elasticsearch-hdfs/README.md
@@ -0,0 +1,32 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+elasticsearch-hdfs
+==============================
+
+Requirements:
+-------------
+ - A running ElasticSearch 1.0.0+ instance
+
+Description:
+------------
+Copies documents between elasticsearch and file system.
+
+Streams:
+--------
+
+[ElasticsearchHdfs](ElasticsearchHdfs.md "ElasticsearchHdfs" )
+
+[HdfsElasticsearch](HdfsElasticsearch.md "HdfsElasticsearch" )
+
+Build:
+---------
+
+`mvn clean package`
+
+Deploy:
+--------
+
+`mvn -Pdocker clean package docker:build`
+
diff --git a/local/elasticsearch-hdfs/pom.xml b/local/elasticsearch-hdfs/pom.xml
new file mode 100644
index 0000000..0a81b75
--- /dev/null
+++ b/local/elasticsearch-hdfs/pom.xml
@@ -0,0 +1,344 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-examples-local</artifactId>
+ <version>0.2-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>elasticsearch-hdfs</artifactId>
+ <version>0.2-incubating-SNAPSHOT</version>
+
+ <properties>
+ <elasticsearch.version>1.1.0</elasticsearch.version>
+ <lucene.version>4.7.2</lucene.version>
+ </properties>
+
+ <dependencies>
+ <!-- Test includes -->
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-test-framework</artifactId>
+ <version>${lucene.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-codecs</artifactId>
+ <version>${lucene.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${elasticsearch.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-util</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-runtime-local</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-persist-elasticsearch</artifactId>
+ <version>0.2-incubating-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-persist-hdfs</artifactId>
+ <version>0.2-incubating-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>data</directory>
+ <followSymlinks>false</followSymlinks>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ <!-- This binary runs with logback -->
+ <!-- Keep log4j out -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.3.1</version>
+ <executions>
+ <execution>
+ <id>enforce-banned-dependencies</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <bannedDependencies>
+ <excludes>
+ <exclude>org.slf4j:slf4j-log4j12</exclude>
+ <exclude>org.slf4j:slf4j-jcl</exclude>
+ <exclude>org.slf4j:slf4j-jdk14</exclude>
+                                        <exclude>log4j:log4j</exclude>
+ <exclude>commons-logging:commons-logging</exclude>
+ </excludes>
+ </bannedDependencies>
+ </rules>
+ <fail>true</fail>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.build.finalName}</finalName>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>**/logback.xml</exclude>
+ <exclude>**/log4j.properties</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.apache.streams.elasticsearch.example.ElasticsearchHdfs</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <version>0.4.1</version>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.example.elasticsearch</targetPackage>
+ <useJodaDates>false</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>resource-dependencies</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+ <configuration>
+ <includeArtifactIds>streams-pojo</includeArtifactIds>
+ <includes>**/*.json</includes>
+ <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.12.4</version>
+ <executions>
+ <execution>
+ <id>integration-tests</id>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>docker</id>
+ <build>
+ <plugins>
+ <plugin>
+ <!-- The Docker Maven plugin is used to create docker image with the fat jar -->
+ <groupId>org.jolokia</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>0.11.0</version>
+ <configuration>
+ <images>
+
+ <image>
+ <alias>${project.artifactId}</alias>
+ <name>${project.artifactId}:${project.version}</name>
+ <build>
+ <from>dockerfile/java:oracle-java8</from>
+ <assembly>
+ <basedir>/</basedir>
+ <descriptorRef>artifact</descriptorRef>
+ </assembly>
+ <!-- Default command for the build image -->
+ </build>
+
+ </image>
+
+ </images>
+ </configuration>
+
+ </plugin>
+
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java
new file mode 100644
index 0000000..da0acbd
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.example;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
+import org.apache.streams.hdfs.WebHdfsPersistWriter;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * Copies documents from elasticsearch to hdfs.
+ */
+public class ElasticsearchHdfs implements Runnable {
+
+ public final static String STREAMS_ID = "ElasticsearchHdfs";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class);
+
+ ElasticsearchHdfsConfiguration config;
+
+ public ElasticsearchHdfs() {
+ this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+ }
+
+ public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) {
+ this.config = reindex;
+ }
+
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
+
+ ElasticsearchHdfs backup = new ElasticsearchHdfs();
+
+ new Thread(backup).start();
+
+ }
+
+ @Override
+ public void run() {
+
+ ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+
+ WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination());
+
+ Map<String, Object> streamConfig = Maps.newHashMap();
+ streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
+ streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
+ StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+
+ builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader);
+ builder.addStreamsPersistWriter(WebHdfsPersistWriter.STREAMS_ID, hdfsPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID);
+ builder.start();
+ }
+}
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java
new file mode 100644
index 0000000..0a65479
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.example;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.hdfs.WebHdfsPersistReader;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * Copies documents from hdfs to elasticsearch.
+ */
+public class HdfsElasticsearch implements Runnable {
+
+ public final static String STREAMS_ID = "HdfsElasticsearch";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class);
+
+ HdfsElasticsearchConfiguration config;
+
+ public HdfsElasticsearch() {
+ this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+ }
+
+ public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) {
+ this.config = reindex;
+ }
+
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
+
+ HdfsElasticsearch restore = new HdfsElasticsearch();
+
+ new Thread(restore).start();
+
+ }
+
+ @Override
+ public void run() {
+
+ WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource());
+
+ ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+
+ Map<String, Object> streamConfig = Maps.newHashMap();
+ streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
+ streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000 * 1000);
+ StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+
+ builder.newPerpetualStream(WebHdfsPersistReader.STREAMS_ID, webHdfsPersistReader);
+ builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, WebHdfsPersistReader.STREAMS_ID);
+ builder.start();
+ }
+}
diff --git a/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json b/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json
new file mode 100644
index 0000000..91324dd
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json
@@ -0,0 +1,10 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "type": "object",
+ "javaType" : "org.apache.streams.elasticsearch.example.ElasticsearchHdfsConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "source": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration", "type": "object", "required": true },
+ "destination": { "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration", "type": "object", "required": true }
+ }
+}
\ No newline at end of file
diff --git a/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json b/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json
new file mode 100644
index 0000000..c8072ba
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json
@@ -0,0 +1,10 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "type": "object",
+ "javaType" : "org.apache.streams.elasticsearch.example.HdfsElasticsearchConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "source": { "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration", "type": "object", "required": true },
+ "destination": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration", "type": "object", "required": true }
+ }
+}
\ No newline at end of file
diff --git a/local/elasticsearch-hdfs/src/main/resources/ElasticsearchHdfs.dot b/local/elasticsearch-hdfs/src/main/resources/ElasticsearchHdfs.dot
new file mode 100644
index 0000000..c953522
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/resources/ElasticsearchHdfs.dot
@@ -0,0 +1,17 @@
+digraph g {
+
+ //providers
+ ElasticsearchPersistReader [label="ElasticsearchPersistReader",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java"];
+
+ //persisters
+  WebHdfsPersistWriter [label="WebHdfsPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java"];
+
+ //data
+ source [label="es://{indexes}/{types}",shape=box];
+  destination [label="hdfs://{path}/{writerPath}",shape=box];
+
+ //stream
+ source -> ElasticsearchPersistReader
+ ElasticsearchPersistReader -> WebHdfsPersistWriter [label="String"];
+ WebHdfsPersistWriter -> destination
+}
\ No newline at end of file
diff --git a/local/elasticsearch-hdfs/src/main/resources/HdfsElasticsearch.dot b/local/elasticsearch-hdfs/src/main/resources/HdfsElasticsearch.dot
new file mode 100644
index 0000000..f9a0efa
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/resources/HdfsElasticsearch.dot
@@ -0,0 +1,17 @@
+digraph g {
+
+ //providers
+  WebHdfsPersistReader [label="WebHdfsPersistReader",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java"];
+
+ //persisters
+ ElasticsearchPersistWriter [label="ElasticsearchPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java"];
+
+ //data
+  source [label="hdfs://{path}/{readerPath}",shape=box];
+ destination [label="es://{index}/{type}",shape=box];
+
+ //stream
+ source -> WebHdfsPersistReader
+ WebHdfsPersistReader -> ElasticsearchPersistWriter [label="String"];
+ ElasticsearchPersistWriter -> destination
+}
\ No newline at end of file
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java
new file mode 100644
index 0000000..ad6970c
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.elasticsearch.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.hdfs.HdfsConfiguration;
+import org.apache.streams.hdfs.WebHdfsPersistWriter;
+import org.apache.streams.hdfs.HdfsWriterConfiguration;
+import org.apache.streams.hdfs.WebHdfsPersistReader;
+import org.apache.streams.hdfs.HdfsReaderConfiguration;
+import org.apache.streams.elasticsearch.example.ElasticsearchHdfs;
+import org.apache.streams.elasticsearch.example.ElasticsearchHdfsConfiguration;
+import org.apache.streams.elasticsearch.example.HdfsElasticsearch;
+import org.apache.streams.elasticsearch.example.HdfsElasticsearchConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.File;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test copying documents between hdfs and elasticsearch
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST, numNodes=1)
+public class ElasticsearchHdfsIT extends ElasticsearchIntegrationTest {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class);
+
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ ElasticsearchConfiguration testConfiguration = new ElasticsearchConfiguration();
+
+ @Before
+ public void prepareTest() throws Exception {
+
+ testConfiguration = new ElasticsearchConfiguration();
+ testConfiguration.setHosts(Lists.newArrayList("localhost"));
+ testConfiguration.setClusterName(cluster().getClusterName());
+
+ ElasticsearchWriterConfiguration setupWriterConfiguration = MAPPER.convertValue(testConfiguration, ElasticsearchWriterConfiguration.class);
+ setupWriterConfiguration.setIndex("source");
+ setupWriterConfiguration.setType("activity");
+ setupWriterConfiguration.setBatchSize(5l);
+
+ ElasticsearchPersistWriter setupWriter = new ElasticsearchPersistWriter(setupWriterConfiguration);
+ setupWriter.prepare(null);
+
+ InputStream testActivityFolderStream = ElasticsearchHdfsIT.class.getClassLoader()
+ .getResourceAsStream("activities");
+ List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+ for( String file : files) {
+ LOGGER.info("File: " + file );
+ InputStream testActivityFileStream = ElasticsearchHdfsIT.class.getClassLoader()
+ .getResourceAsStream("activities/" + file);
+ Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+ StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+ setupWriter.write( datum );
+ LOGGER.info("Wrote: " + activity.getVerb() );
+ }
+
+ setupWriter.cleanUp();
+
+ flushAndRefresh();
+
+ }
+
+ @Test
+ public void testElasticsearchHdfs() throws Exception {
+
+ ElasticsearchHdfsConfiguration backupConfiguration = MAPPER.readValue(
+ ElasticsearchHdfsIT.class.getResourceAsStream("/testBackup.json"), ElasticsearchHdfsConfiguration.class);
+
+ backupConfiguration.getSource().setClusterName(cluster().getClusterName());
+
+ // backupConfiguration.getDestination().setClusterName(cluster().getClusterName());
+
+ assert(indexExists("source"));
+ long srcCount = client().count(client().prepareCount("source").request()).get().getCount();
+ assert srcCount > 0;
+
+ ElasticsearchHdfs backup = new ElasticsearchHdfs(backupConfiguration);
+
+ Thread backupThread = new Thread(backup);
+ backupThread.start();
+ backupThread.join();
+
+ HdfsElasticsearchConfiguration restoreConfiguration = MAPPER.readValue(
+ ElasticsearchHdfsIT.class.getResourceAsStream("/testRestore.json"), HdfsElasticsearchConfiguration.class);
+
+ restoreConfiguration.getDestination().setClusterName(cluster().getClusterName());
+
+ assert(!indexExists("destination"));
+
+ HdfsElasticsearch restore = new HdfsElasticsearch(restoreConfiguration);
+
+ Thread restoreThread = new Thread(restore);
+ restoreThread.start();
+
+ Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
+
+ restoreThread.join();
+
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+
+ flushAndRefresh();
+
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+
+ assert(indexExists("destination"));
+
+ long destCount = client().count(client().prepareCount("destination").request()).get().getCount();
+
+ assert srcCount == destCount;
+ }
+
+}
diff --git a/local/elasticsearch-hdfs/src/test/resources/log4j.properties b/local/elasticsearch-hdfs/src/test/resources/log4j.properties
new file mode 100644
index 0000000..71255bb
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/resources/log4j.properties
@@ -0,0 +1,8 @@
+# Root logger option
+log4j.rootLogger=DEBUG, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
\ No newline at end of file
diff --git a/local/elasticsearch-hdfs/src/test/resources/logback.xml b/local/elasticsearch-hdfs/src/test/resources/logback.xml
new file mode 100644
index 0000000..fc47894
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/resources/logback.xml
@@ -0,0 +1,27 @@
+<configuration debug="true" scan="true" scanPeriod="5 seconds">
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <append>true</append>
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+ <file>target/logback.txt</file>
+ <append>true</append>
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="DEBUG">
+ <appender-ref ref="CONSOLE" />
+ <appender-ref ref="FILE" />
+ </root>
+
+</configuration>
\ No newline at end of file
diff --git a/local/elasticsearch-hdfs/src/test/resources/testBackup.json b/local/elasticsearch-hdfs/src/test/resources/testBackup.json
new file mode 100644
index 0000000..2002cb8
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/resources/testBackup.json
@@ -0,0 +1,24 @@
+{
+ "source": {
+ "hosts": [
+ "localhost"
+ ],
+ "port": 9300,
+ "clusterName": "elasticsearch",
+ "indexes": [
+ "source"
+ ],
+ "types": [
+ "activity"
+ ]
+ },
+ "destination": {
+ "scheme": "file",
+ "host": "localhost",
+ "user": "cloudera",
+ "path": "target",
+ "writerPath": "test",
+ "writerFilePrefix": "activities"
+ }
+
+}
diff --git a/local/elasticsearch-hdfs/src/test/resources/testRestore.json b/local/elasticsearch-hdfs/src/test/resources/testRestore.json
new file mode 100644
index 0000000..63b42c5
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/resources/testRestore.json
@@ -0,0 +1,20 @@
+{
+ "source": {
+ "scheme": "file",
+ "host": "localhost",
+ "user": "cloudera",
+ "path": "target",
+ "readerPath": "test"
+ },
+ "destination": {
+ "hosts": [
+ "localhost"
+ ],
+ "port": 9300,
+ "clusterName": "elasticsearch",
+ "index": "destination",
+ "type": "activity",
+ "forceUseConfig": true
+ }
+
+}
diff --git a/local/pom.xml b/local/pom.xml
index e58c23f..b064a90 100644
--- a/local/pom.xml
+++ b/local/pom.xml
@@ -37,6 +37,7 @@
</properties>
<modules>
+ <module>elasticsearch-hdfs</module>
<module>elasticsearch-reindex</module>
<module>mongo-elasticsearch-sync</module>
</modules>