STREAMS-474: Apply check style requirements to examples
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index 49ca5b9..b6d806c 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -32,6 +32,9 @@
import org.apache.streams.jackson.StreamsJacksonMapper
import org.slf4j.LoggerFactory
+/**
+ * FlinkBase is a base class with capabilities common to all of the streams flink examples.
+ */
trait FlinkBase {
private val BASELOGGER = LoggerFactory.getLogger("FlinkBase")
@@ -101,16 +104,16 @@
}
}
-// def setup(typesafe: Config): Boolean = {
-//
-// val streamsConfig = StreamsConfigurator.detectConfiguration(typesafe)
-//
-// this.streamsConfig = streamsConfig
-//
-// BASELOGGER.info("Streams Config: " + streamsConfig)
-//
-// setup(streamsConfig)
-// }
+ // def setup(typesafe: Config): Boolean = {
+ //
+ // val streamsConfig = StreamsConfigurator.detectConfiguration(typesafe)
+ //
+ // this.streamsConfig = streamsConfig
+ //
+ // BASELOGGER.info("Streams Config: " + streamsConfig)
+ //
+ // setup(streamsConfig)
+ // }
def setupStreaming(streamingConfiguration: FlinkStreamingConfiguration): Boolean = {
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index a5a4f72..17246e5 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -24,6 +24,7 @@
import com.google.common.base.{Preconditions, Strings}
import com.google.common.util.concurrent.Uninterruptibles
import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
@@ -31,26 +32,29 @@
import org.apache.flink.util.Collector
import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
import org.apache.streams.jackson.StreamsJacksonMapper
import org.apache.streams.twitter.TwitterFollowingConfiguration
import org.apache.streams.twitter.pojo.Follow
import org.apache.streams.twitter.provider.TwitterFollowingProvider
import org.slf4j.{Logger, LoggerFactory}
-import org.apache.streams.examples.flink.FlinkBase
-import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
-import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
-import org.apache.flink.api.scala._
import scala.collection.JavaConversions._
+/**
+ * FlinkTwitterFollowingPipeline collects friends or followers of all profiles from a
+ * set of IDs, writing each connection as a twitter:follow in json format to dfs.
+ */
object FlinkTwitterFollowingPipeline extends FlinkBase {
- val STREAMS_ID: String = "FlinkTwitterFollowingPipeline"
+ val STREAMS_ID: String = "FlinkTwitterFollowingPipeline"
- private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipeline])
- private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+ private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipeline])
+ private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
- override def main(args: Array[String]) = {
+ override def main(args: Array[String]) = {
super.main(args)
val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
if( !setup(jobConfig) ) System.exit(1)
@@ -58,111 +62,111 @@
val thread = new Thread(pipeline)
thread.start()
thread.join()
+ }
+
+ def setup(jobConfig: TwitterFollowingPipelineConfiguration): Boolean = {
+
+ LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig)
+
+ if( jobConfig == null ) {
+ LOGGER.error("jobConfig is null!")
+ System.err.println("jobConfig is null!")
+ return false
}
- def setup(jobConfig: TwitterFollowingPipelineConfiguration): Boolean = {
-
- LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig)
-
- if( jobConfig == null ) {
- LOGGER.error("jobConfig is null!")
- System.err.println("jobConfig is null!")
- return false
- }
-
- if( jobConfig.getSource == null ) {
- LOGGER.error("jobConfig.getSource is null!")
- System.err.println("jobConfig.getSource is null!")
- return false
- }
-
- if( jobConfig.getDestination == null ) {
- LOGGER.error("jobConfig.getDestination is null!")
- System.err.println("jobConfig.getDestination is null!")
- return false
- }
-
- if( jobConfig.getTwitter == null ) {
- LOGGER.error("jobConfig.getTwitter is null!")
- System.err.println("jobConfig.getTwitter is null!")
- return false
- }
-
- Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
- Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
- Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
- Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
- Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
-
- true
-
+ if( jobConfig.getSource == null ) {
+ LOGGER.error("jobConfig.getSource is null!")
+ System.err.println("jobConfig.getSource is null!")
+ return false
}
+ if( jobConfig.getDestination == null ) {
+ LOGGER.error("jobConfig.getDestination is null!")
+ System.err.println("jobConfig.getDestination is null!")
+ return false
+ }
+
+ if( jobConfig.getTwitter == null ) {
+ LOGGER.error("jobConfig.getTwitter is null!")
+ System.err.println("jobConfig.getTwitter is null!")
+ return false
+ }
+
+ Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
+ true
+
+ }
+
}
class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguration = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
- import FlinkTwitterFollowingPipeline._
+ import FlinkTwitterFollowingPipeline._
- override def run(): Unit = {
+ override def run(): Unit = {
- val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+ val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
- env.setNumberOfExecutionRetries(0)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+ env.setNumberOfExecutionRetries(0)
- val inPath = buildReaderPath(config.getSource)
+ val inPath = buildReaderPath(config.getSource)
- val outPath = buildWriterPath(config.getDestination)
+ val outPath = buildWriterPath(config.getDestination)
- val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs )
+ val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs )
- // these datums contain 'Follow' objects
- val followDatums: DataStream[StreamsDatum] =
- keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10)
+ // these datums contain 'Follow' objects
+ val followDatums: DataStream[StreamsDatum] =
+ keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10)
- val follows: DataStream[Follow] = followDatums
- .map(datum => datum.getDocument.asInstanceOf[Follow])
+ val follows: DataStream[Follow] = followDatums
+ .map(datum => datum.getDocument.asInstanceOf[Follow])
- val jsons: DataStream[String] = follows
- .map(follow => {
- val MAPPER = StreamsJacksonMapper.getInstance
- MAPPER.writeValueAsString(follow)
- })
+ val jsons: DataStream[String] = follows
+ .map(follow => {
+ val MAPPER = StreamsJacksonMapper.getInstance
+ MAPPER.writeValueAsString(follow)
+ })
- if( config.getTest == false )
- jsons.addSink(new RollingSink[String](outPath)).setParallelism(3)
- else
- jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
- .setParallelism(env.getParallelism)
+ if( config.getTest == false )
+ jsons.addSink(new RollingSink[String](outPath)).setParallelism(3)
+ else
+ jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+ .setParallelism(env.getParallelism)
- // if( test == true ) jsons.print();
+ // if( test == true ) jsons.print();
- env.execute(STREAMS_ID)
+ env.execute(STREAMS_ID)
+ }
+
+ class FollowingCollectorFlatMapFunction(
+ twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator[TwitterFollowingConfiguration](classOf[TwitterFollowingConfiguration]).detectConfiguration(StreamsConfigurator.getConfig.getConfig("twitter")),
+ flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator[StreamsFlinkConfiguration](classOf[StreamsFlinkConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
+ ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+
+ override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
+ collectConnections(input, out)
}
- class FollowingCollectorFlatMapFunction(
- twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator[TwitterFollowingConfiguration](classOf[TwitterFollowingConfiguration]).detectConfiguration(StreamsConfigurator.getConfig.getConfig("twitter")),
- flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator[StreamsFlinkConfiguration](classOf[StreamsFlinkConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
- ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
-
- override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
- collectConnections(input, out)
- }
-
- def collectConnections(id : String, out : Collector[StreamsDatum]) = {
- val twitProvider: TwitterFollowingProvider =
- new TwitterFollowingProvider(
- twitterConfiguration.withIdsOnly(true).withInfo(List(toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration]
- )
- twitProvider.prepare(twitProvider)
- twitProvider.startStream()
- var iterator: Iterator[StreamsDatum] = null
- do {
- Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
- twitProvider.readCurrent().iterator().toList.map(out.collect(_))
- } while( twitProvider.isRunning )
- }
+ def collectConnections(id : String, out : Collector[StreamsDatum]) = {
+ val twitProvider: TwitterFollowingProvider =
+ new TwitterFollowingProvider(
+ twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration]
+ )
+ twitProvider.prepare(twitProvider)
+ twitProvider.startStream()
+ var iterator: Iterator[StreamsDatum] = null
+ do {
+ Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+ twitProvider.readCurrent().iterator().toList.map(out.collect(_))
+ } while( twitProvider.isRunning )
}
+ }
}
\ No newline at end of file
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index 8bb2997..549e048 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -23,35 +23,29 @@
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.{Preconditions, Strings}
import com.google.common.util.concurrent.Uninterruptibles
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.function.AllWindowFunction
-import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.fs.RollingSink
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
import org.apache.streams.core.StreamsDatum
import org.apache.streams.examples.flink.FlinkBase
import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
import org.apache.streams.flink.FlinkStreamingConfiguration
-import org.apache.streams.hdfs.HdfsConfiguration
import org.apache.streams.jackson.StreamsJacksonMapper
-import org.apache.streams.twitter.TwitterUserInformationConfiguration
-import org.apache.streams.twitter.pojo.{Tweet, User}
-import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.apache.streams.twitter.pojo.Tweet
+import org.apache.streams.twitter.provider.TwitterTimelineProvider
import org.slf4j.{Logger, LoggerFactory}
-import org.apache.flink.api.scala._
import scala.collection.JavaConversions._
+/**
+ * FlinkTwitterPostsPipeline collects recent posts from all profiles from a
+ * set of IDs, writing each post as a twitter:status in json format to dfs.
+ */
object FlinkTwitterPostsPipeline extends FlinkBase {
val STREAMS_ID: String = "FlinkTwitterPostsPipeline"
@@ -154,13 +148,13 @@
class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
- collectPosts(input, out)
+ collectPosts(input, out)
}
def collectPosts(id : String, out : Collector[StreamsDatum]) = {
val twitterConfiguration = config.getTwitter
val twitProvider: TwitterTimelineProvider =
new TwitterTimelineProvider(
- twitterConfiguration.withInfo(List(toProviderId(id))).withMaxItems(200l)
+ twitterConfiguration.withInfo(List(toProviderId(id)))
)
twitProvider.prepare(twitProvider)
twitProvider.startStream()
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index 56d892b..dbb8a33 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -25,11 +25,12 @@
import com.google.common.base.{Preconditions, Strings}
import com.google.common.util.concurrent.Uninterruptibles
import org.apache.flink.api.common.functions.StoppableFunction
+import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
-import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.fs.RollingSink
import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
import org.apache.streams.core.StreamsDatum
@@ -38,13 +39,16 @@
import org.apache.streams.flink.FlinkStreamingConfiguration
import org.apache.streams.jackson.StreamsJacksonMapper
import org.apache.streams.twitter.TwitterStreamConfiguration
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat
import org.apache.streams.twitter.provider.TwitterStreamProvider
import org.slf4j.{Logger, LoggerFactory}
-import org.apache.flink.api.scala._
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat
import scala.collection.JavaConversions._
+/**
+ * FlinkTwitterSpritzerPipeline opens a spritzer stream and writes
+ * each post received as a twitter:status in json format to dfs.
+ */
object FlinkTwitterSpritzerPipeline extends FlinkBase {
val STREAMS_ID: String = "FlinkTwitterSpritzerPipeline"
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 01425f6..c180089 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -22,15 +22,14 @@
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.{Preconditions, Strings}
-import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction}
-import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows}
import com.google.common.util.concurrent.Uninterruptibles
import org.apache.flink.api.common.functions.RichFlatMapFunction
-import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
-import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
import org.apache.flink.api.scala._
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.function.WindowFunction
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.streaming.connectors.fs.RollingSink
import org.apache.flink.util.Collector
import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
@@ -45,6 +44,10 @@
import scala.collection.JavaConversions._
+/**
+ * FlinkTwitterPostsPipeline collects the current user profile of a
+ * set of IDs, writing each as a twitter:user in json format to dfs.
+ */
object FlinkTwitterUserInformationPipeline extends FlinkBase {
val STREAMS_ID: String = "FlinkTwitterUserInformationPipeline"
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
index 6cf8d9d..be22b82 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
@@ -34,7 +34,7 @@
import scala.io.Source
/**
- * Created by sblackmon on 3/13/16.
+ * FlinkTwitterFollowingPipelineFollowersIT is an integration test for FlinkTwitterFollowingPipeline.
*/
class FlinkTwitterFollowingPipelineFollowersIT extends FlatSpec {
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
index 8ea7f9c..9829ebc 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
@@ -21,24 +21,20 @@
import java.io.File
import java.nio.file.{Files, Paths}
-import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
-import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
-import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline}
-import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
-import org.junit.Ignore
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
import org.slf4j.{Logger, LoggerFactory}
import org.testng.annotations.Test
import scala.io.Source
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
-import org.scalatest.time.SpanSugar._
/**
- * Created by sblackmon on 3/13/16.
+ * FlinkTwitterFollowingPipelineFriendsIT is an integration test for FlinkTwitterFollowingPipeline.
*/
class FlinkTwitterFollowingPipelineFriendsIT extends FlatSpec {
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
index 7113c4c..987e82d 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -20,27 +20,21 @@
import java.io.File
import java.nio.file.{Files, Paths}
-import java.util.concurrent.TimeUnit
-import com.google.common.util.concurrent.{Monitor, Uninterruptibles}
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.examples.flink.twitter.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration}
import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
-import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
-import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterPostsPipeline, FlinkTwitterUserInformationPipeline}
-import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.io.Source
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
import org.scalatest.FlatSpec
import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
import org.scalatest.time.SpanSugar._
+import org.slf4j.{Logger, LoggerFactory}
import org.testng.annotations.Test
+import scala.io.Source
+
/**
- * Created by sblackmon on 3/13/16.
+ * FlinkTwitterPostsPipelineIT is an integration test for FlinkTwitterPostsPipeline.
*/
class FlinkTwitterPostsPipelineIT extends FlatSpec {
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
index 29625ca..7570bac 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -23,20 +23,18 @@
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
-import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
-import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterSpritzerPipeline, FlinkTwitterUserInformationPipeline}
-import org.apache.streams.examples.flink.twitter.{TwitterPostsPipelineConfiguration, TwitterSpritzerPipelineConfiguration}
+import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
import org.slf4j.{Logger, LoggerFactory}
import org.testng.annotations.Test
import scala.io.Source
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
-import org.scalatest.time.SpanSugar._
/**
- * Created by sblackmon on 3/13/16.
+ * FlinkTwitterSpritzerPipelineIT is an integration test for FlinkTwitterSpritzerPipeline.
*/
class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
index b4387f9..ab88d48 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -22,25 +22,19 @@
import java.nio.file.{Files, Paths}
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.examples.flink.twitter.{TwitterSpritzerPipelineConfiguration, TwitterUserInformationPipelineConfiguration}
-import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
-import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
import org.scalatest.FlatSpec
-import org.scalatest._
-import org.scalatest.junit.JUnitRunner
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.io.Source
-import org.scalatest.Ignore
import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
import org.scalatest.time.SpanSugar._
+import org.slf4j.{Logger, LoggerFactory}
import org.testng.annotations.Test
+import scala.io.Source
+
/**
- * Created by sblackmon on 3/13/16.
+ * FlinkTwitterUserInformationPipelineIT is an integration test for FlinkTwitterUserInformationPipeline.
*/
class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
index be79f4a..b859d60 100644
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
@@ -23,59 +23,54 @@
import org.apache.streams.core.StreamBuilder;
import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
import org.apache.streams.hdfs.WebHdfsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
import org.apache.streams.local.builders.LocalStreamBuilder;
import com.google.common.collect.Maps;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
- * Copies documents into a new index
+ * Copies documents from an elasticsearch index to new-line delimited json on dfs.
*/
public class ElasticsearchHdfs implements Runnable {
- public final static String STREAMS_ID = "ElasticsearchHdfs";
+ public final static String STREAMS_ID = "ElasticsearchHdfs";
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class);
- ElasticsearchHdfsConfiguration config;
+ ElasticsearchHdfsConfiguration config;
- public ElasticsearchHdfs() {
- this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ public ElasticsearchHdfs() {
+ this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ }
- }
+ public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) {
+ this.config = reindex;
+ }
- public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) {
- this.config = reindex;
- }
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
+ ElasticsearchHdfs backup = new ElasticsearchHdfs();
+ new Thread(backup).start();
+ }
- public static void main(String[] args)
- {
- LOGGER.info(StreamsConfigurator.config.toString());
+ @Override
+ public void run() {
- ElasticsearchHdfs backup = new ElasticsearchHdfs();
+ ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+ WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination());
- new Thread(backup).start();
+ LocalRuntimeConfiguration localRuntimeConfiguration =
+ StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+ StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
- }
-
- @Override
- public void run() {
-
- ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
-
- WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination());
-
- Map<String, Object> streamConfig = Maps.newHashMap();
- streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
- streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
- StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
-
- builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
- builder.addStreamsPersistWriter(WebHdfsPersistWriter.class.getCanonicalName(), hdfsPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
- builder.start();
- }
+ builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
+ builder.addStreamsPersistWriter(WebHdfsPersistWriter.class.getCanonicalName(), hdfsPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
+ builder.start();
+ }
}
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
index 375665c..caf9cbc 100644
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
@@ -23,59 +23,54 @@
import org.apache.streams.core.StreamBuilder;
import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
import org.apache.streams.hdfs.WebHdfsPersistReader;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
import org.apache.streams.local.builders.LocalStreamBuilder;
import com.google.common.collect.Maps;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
- * Copies documents into a new index
+ * Copies documents from new-line delimited json on dfs to an elasticsearch index.
*/
public class HdfsElasticsearch implements Runnable {
- public final static String STREAMS_ID = "HdfsElasticsearch";
+ public final static String STREAMS_ID = "HdfsElasticsearch";
- private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class);
- HdfsElasticsearchConfiguration config;
+ HdfsElasticsearchConfiguration config;
- public HdfsElasticsearch() {
- this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ public HdfsElasticsearch() {
+ this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ }
- }
+ public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) {
+ this.config = reindex;
+ }
- public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) {
- this.config = reindex;
- }
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
+ HdfsElasticsearch restore = new HdfsElasticsearch();
+ new Thread(restore).start();
+ }
- public static void main(String[] args)
- {
- LOGGER.info(StreamsConfigurator.config.toString());
+ @Override
+ public void run() {
- HdfsElasticsearch restore = new HdfsElasticsearch();
+ WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource());
+ ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
- new Thread(restore).start();
+ LocalRuntimeConfiguration localRuntimeConfiguration =
+ StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+ StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
- }
-
- @Override
- public void run() {
-
- WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource());
-
- ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
-
- Map<String, Object> streamConfig = Maps.newHashMap();
- streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
- streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000 * 1000);
- StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
-
- builder.newPerpetualStream(WebHdfsPersistReader.class.getCanonicalName(), webHdfsPersistReader);
- builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, WebHdfsPersistReader.class.getCanonicalName());
- builder.start();
- }
+ builder.newPerpetualStream(WebHdfsPersistReader.class.getCanonicalName(), webHdfsPersistReader);
+ builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, WebHdfsPersistReader.class.getCanonicalName());
+ builder.start();
+ }
}
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
index 437ebf6..9b70440 100644
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
@@ -28,7 +28,6 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
-
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -50,56 +49,54 @@
import static org.testng.Assert.assertNotEquals;
/**
- * Test copying documents between hdfs and elasticsearch
+ * ElasticsearchHdfsIT is an integration test for ElasticsearchHdfs.
*/
public class ElasticsearchHdfsIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class);
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- protected ElasticsearchHdfsConfiguration testConfiguration;
- protected Client testClient;
+ protected ElasticsearchHdfsConfiguration testConfiguration;
+ protected Client testClient;
- private int count = 0;
+ private int count = 0;
- @BeforeClass
- public void prepareTest() throws Exception {
+ @BeforeClass
+ public void prepareTest() throws Exception {
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/ElasticsearchHdfsIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe);
- testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/ElasticsearchHdfsIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe);
+ testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertThat(indicesExistsResponse.isExists(), is(true));
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertThat(indicesExistsResponse.isExists(), is(true));
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
- .setTypes(testConfiguration.getSource().getTypes().get(0));
- SearchResponse countResponse = countRequest.execute().actionGet();
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+ .setTypes(testConfiguration.getSource().getTypes().get(0));
+ SearchResponse countResponse = countRequest.execute().actionGet();
- count = (int)countResponse.getHits().getTotalHits();
+ count = (int)countResponse.getHits().getTotalHits();
- assertNotEquals(count, 0);
- }
+ assertNotEquals(count, 0);
+ }
- @Test
- public void ElasticsearchHdfsIT() throws Exception {
+ @Test
+ public void ElasticsearchHdfsIT() throws Exception {
- ElasticsearchHdfs backup = new ElasticsearchHdfs(testConfiguration);
+ ElasticsearchHdfs backup = new ElasticsearchHdfs(testConfiguration);
+ backup.run();
- backup.run();
-
- // assert lines in file
- }
+ }
}
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
index a629025..d5f6a29 100644
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
@@ -28,7 +28,6 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
-
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -52,59 +51,59 @@
import static org.testng.AssertJUnit.assertTrue;
/**
- * Test copying documents between hdfs and elasticsearch
+ * HdfsElasticsearchIT is an integration test for HdfsElasticsearch.
*/
public class HdfsElasticsearchIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearchIT.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearchIT.class);
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- protected HdfsElasticsearchConfiguration testConfiguration;
- protected Client testClient;
+ protected HdfsElasticsearchConfiguration testConfiguration;
+ protected Client testClient;
- @BeforeClass
- public void prepareTest() throws Exception {
+ @BeforeClass
+ public void prepareTest() throws Exception {
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/HdfsElasticsearchIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe);
- testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/HdfsElasticsearchIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe);
+ testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- if(indicesExistsResponse.isExists()) {
- DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getDestination().getIndex());
- DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
- assertTrue(deleteIndexResponse.isAcknowledged());
- };
- }
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ if(indicesExistsResponse.isExists()) {
+ DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getDestination().getIndex());
+ DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+ assertTrue(deleteIndexResponse.isAcknowledged());
+ };
+ }
- @Test
- public void ElasticsearchHdfsIT() throws Exception {
+ @Test
+ public void ElasticsearchHdfsIT() throws Exception {
- HdfsElasticsearch restore = new HdfsElasticsearch(testConfiguration);
+ HdfsElasticsearch restore = new HdfsElasticsearch(testConfiguration);
- restore.run();
+ restore.run();
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertTrue(indicesExistsResponse.isExists());
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertTrue(indicesExistsResponse.isExists());
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getDestination().getIndex())
- .setTypes(testConfiguration.getDestination().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getDestination().getIndex())
+ .setTypes(testConfiguration.getDestination().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
- assertEquals(countResponse.getHits().getTotalHits(), 89);
+ assertEquals(countResponse.getHits().getTotalHits(), 89);
- }
+ }
}
diff --git a/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf b/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
index 80ef53a..42cb01c 100644
--- a/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
+++ b/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
@@ -27,3 +27,4 @@
path = "target/test-classes"
writerPath = "elasticsearch_hdfs_it"
}
+taskTimeoutMs = 60000
\ No newline at end of file
diff --git a/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf b/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
index 072a024..30be89b 100644
--- a/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
+++ b/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
@@ -28,4 +28,5 @@
type = "activity"
refresh = true
forceUseConfig = true
-}
\ No newline at end of file
+}
+taskTimeoutMs = 60000
\ No newline at end of file
diff --git a/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
index 676a272..476e369 100644
--- a/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
+++ b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
@@ -23,59 +23,59 @@
import org.apache.streams.core.StreamBuilder;
import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
import org.apache.streams.local.builders.LocalStreamBuilder;
import com.google.common.collect.Maps;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
- * Copies documents into a new index
+ * Copies documents from the source index to the destination index.
*/
public class ElasticsearchReindex implements Runnable {
- public final static String STREAMS_ID = "ElasticsearchReindex";
+ public final static String STREAMS_ID = "ElasticsearchReindex";
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindex.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindex.class);
- ElasticsearchReindexConfiguration config;
+ ElasticsearchReindexConfiguration config;
- public ElasticsearchReindex() {
- this(new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ public ElasticsearchReindex() {
+ this(new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
- }
+ }
- public ElasticsearchReindex(ElasticsearchReindexConfiguration reindex) {
- this.config = reindex;
- }
+ public ElasticsearchReindex(ElasticsearchReindexConfiguration reindex) {
+ this.config = reindex;
+ }
- public static void main(String[] args)
- {
- LOGGER.info(StreamsConfigurator.config.toString());
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
- ElasticsearchReindex reindex = new ElasticsearchReindex();
+ ElasticsearchReindex reindex = new ElasticsearchReindex();
- new Thread(reindex).start();
+ new Thread(reindex).start();
- }
+ }
- @Override
- public void run() {
+ @Override
+ public void run() {
- ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+ ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
- ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+ ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
- Map<String, Object> streamConfig = Maps.newHashMap();
- streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
- streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
- StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+ LocalRuntimeConfiguration localRuntimeConfiguration =
+ StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+ StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
- builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
- builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
- builder.start();
- }
+ builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
+ builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
+ builder.start();
+ }
}
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
index 3260949..04d7cd6 100644
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
@@ -28,7 +28,6 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
-
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -50,64 +49,64 @@
import static org.junit.Assert.assertNotEquals;
/**
- * Test copying parent/child associated documents between two indexes on same cluster
+ * Test copying parent/child associated documents between two indexes on same cluster.
*/
public class ElasticsearchReindexChildIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- protected ElasticsearchReindexConfiguration testConfiguration;
- protected Client testClient;
+ protected ElasticsearchReindexConfiguration testConfiguration;
+ protected Client testClient;
- private int count = 0;
+ private int count = 0;
- @BeforeClass
- public void prepareTest() throws Exception {
+ @BeforeClass
+ public void prepareTest() throws Exception {
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/ElasticsearchReindexChildIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
- testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/ElasticsearchReindexChildIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+ testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertThat(indicesExistsResponse.isExists(), is(true));
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertThat(indicesExistsResponse.isExists(), is(true));
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
- .setTypes(testConfiguration.getSource().getTypes().get(0));
- SearchResponse countResponse = countRequest.execute().actionGet();
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+ .setTypes(testConfiguration.getSource().getTypes().get(0));
+ SearchResponse countResponse = countRequest.execute().actionGet();
- count = (int)countResponse.getHits().getTotalHits();
+ count = (int)countResponse.getHits().getTotalHits();
- assertNotEquals(count, 0);
+ assertNotEquals(count, 0);
- }
+ }
- @Test
- public void testReindex() throws Exception {
+ @Test
+ public void testReindex() throws Exception {
- ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+ ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
- reindex.run();
+ reindex.run();
- // assert lines in file
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getDestination().getIndex())
- .setTypes(testConfiguration.getDestination().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
+ // assert lines in file
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getDestination().getIndex())
+ .setTypes(testConfiguration.getDestination().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
- assertThat((int)countResponse.getHits().getTotalHits(), is(count));
+ assertThat((int)countResponse.getHits().getTotalHits(), is(count));
- }
+ }
}
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java
index a324e24..6c69388 100644
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java
@@ -53,59 +53,59 @@
*/
public class ElasticsearchReindexIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- protected ElasticsearchReindexConfiguration testConfiguration;
- protected Client testClient;
+ protected ElasticsearchReindexConfiguration testConfiguration;
+ protected Client testClient;
- private int count = 0;
+ private int count = 0;
- @BeforeClass
- public void prepareTest() throws Exception {
+ @BeforeClass
+ public void prepareTest() throws Exception {
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/ElasticsearchReindexIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
- testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/ElasticsearchReindexIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+ testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertThat(indicesExistsResponse.isExists(), is(true));
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertThat(indicesExistsResponse.isExists(), is(true));
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
- .setTypes(testConfiguration.getSource().getTypes().get(0));
- SearchResponse countResponse = countRequest.execute().actionGet();
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+ .setTypes(testConfiguration.getSource().getTypes().get(0));
+ SearchResponse countResponse = countRequest.execute().actionGet();
- count = (int)countResponse.getHits().getTotalHits();
+ count = (int)countResponse.getHits().getTotalHits();
- assertThat(count, not(0));
+ assertThat(count, not(0));
- }
+ }
- @Test
- public void testReindex() throws Exception {
+ @Test
+ public void testReindex() throws Exception {
- ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+ ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
- reindex.run();
+ reindex.run();
- // assert lines in file
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getDestination().getIndex())
- .setTypes(testConfiguration.getDestination().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
+ // assert lines in file
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getDestination().getIndex())
+ .setTypes(testConfiguration.getDestination().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
- assertThat((int)countResponse.getHits().getTotalHits(), is(count));
+ assertThat((int)countResponse.getHits().getTotalHits(), is(count));
- }
+ }
}
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java
index 988852a..e53c057 100644
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java
@@ -30,7 +30,6 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
-
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -59,68 +58,68 @@
*/
public class ElasticsearchReindexParentIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- protected ElasticsearchReindexConfiguration testConfiguration;
- protected Client testClient;
+ protected ElasticsearchReindexConfiguration testConfiguration;
+ protected Client testClient;
- private int count = 0;
+ private int count = 0;
- @BeforeClass
- public void prepareTest() throws Exception {
+ @BeforeClass
+ public void prepareTest() throws Exception {
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/ElasticsearchReindexParentIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
- testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/ElasticsearchReindexParentIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+ testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertTrue(indicesExistsResponse.isExists());
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertTrue(indicesExistsResponse.isExists());
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
- .setTypes(testConfiguration.getSource().getTypes().get(0));
- SearchResponse countResponse = countRequest.execute().actionGet();
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+ .setTypes(testConfiguration.getSource().getTypes().get(0));
+ SearchResponse countResponse = countRequest.execute().actionGet();
- count = (int)countResponse.getHits().getTotalHits();
+ count = (int)countResponse.getHits().getTotalHits();
- PutIndexTemplateRequestBuilder putTemplateRequestBuilder = testClient.admin().indices().preparePutTemplate("mappings");
- URL templateURL = ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json");
- ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class);
- String templateSource = MAPPER.writeValueAsString(template);
- putTemplateRequestBuilder.setSource(templateSource);
+ PutIndexTemplateRequestBuilder putTemplateRequestBuilder = testClient.admin().indices().preparePutTemplate("mappings");
+ URL templateURL = ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json");
+ ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class);
+ String templateSource = MAPPER.writeValueAsString(template);
+ putTemplateRequestBuilder.setSource(templateSource);
- testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();
+ testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();
- assertThat(count, not(0));
+ assertThat(count, not(0));
- }
+ }
- @Test
- public void testReindex() throws Exception {
+ @Test
+ public void testReindex() throws Exception {
- ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+ ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
- reindex.run();
+ reindex.run();
- // assert lines in file
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getDestination().getIndex())
- .setTypes(testConfiguration.getDestination().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
+ // assert lines in file
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getDestination().getIndex())
+ .setTypes(testConfiguration.getDestination().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
- assertThat((int)countResponse.getHits().getTotalHits(), is(count));
+ assertThat((int)countResponse.getHits().getTotalHits(), is(count));
- }
+ }
}
diff --git a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
index 424d725..159b7d5 100644
--- a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
+++ b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
@@ -33,4 +33,5 @@
"type": "activity",
"forceUseConfig": true
}
+ taskTimeoutMs = 60000
}
diff --git a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
index 0062f0f..faac23e 100644
--- a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
+++ b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
@@ -33,4 +33,5 @@
"type": "activity",
"forceUseConfig": true
}
+ taskTimeoutMs = 60000
}
diff --git a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
index 424d725..159b7d5 100644
--- a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
+++ b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
@@ -33,4 +33,5 @@
"type": "activity",
"forceUseConfig": true
}
+ taskTimeoutMs = 60000
}
diff --git a/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java
index 4527a6b..2d994b9 100644
--- a/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java
+++ b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java
@@ -22,61 +22,56 @@
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamBuilder;
import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.local.LocalRuntimeConfiguration;
import org.apache.streams.local.builders.LocalStreamBuilder;
import org.apache.streams.mongo.MongoPersistReader;
-import com.google.common.collect.Maps;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
/**
- * Copies documents into a new index
+ * Copies a mongo collection to an elasticsearch index.
*/
public class MongoElasticsearchSync implements Runnable {
- public final static String STREAMS_ID = "MongoElasticsearchSync";
+ public final static String STREAMS_ID = "MongoElasticsearchSync";
- private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSync.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSync.class);
- MongoElasticsearchSyncConfiguration config;
+ MongoElasticsearchSyncConfiguration config;
- public MongoElasticsearchSync() {
- this(new ComponentConfigurator<MongoElasticsearchSyncConfiguration>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
- }
+ public MongoElasticsearchSync() {
+ this(new ComponentConfigurator<MongoElasticsearchSyncConfiguration>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ }
- public MongoElasticsearchSync(MongoElasticsearchSyncConfiguration config) {
- this.config = config;
- }
+ public MongoElasticsearchSync(MongoElasticsearchSyncConfiguration config) {
+ this.config = config;
+ }
- public static void main(String[] args)
- {
- LOGGER.info(StreamsConfigurator.config.toString());
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
- MongoElasticsearchSync sync = new MongoElasticsearchSync();
+ MongoElasticsearchSync sync = new MongoElasticsearchSync();
- new Thread(sync).start();
+ new Thread(sync).start();
- }
+ }
- @Override
- public void run() {
+ @Override
+ public void run() {
- MongoPersistReader mongoPersistReader = new MongoPersistReader(config.getSource());
+ MongoPersistReader mongoPersistReader = new MongoPersistReader(config.getSource());
- ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+ ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
- LocalRuntimeConfiguration localRuntimeConfiguration = new LocalRuntimeConfiguration();
- localRuntimeConfiguration.setIdentifier(STREAMS_ID);
- localRuntimeConfiguration.setTaskTimeoutMs((long)(60 * 1000));
- localRuntimeConfiguration.setQueueSize((long)1000);
- StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
+ LocalRuntimeConfiguration localRuntimeConfiguration =
+ StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+ StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
- builder.newPerpetualStream(MongoPersistReader.class.getCanonicalName(), mongoPersistReader);
- builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, MongoPersistReader.class.getCanonicalName());
- builder.start();
- }
+ builder.newPerpetualStream(MongoPersistReader.class.getCanonicalName(), mongoPersistReader);
+ builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, MongoPersistReader.class.getCanonicalName());
+ builder.start();
+ }
}
diff --git a/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java b/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java
index 02af293..84d0fba 100644
--- a/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java
+++ b/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java
@@ -28,7 +28,6 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
-
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -51,55 +50,55 @@
import static org.testng.Assert.assertTrue;
/**
- * Test copying documents between two indexes on same cluster
+ * MongoElasticsearchSyncIT is an integration test for MongoElasticsearchSync.
*/
public class MongoElasticsearchSyncIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSyncIT.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSyncIT.class);
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- protected MongoElasticsearchSyncConfiguration testConfiguration;
- protected Client testClient;
+ protected MongoElasticsearchSyncConfiguration testConfiguration;
+ protected Client testClient;
- @BeforeClass
- public void prepareTest() throws Exception {
+ @BeforeClass
+ public void prepareTest() throws Exception {
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/MongoElasticsearchSyncIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- testConfiguration = new ComponentConfigurator<>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(typesafe);
- testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/MongoElasticsearchSyncIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(typesafe);
+ testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertFalse(indicesExistsResponse.isExists());
- }
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertFalse(indicesExistsResponse.isExists());
+ }
- @Test
- public void testSync() throws Exception {
+ @Test
+ public void testSync() throws Exception {
- MongoElasticsearchSync sync = new MongoElasticsearchSync(testConfiguration);
+ MongoElasticsearchSync sync = new MongoElasticsearchSync(testConfiguration);
- sync.run();
+ sync.run();
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertTrue(indicesExistsResponse.isExists());
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertTrue(indicesExistsResponse.isExists());
- // assert lines in file
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getDestination().getIndex())
- .setTypes(testConfiguration.getDestination().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
+ // assert lines in file
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getDestination().getIndex())
+ .setTypes(testConfiguration.getDestination().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
- assertEquals(89, (int)countResponse.getHits().getTotalHits());
+ assertEquals(89, (int)countResponse.getHits().getTotalHits());
- }
+ }
}
diff --git a/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf b/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf
index 61e61d7..86a41b6 100644
--- a/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf
+++ b/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf
@@ -30,4 +30,5 @@
"type": "activity",
"forceUseConfig": true
}
+ taskTimeoutMs = 60000
}
diff --git a/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
index 34ac8c4..5ffb6ed 100644
--- a/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
+++ b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
@@ -18,7 +18,6 @@
package org.apache.streams.example;
-import com.google.common.collect.Lists;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.converter.ActivityConverterProcessor;
@@ -27,14 +26,17 @@
import org.apache.streams.core.StreamBuilder;
import org.apache.streams.data.ActivityConverter;
import org.apache.streams.data.DocumentClassifier;
-import org.apache.streams.example.TwitterFollowNeo4jConfiguration;
import org.apache.streams.graph.GraphHttpConfiguration;
import org.apache.streams.graph.GraphHttpPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
import org.apache.streams.local.builders.LocalStreamBuilder;
import org.apache.streams.twitter.TwitterFollowingConfiguration;
+import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
import org.apache.streams.twitter.converter.TwitterFollowActivityConverter;
import org.apache.streams.twitter.provider.TwitterFollowingProvider;
-import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
+
+import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,50 +46,53 @@
*/
public class TwitterFollowNeo4j implements Runnable {
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4j.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4j.class);
- TwitterFollowNeo4jConfiguration config;
+ TwitterFollowNeo4jConfiguration config;
- public TwitterFollowNeo4j() {
- this(new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
- }
+ public TwitterFollowNeo4j() {
+ this(new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ }
- public TwitterFollowNeo4j(TwitterFollowNeo4jConfiguration config) {
- this.config = config;
- }
+ public TwitterFollowNeo4j(TwitterFollowNeo4jConfiguration config) {
+ this.config = config;
+ }
- public void run() {
+ public void run() {
- TwitterFollowingConfiguration twitterFollowingConfiguration = config.getTwitter();
- TwitterFollowingProvider followingProvider = new TwitterFollowingProvider(twitterFollowingConfiguration);
- TypeConverterProcessor converter = new TypeConverterProcessor(String.class);
+ TwitterFollowingConfiguration twitterFollowingConfiguration = config.getTwitter();
+ TwitterFollowingProvider followingProvider = new TwitterFollowingProvider(twitterFollowingConfiguration);
+ TypeConverterProcessor converter = new TypeConverterProcessor(String.class);
- ActivityConverterProcessorConfiguration activityConverterProcessorConfiguration =
- new ActivityConverterProcessorConfiguration()
- .withClassifiers(Lists.newArrayList((DocumentClassifier) new TwitterDocumentClassifier()))
- .withConverters(Lists.newArrayList((ActivityConverter) new TwitterFollowActivityConverter()));
- ActivityConverterProcessor activity = new ActivityConverterProcessor(activityConverterProcessorConfiguration);
+ ActivityConverterProcessorConfiguration activityConverterProcessorConfiguration =
+ new ActivityConverterProcessorConfiguration()
+ .withClassifiers(Lists.newArrayList((DocumentClassifier) new TwitterDocumentClassifier()))
+ .withConverters(Lists.newArrayList((ActivityConverter) new TwitterFollowActivityConverter()));
+ ActivityConverterProcessor activity = new ActivityConverterProcessor(activityConverterProcessorConfiguration);
- GraphHttpConfiguration graphWriterConfiguration = config.getGraph();
- GraphHttpPersistWriter graphPersistWriter = new GraphHttpPersistWriter(graphWriterConfiguration);
+ GraphHttpConfiguration graphWriterConfiguration = config.getGraph();
+ GraphHttpPersistWriter graphPersistWriter = new GraphHttpPersistWriter(graphWriterConfiguration);
- StreamBuilder builder = new LocalStreamBuilder();
- builder.newPerpetualStream(TwitterFollowingProvider.STREAMS_ID, followingProvider);
- builder.addStreamsProcessor("converter", converter, 1, TwitterFollowingProvider.STREAMS_ID);
- builder.addStreamsProcessor("activity", activity, 1, "converter");
- builder.addStreamsPersistWriter("graph", graphPersistWriter, 1, "activity");
+ LocalRuntimeConfiguration localRuntimeConfiguration =
+ StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+ StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
- builder.start();
- }
+ builder.newPerpetualStream(TwitterFollowingProvider.class.getCanonicalName(), followingProvider);
+ builder.addStreamsProcessor(TypeConverterProcessor.class.getCanonicalName(), converter, 1, TwitterFollowingProvider.class.getCanonicalName());
+ builder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(), activity, 1, TypeConverterProcessor.class.getCanonicalName());
+ builder.addStreamsPersistWriter(GraphHttpPersistWriter.class.getCanonicalName(), graphPersistWriter, 1, ActivityConverterProcessor.class.getCanonicalName());
- public static void main(String[] args) {
+ builder.start();
+ }
- LOGGER.info(StreamsConfigurator.config.toString());
+ public static void main(String[] args) {
- TwitterFollowNeo4j stream = new TwitterFollowNeo4j();
+ LOGGER.info(StreamsConfigurator.config.toString());
- stream.run();
+ TwitterFollowNeo4j stream = new TwitterFollowNeo4j();
- }
+ stream.run();
+
+ }
}
\ No newline at end of file
diff --git a/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
index 51593b0..ac9362e 100644
--- a/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
+++ b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
@@ -18,15 +18,13 @@
package org.apache.streams.example.test;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.example.TwitterFollowNeo4j;
import org.apache.streams.example.TwitterFollowNeo4jConfiguration;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeClass;
@@ -35,35 +33,35 @@
import java.io.File;
/**
- * Example stream that populates elasticsearch with activities from twitter userstream in real-time
+ * TwitterFollowNeo4jIT is an integration test for TwitterFollowNeo4j.
*/
public class TwitterFollowNeo4jIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4jIT.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4jIT.class);
- protected TwitterFollowNeo4jConfiguration testConfiguration;
+ protected TwitterFollowNeo4jConfiguration testConfiguration;
- private int count = 0;
+ private int count = 0;
- @BeforeClass
- public void prepareTest() throws Exception {
+ @BeforeClass
+ public void prepareTest() throws Exception {
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/TwitterFollowNeo4jIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- testConfiguration = new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(typesafe);
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/TwitterFollowNeo4jIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(typesafe);
- }
+ }
- @Test
- public void testTwitterFollowGraph() throws Exception {
+ @Test
+ public void testTwitterFollowGraph() throws Exception {
- TwitterFollowNeo4j stream = new TwitterFollowNeo4j(testConfiguration);
+ TwitterFollowNeo4j stream = new TwitterFollowNeo4j(testConfiguration);
- stream.run();
+ stream.run();
- }
+ }
}
diff --git a/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf
index d4b4aeb..346b111 100644
--- a/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf
+++ b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf
@@ -25,4 +25,5 @@
port = ${neo4j.http.port}
type = "neo4j"
graph = "data"
-}
\ No newline at end of file
+}
+taskTimeoutMs = 60000
\ No newline at end of file
diff --git a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
index 7d87f36..60f3405 100644
--- a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
+++ b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
@@ -18,14 +18,17 @@
package org.apache.streams.example;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.converter.ActivityConverterProcessor;
import org.apache.streams.core.StreamBuilder;
import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
import org.apache.streams.local.builders.LocalStreamBuilder;
import org.apache.streams.twitter.provider.TwitterTimelineProvider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,48 +37,48 @@
*
* Converts them to activities, and writes them in activity format to Elasticsearch.
*/
-
public class TwitterHistoryElasticsearch implements Runnable {
- public final static String STREAMS_ID = "TwitterHistoryElasticsearch";
+ public final static String STREAMS_ID = "TwitterHistoryElasticsearch";
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearch.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearch.class);
- private static final ObjectMapper mapper = new ObjectMapper();
+ private static final ObjectMapper mapper = new ObjectMapper();
- TwitterHistoryElasticsearchConfiguration config;
+ TwitterHistoryElasticsearchConfiguration config;
- public TwitterHistoryElasticsearch() {
- this(new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ public TwitterHistoryElasticsearch() {
+ this(new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ }
- }
+ public TwitterHistoryElasticsearch(TwitterHistoryElasticsearchConfiguration config) {
+ this.config = config;
+ }
- public TwitterHistoryElasticsearch(TwitterHistoryElasticsearchConfiguration config) {
- this.config = config;
- }
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
- public static void main(String[] args)
- {
- LOGGER.info(StreamsConfigurator.config.toString());
+ TwitterHistoryElasticsearch history = new TwitterHistoryElasticsearch();
- TwitterHistoryElasticsearch history = new TwitterHistoryElasticsearch();
+ new Thread(history).start();
- new Thread(history).start();
-
- }
+ }
- public void run() {
+ public void run() {
- TwitterTimelineProvider provider = new TwitterTimelineProvider(config.getTwitter());
- ActivityConverterProcessor converter = new ActivityConverterProcessor();
- ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config.getElasticsearch());
+ TwitterTimelineProvider provider = new TwitterTimelineProvider(config.getTwitter());
+ ActivityConverterProcessor converter = new ActivityConverterProcessor();
+ ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config.getElasticsearch());
- StreamBuilder builder = new LocalStreamBuilder(500);
+ LocalRuntimeConfiguration localRuntimeConfiguration =
+ StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+ StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
- builder.newPerpetualStream("provider", provider);
- builder.addStreamsProcessor("converter", converter, 2, "provider");
- builder.addStreamsPersistWriter("writer", writer, 1, "converter");
- builder.start();
- }
+ builder.newPerpetualStream(TwitterTimelineProvider.class.getCanonicalName(), provider);
+ builder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(), converter, 2, TwitterTimelineProvider.class.getCanonicalName());
+ builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), writer, 1, ActivityConverterProcessor.class.getCanonicalName());
+ builder.start();
+ }
}
diff --git a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
index 07c1d88..0eb022b 100644
--- a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
+++ b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
@@ -26,7 +26,6 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
-
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -45,62 +44,61 @@
import java.io.File;
-import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.AssertJUnit.assertTrue;
/**
- * Example stream that populates elasticsearch with activities from twitter userstream in real-time
+ * Example stream that populates elasticsearch with activities from twitter userstream in real-time.
*/
public class TwitterHistoryElasticsearchIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearchIT.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearchIT.class);
- protected TwitterHistoryElasticsearchConfiguration testConfiguration;
- protected Client testClient;
+ protected TwitterHistoryElasticsearchConfiguration testConfiguration;
+ protected Client testClient;
- private int count = 0;
+ private int count = 0;
- @BeforeClass
- public void prepareTest() throws Exception {
+ @BeforeClass
+ public void prepareTest() throws Exception {
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/TwitterHistoryElasticsearchIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- testConfiguration = new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(typesafe);
- testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client();
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/TwitterHistoryElasticsearchIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(typesafe);
+ testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client();
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- if(indicesExistsResponse.isExists()) {
- DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex());
- DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
- assertTrue(deleteIndexResponse.isAcknowledged());
- };
- }
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ if(indicesExistsResponse.isExists()) {
+ DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex());
+ DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+ assertTrue(deleteIndexResponse.isAcknowledged());
+ };
+ }
- @Test
- public void testTwitterHistoryElasticsearch() throws Exception {
+ @Test
+ public void testTwitterHistoryElasticsearch() throws Exception {
- TwitterHistoryElasticsearch stream = new TwitterHistoryElasticsearch(testConfiguration);
+ TwitterHistoryElasticsearch stream = new TwitterHistoryElasticsearch(testConfiguration);
- stream.run();
+ stream.run();
- // assert lines in file
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getElasticsearch().getIndex())
- .setTypes(testConfiguration.getElasticsearch().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
+ // assert lines in file
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getElasticsearch().getIndex())
+ .setTypes(testConfiguration.getElasticsearch().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
- count = (int)countResponse.getHits().getTotalHits();
+ count = (int)countResponse.getHits().getTotalHits();
- assertNotEquals(count, 0);
- }
+ assertNotEquals(count, 0);
+ }
}
diff --git a/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf b/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf
index 1a05e32..81e4903 100644
--- a/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf
+++ b/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf
@@ -26,4 +26,4 @@
index = twitter_history_elasticsearch_it
type = activity
forceUseConfig = true
-}
\ No newline at end of file
+}
diff --git a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
index f1e776a..369ec0b 100644
--- a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
+++ b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
@@ -18,129 +18,129 @@
package org.apache.streams.example;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.converter.ActivityConverterProcessor;
+import org.apache.streams.core.StreamBuilder;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter;
import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
-import org.apache.streams.example.TwitterUserstreamElasticsearchConfiguration;
import org.apache.streams.filters.VerbDefinitionDropFilter;
import org.apache.streams.filters.VerbDefinitionKeepFilter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
import org.apache.streams.local.builders.LocalStreamBuilder;
-import org.apache.streams.core.StreamBuilder;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.TwitterStreamConfiguration;
import org.apache.streams.twitter.provider.TwitterStreamProvider;
import org.apache.streams.verbs.ObjectCombination;
import org.apache.streams.verbs.VerbDefinition;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.elasticsearch.common.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.Map;
/**
* Example stream that populates elasticsearch with activities from twitter userstream in real-time
*/
public class TwitterUserstreamElasticsearch implements Runnable {
- public final static String STREAMS_ID = "TwitterUserstreamElasticsearch";
+ public final static String STREAMS_ID = "TwitterUserstreamElasticsearch";
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
- /* this pattern will match any/only deletes */
- private static VerbDefinition deleteVerbDefinition =
- new VerbDefinition()
- .withValue("delete")
- .withObjects(Lists.newArrayList(new ObjectCombination()));
+ /* this pattern will match any/only deletes */
+ private static VerbDefinition deleteVerbDefinition =
+ new VerbDefinition()
+ .withValue("delete")
+ .withObjects(Lists.newArrayList(new ObjectCombination()));
- TwitterUserstreamElasticsearchConfiguration config;
+ TwitterUserstreamElasticsearchConfiguration config;
- public TwitterUserstreamElasticsearch() {
- this(new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ public TwitterUserstreamElasticsearch() {
+ this(new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ }
+
+ public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration config) {
+ this.config = config;
+ }
+
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
+
+ TwitterUserstreamElasticsearch userstream = new TwitterUserstreamElasticsearch();
+ new Thread(userstream).start();
+
+ }
+
+ @Override
+ public void run() {
+
+ TwitterStreamConfiguration twitterStreamConfiguration = config.getTwitter();
+ ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch();
+
+ TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration);
+ ActivityConverterProcessor converter = new ActivityConverterProcessor();
+ VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition));
+ ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration);
+ VerbDefinitionKeepFilter deleteOnlyProcessor = new VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition));
+ SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor();
+ ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration);
+
+ LocalRuntimeConfiguration localRuntimeConfiguration =
+ StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+ StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
+
+ builder.newPerpetualStream(TwitterStreamProvider.class.getCanonicalName(), stream);
+ builder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(), converter, 2, TwitterStreamProvider.class.getCanonicalName());
+ builder.addStreamsProcessor(VerbDefinitionDropFilter.class.getCanonicalName(), noDeletesProcessor, 1, ActivityConverterProcessor.class.getCanonicalName());
+ builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), writer, 1, VerbDefinitionDropFilter.class.getCanonicalName());
+ builder.addStreamsProcessor(VerbDefinitionKeepFilter.class.getCanonicalName(), deleteOnlyProcessor, 1, ActivityConverterProcessor.class.getCanonicalName());
+ builder.addStreamsProcessor(SetDeleteIdProcessor.class.getCanonicalName(), setDeleteIdProcessor, 1, VerbDefinitionKeepFilter.class.getCanonicalName());
+ builder.addStreamsPersistWriter(ElasticsearchPersistDeleter.class.getCanonicalName(), deleter, 1, SetDeleteIdProcessor.class.getCanonicalName());
+
+ builder.start();
+
+ }
+
+ protected class SetDeleteIdProcessor implements StreamsProcessor {
+
+ public String getId() {
+ return "TwitterUserstreamElasticsearch.SetDeleteIdProcessor";
}
- public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration config) {
- this.config = config;
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+ String id = entry.getId();
+ // replace delete with post in id
+ // ensure ElasticsearchPersistDeleter will remove original post if present
+ id = Strings.replace(id, "delete", "post");
+ entry.setId(id);
+
+ return Lists.newArrayList(entry);
}
- public static void main(String[] args)
- {
- LOGGER.info(StreamsConfigurator.config.toString());
+ @Override
+ public void prepare(Object configurationObject) {
- TwitterUserstreamElasticsearch userstream = new TwitterUserstreamElasticsearch();
- new Thread(userstream).start();
}
@Override
- public void run() {
-
- TwitterStreamConfiguration twitterStreamConfiguration = config.getTwitter();
- ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch();
-
- TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration);
- ActivityConverterProcessor converter = new ActivityConverterProcessor();
- VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition));
- ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration);
- VerbDefinitionKeepFilter deleteOnlyProcessor = new VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition));
- SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor();
- ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration);
-
- Map<String, Object> streamConfig = Maps.newHashMap();
- streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 12 * 60 * 1000);
- StreamBuilder builder = new LocalStreamBuilder(25, streamConfig);
-
- builder.newPerpetualStream(TwitterStreamProvider.STREAMS_ID, stream);
- builder.addStreamsProcessor("converter", converter, 2, TwitterStreamProvider.STREAMS_ID);
- builder.addStreamsProcessor("NoDeletesProcessor", noDeletesProcessor, 1, "converter");
- builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, writer, 1, "NoDeletesProcessor");
- builder.addStreamsProcessor("DeleteOnlyProcessor", deleteOnlyProcessor, 1, "converter");
- builder.addStreamsProcessor("SetDeleteIdProcessor", setDeleteIdProcessor, 1, "DeleteOnlyProcessor");
- builder.addStreamsPersistWriter("deleter", deleter, 1, "SetDeleteIdProcessor");
-
- builder.start();
+ public void cleanUp() {
}
-
- protected class SetDeleteIdProcessor implements StreamsProcessor {
-
- public String getId() {
- return "TwitterUserstreamElasticsearch.SetDeleteIdProcessor";
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
-
- Preconditions.checkArgument(entry.getDocument() instanceof Activity);
- String id = entry.getId();
- // replace delete with post in id
- // ensure ElasticsearchPersistDeleter will remove original post if present
- id = Strings.replace(id, "delete", "post");
- entry.setId(id);
-
- return Lists.newArrayList(entry);
- }
-
- @Override
- public void prepare(Object configurationObject) {
-
-
- }
-
- @Override
- public void cleanUp() {
-
- }
- }
+ }
}
diff --git a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
index 2fd26db..63dd8de 100644
--- a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
+++ b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
@@ -26,7 +26,6 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
-
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -45,7 +44,6 @@
import java.io.File;
-import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.AssertJUnit.assertTrue;
@@ -54,55 +52,55 @@
*/
public class TwitterUserstreamElasticsearchIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class);
- protected TwitterUserstreamElasticsearchConfiguration testConfiguration;
- protected Client testClient;
+ protected TwitterUserstreamElasticsearchConfiguration testConfiguration;
+ protected Client testClient;
- private int count = 0;
+ private int count = 0;
- @BeforeClass
- public void prepareTest() throws Exception {
+ @BeforeClass
+ public void prepareTest() throws Exception {
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/TwitterUserstreamElasticsearchIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- testConfiguration = new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe);
- testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client();
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/TwitterUserstreamElasticsearchIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe);
+ testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client();
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- if(indicesExistsResponse.isExists()) {
- DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex());
- DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
- assertTrue(deleteIndexResponse.isAcknowledged());
- };
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ if(indicesExistsResponse.isExists()) {
+ DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex());
+ DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+ assertTrue(deleteIndexResponse.isAcknowledged());
+ };
- }
+ }
- @Test
- public void testUserstreamElasticsearch() throws Exception {
+ @Test
+ public void testUserstreamElasticsearch() throws Exception {
- TwitterUserstreamElasticsearch stream = new TwitterUserstreamElasticsearch(testConfiguration);
+ TwitterUserstreamElasticsearch stream = new TwitterUserstreamElasticsearch(testConfiguration);
- Thread thread = new Thread(stream);
- thread.start();
- thread.join(30000);
+ Thread thread = new Thread(stream);
+ thread.start();
+ thread.join(30000);
- // assert lines in file
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getElasticsearch().getIndex())
- .setTypes(testConfiguration.getElasticsearch().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
+ // assert lines in file
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getElasticsearch().getIndex())
+ .setTypes(testConfiguration.getElasticsearch().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
- count = (int)countResponse.getHits().getTotalHits();
+ count = (int)countResponse.getHits().getTotalHits();
- assertNotEquals(count, 0);
- }
+ assertNotEquals(count, 0);
+ }
}
diff --git a/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf b/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf
index df9be4d..bca2d51 100644
--- a/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf
+++ b/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf
@@ -26,4 +26,5 @@
index = twitter_userstream_elasticsearch_it
type = activity
forceUseConfig = true
-}
\ No newline at end of file
+}
+taskTimeoutMs = 60000
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index e369e36..384d71a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -263,10 +263,88 @@
</dependencyManagement>
<build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-remote-resources-plugin</artifactId>
+ </plugin>
+ </plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>${checkstyle.plugin.version}</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>7.2</version>
+ </dependency>
+ </dependencies>
+ <executions>
+ <execution>
+ <id>validate</id>
+ <phase>validate</phase>
+ <configuration>
+ <configLocation>http://streams.incubator.apache.org/site/${project.version}/streams-master/streams-java-checkstyle.xml</configLocation>
+ <encoding>UTF-8</encoding>
+ <consoleOutput>true</consoleOutput>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <testSourceDirectory>${project.basedir}/src/test/java</testSourceDirectory>
+ <failsOnError>false</failsOnError>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ <version>${scalastyle.plugin.version}</version>
+ <executions>
+ <execution>
+ <id>validate</id>
+ <phase>validate</phase>
+ <configuration>
+ <verbose>false</verbose>
+ <failOnViolation>false</failOnViolation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <failOnWarning>false</failOnWarning>
+ <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
+ <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
+ <!--<configLocation>https://raw.githubusercontent.com/databricks/sbt-databricks/master/scalastyle-config.xml</configLocation>-->
+ <outputFile>${project.build.directory}/scalastyle-output.xml</outputFile>
+ <outputEncoding>UTF-8</outputEncoding>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${shade.plugin.version}</version>
<configuration>