upgrade twitter-userstream-elasticsearch to es 2.0 + docker
needs changes to incubator-streams from STREAMS-427
diff --git a/local/twitter-userstream-elasticsearch/pom.xml b/local/twitter-userstream-elasticsearch/pom.xml
index 1b7b64f..8e14913 100644
--- a/local/twitter-userstream-elasticsearch/pom.xml
+++ b/local/twitter-userstream-elasticsearch/pom.xml
@@ -34,8 +34,8 @@
<properties>
<docker.repo>apachestreams</docker.repo>
- <elasticsearch.version>1.1.0</elasticsearch.version>
- <lucene.version>4.7.2</lucene.version>
+ <elasticsearch.version>2.3.5</elasticsearch.version>
+ <lucene.version>5.5.0</lucene.version>
</properties>
<dependencies>
@@ -208,22 +208,102 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
- <version>2.12.4</version>
- <executions>
- <execution>
- <id>integration-tests</id>
- <goals>
- <goal>integration-test</goal>
- <goal>verify</goal>
- </goals>
- </execution>
- </executions>
+ <configuration>
+ <!-- Run integration test suite rather than individual tests. -->
+ <excludes>
+ <exclude>**/*Test.java</exclude>
+ <exclude>**/*Tests.java</exclude>
+ </excludes>
+ <includes>
+ <include>**/*IT.java</include>
+ <include>**/*ITs.java</include>
+ </includes>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven.surefire</groupId>
+ <artifactId>surefire-junit47</artifactId>
+ <version>${failsafe.plugin.version}</version>
+ </dependency>
+ </dependencies>
</plugin>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
+ <version>${docker.plugin.version}</version>
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <profile>
+ <id>dockerITs</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ <property>
+ <name>skipITs</name>
+ <value>false</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>${docker.plugin.version}</version>
+ <configuration combine.self="override">
+ <watchInterval>500</watchInterval>
+ <logDate>default</logDate>
+ <verbose>true</verbose>
+ <autoPull>on</autoPull>
+ <images>
+ <image>
+ <name>elasticsearch:2.3.5</name>
+ <alias>elasticsearch</alias>
+ <run>
+ <namingStrategy>none</namingStrategy>
+ <ports>
+ <port>${es.http.host}:${es.http.port}:9200</port>
+ <port>${es.tcp.host}:${es.tcp.port}:9300</port>
+ </ports>
+ <portPropertyFile>elasticsearch.properties</portPropertyFile>
+ <wait>
+ <log>elasticsearch startup</log>
+ <http>
+ <url>http://${es.http.host}:${es.http.port}</url>
+ <method>GET</method>
+ <status>200</status>
+ </http>
+ <time>20000</time>
+ <kill>1000</kill>
+ <shutdown>500</shutdown>
+ <!--<tcp>-->
+ <!--<host>${es.transport.host}</host>-->
+ <!--<ports>-->
+ <!--<port>${es.transport.port}</port>-->
+ <!--</ports>-->
+ <!--</tcp>-->
+ </wait>
+ <log>
+ <enabled>true</enabled>
+ <date>default</date>
+ <color>cyan</color>
+ </log>
+ </run>
+ <watch>
+ <mode>none</mode>
+ </watch>
+ </image>
+
+ </images>
+ </configuration>
+
+ </plugin>
+
+ </plugins>
+ </build>
+
+ </profile>
+ </profiles>
</project>
diff --git a/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearch.json b/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json
similarity index 100%
rename from local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearch.json
rename to local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json
diff --git a/local/twitter-userstream-elasticsearch/src/main/resources/application.conf b/local/twitter-userstream-elasticsearch/src/main/resources/application.conf
deleted file mode 100644
index ef5f023..0000000
--- a/local/twitter-userstream-elasticsearch/src/main/resources/application.conf
+++ /dev/null
@@ -1,37 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-twitter {
- endpoint = "userstream"
- oauth {
- consumerKey = ""
- consumerSecret = ""
- accessToken = ""
- accessTokenSecret = ""
- }
- follow = [
-
- ]
-}
-elasticsearch {
- hosts = [
- localhost
- ]
- port = 9300
- clusterName = elasticsearch
- index = userstream_activity
- type = activity
- batchSize = 1
-}
diff --git a/local/twitter-userstream-elasticsearch/src/site/markdown/index.md b/local/twitter-userstream-elasticsearch/src/site/markdown/index.md
index baeee26..a10846c 100644
--- a/local/twitter-userstream-elasticsearch/src/site/markdown/index.md
+++ b/local/twitter-userstream-elasticsearch/src/site/markdown/index.md
@@ -33,8 +33,28 @@
Build:
---------
- mvn clean package verify
+ mvn clean package
+Testing:
+---------
+
+Create a local file `application.conf` with valid twitter credentials
+
+ twitter {
+ oauth {
+ consumerKey = ""
+ consumerSecret = ""
+ accessToken = ""
+ accessTokenSecret = ""
+ }
+ }
+
+Build with integration testing enabled, using your credentials
+
+ mvn -PdockerITs docker:start
+ mvn clean test verify -DskipITs=false -DargLine="-Dconfig.file=`pwd`/application.conf"
+ mvn -PdockerITs docker:stop
+
Run (Local):
------------
diff --git a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java
new file mode 100644
index 0000000..2f524f0
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.twitter.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.example.TwitterUserstreamElasticsearch;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Test copying documents between two indexes on same cluster
+ */
+public class TwitterUserstreamElasticsearchIT {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class);
+
+ protected TwitterUserstreamElasticsearchConfiguration testConfiguration;
+ protected Client testClient;
+
+ private int count = 0;
+
+ @Before
+ public void prepareTest() throws Exception {
+
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/TwitterUserstreamElasticsearchIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Properties es_properties = new Properties();
+ InputStream es_stream = new FileInputStream("elasticsearch.properties");
+ es_properties.load(es_stream);
+ Config esProps = ConfigFactory.parseProperties(es_properties);
+ Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+ StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
+ testConfiguration = new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe);
+ testClient = new ElasticsearchClientManager(testConfiguration.getElasticsearch()).getClient();
+
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertFalse(indicesExistsResponse.isExists());
+
+ }
+
+ @Test
+ public void testReindex() throws Exception {
+
+ TwitterUserstreamElasticsearch stream = new TwitterUserstreamElasticsearch(testConfiguration);
+
+ stream.run();
+
+ // assert lines in file
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getElasticsearch().getIndex())
+ .setTypes(testConfiguration.getElasticsearch().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
+
+ count = (int)countResponse.getHits().getTotalHits();
+
+ assertNotEquals(count, 0);
+ }
+}
diff --git a/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf b/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf
new file mode 100644
index 0000000..ae3b463
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf
@@ -0,0 +1,14 @@
+twitter {
+ endpoint = sample
+ track = [
+ "data"
+ ]
+}
+elasticsearch {
+ hosts += ${es.tcp.host}
+ port = ${es.tcp.port}
+ clusterName = elasticsearch
+ index = twitter_userstream_elasticsearch_it
+ type = activity
+ forceUseConfig = true
+}
\ No newline at end of file