STREAMS-474: Apply check style requirements to examples
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index 49ca5b9..b6d806c 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -32,6 +32,9 @@
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.slf4j.LoggerFactory
 
+/**
+  * FlinkBase is a base class with capabilities common to all of the streams flink examples.
+  */
 trait FlinkBase {
 
   private val BASELOGGER = LoggerFactory.getLogger("FlinkBase")
@@ -101,16 +104,16 @@
     }
   }
 
-//  def setup(typesafe: Config): Boolean =  {
-//
-//    val streamsConfig = StreamsConfigurator.detectConfiguration(typesafe)
-//
-//    this.streamsConfig = streamsConfig
-//
-//    BASELOGGER.info("Streams Config: " + streamsConfig)
-//
-//    setup(streamsConfig)
-//  }
+  //  def setup(typesafe: Config): Boolean =  {
+  //
+  //    val streamsConfig = StreamsConfigurator.detectConfiguration(typesafe)
+  //
+  //    this.streamsConfig = streamsConfig
+  //
+  //    BASELOGGER.info("Streams Config: " + streamsConfig)
+  //
+  //    setup(streamsConfig)
+  //  }
 
   def setupStreaming(streamingConfiguration: FlinkStreamingConfiguration): Boolean = {
 
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index a5a4f72..17246e5 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -24,6 +24,7 @@
 import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
@@ -31,26 +32,29 @@
 import org.apache.flink.util.Collector
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
 import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.apache.streams.twitter.TwitterFollowingConfiguration
 import org.apache.streams.twitter.pojo.Follow
 import org.apache.streams.twitter.provider.TwitterFollowingProvider
 import org.slf4j.{Logger, LoggerFactory}
-import org.apache.streams.examples.flink.FlinkBase
-import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
-import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
-import org.apache.flink.api.scala._
 
 import scala.collection.JavaConversions._
 
+/**
+  * FlinkTwitterFollowingPipeline collects friends or followers of all profiles from a
+  * set of IDs, writing each connection as a twitter:follow in json format to dfs.
+  */
 object FlinkTwitterFollowingPipeline extends FlinkBase {
 
-    val STREAMS_ID: String = "FlinkTwitterFollowingPipeline"
+  val STREAMS_ID: String = "FlinkTwitterFollowingPipeline"
 
-    private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipeline])
-    private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipeline])
+  private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
 
-    override def main(args: Array[String]) = {
+  override def main(args: Array[String]) = {
     super.main(args)
     val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
     if( !setup(jobConfig) ) System.exit(1)
@@ -58,111 +62,111 @@
     val thread = new Thread(pipeline)
     thread.start()
     thread.join()
+  }
+
+  def setup(jobConfig: TwitterFollowingPipelineConfiguration): Boolean =  {
+
+    LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig)
+
+    if( jobConfig == null ) {
+      LOGGER.error("jobConfig is null!")
+      System.err.println("jobConfig is null!")
+      return false
     }
 
-    def setup(jobConfig: TwitterFollowingPipelineConfiguration): Boolean =  {
-
-        LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig)
-
-        if( jobConfig == null ) {
-            LOGGER.error("jobConfig is null!")
-            System.err.println("jobConfig is null!")
-            return false
-        }
-
-        if( jobConfig.getSource == null ) {
-            LOGGER.error("jobConfig.getSource is null!")
-            System.err.println("jobConfig.getSource is null!")
-            return false
-        }
-
-        if( jobConfig.getDestination == null ) {
-            LOGGER.error("jobConfig.getDestination is null!")
-            System.err.println("jobConfig.getDestination is null!")
-            return false
-        }
-
-        if( jobConfig.getTwitter == null ) {
-            LOGGER.error("jobConfig.getTwitter is null!")
-            System.err.println("jobConfig.getTwitter is null!")
-            return false
-        }
-
-        Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
-
-        true
-
+    if( jobConfig.getSource == null ) {
+      LOGGER.error("jobConfig.getSource is null!")
+      System.err.println("jobConfig.getSource is null!")
+      return false
     }
 
+    if( jobConfig.getDestination == null ) {
+      LOGGER.error("jobConfig.getDestination is null!")
+      System.err.println("jobConfig.getDestination is null!")
+      return false
+    }
+
+    if( jobConfig.getTwitter == null ) {
+      LOGGER.error("jobConfig.getTwitter is null!")
+      System.err.println("jobConfig.getTwitter is null!")
+      return false
+    }
+
+    Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
+    true
+
+  }
+
 }
 
 class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguration = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
 
-    import FlinkTwitterFollowingPipeline._
+  import FlinkTwitterFollowingPipeline._
 
-    override def run(): Unit = {
+  override def run(): Unit = {
 
-        val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+    val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
 
-        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-        env.setNumberOfExecutionRetries(0)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    env.setNumberOfExecutionRetries(0)
 
-        val inPath = buildReaderPath(config.getSource)
+    val inPath = buildReaderPath(config.getSource)
 
-        val outPath = buildWriterPath(config.getDestination)
+    val outPath = buildWriterPath(config.getDestination)
 
-        val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs )
+    val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs )
 
-        // these datums contain 'Follow' objects
-        val followDatums: DataStream[StreamsDatum] =
-            keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10)
+    // these datums contain 'Follow' objects
+    val followDatums: DataStream[StreamsDatum] =
+      keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10)
 
-        val follows: DataStream[Follow] = followDatums
-          .map(datum => datum.getDocument.asInstanceOf[Follow])
+    val follows: DataStream[Follow] = followDatums
+      .map(datum => datum.getDocument.asInstanceOf[Follow])
 
-        val jsons: DataStream[String] = follows
-          .map(follow => {
-              val MAPPER = StreamsJacksonMapper.getInstance
-              MAPPER.writeValueAsString(follow)
-          })
+    val jsons: DataStream[String] = follows
+      .map(follow => {
+        val MAPPER = StreamsJacksonMapper.getInstance
+        MAPPER.writeValueAsString(follow)
+      })
 
-        if( config.getTest == false )
-            jsons.addSink(new RollingSink[String](outPath)).setParallelism(3)
-        else
-            jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
-              .setParallelism(env.getParallelism)
+    if( config.getTest == false )
+      jsons.addSink(new RollingSink[String](outPath)).setParallelism(3)
+    else
+      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+        .setParallelism(env.getParallelism)
 
-        // if( test == true ) jsons.print();
+    // if( test == true ) jsons.print();
 
-        env.execute(STREAMS_ID)
+    env.execute(STREAMS_ID)
+  }
+
+  class FollowingCollectorFlatMapFunction(
+                                           twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator[TwitterFollowingConfiguration](classOf[TwitterFollowingConfiguration]).detectConfiguration(StreamsConfigurator.getConfig.getConfig("twitter")),
+                                           flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator[StreamsFlinkConfiguration](classOf[StreamsFlinkConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
+                                         ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+
+    override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
+      collectConnections(input, out)
     }
 
-    class FollowingCollectorFlatMapFunction(
-                                             twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator[TwitterFollowingConfiguration](classOf[TwitterFollowingConfiguration]).detectConfiguration(StreamsConfigurator.getConfig.getConfig("twitter")),
-                                             flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator[StreamsFlinkConfiguration](classOf[StreamsFlinkConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
-                                           ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
-
-        override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
-            collectConnections(input, out)
-        }
-
-        def collectConnections(id : String, out : Collector[StreamsDatum]) = {
-            val twitProvider: TwitterFollowingProvider =
-                new TwitterFollowingProvider(
-                    twitterConfiguration.withIdsOnly(true).withInfo(List(toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration]
-                )
-            twitProvider.prepare(twitProvider)
-            twitProvider.startStream()
-            var iterator: Iterator[StreamsDatum] = null
-            do {
-                Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
-                twitProvider.readCurrent().iterator().toList.map(out.collect(_))
-            } while( twitProvider.isRunning )
-        }
+    def collectConnections(id : String, out : Collector[StreamsDatum]) = {
+      val twitProvider: TwitterFollowingProvider =
+        new TwitterFollowingProvider(
+          twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration]
+        )
+      twitProvider.prepare(twitProvider)
+      twitProvider.startStream()
+      var iterator: Iterator[StreamsDatum] = null
+      do {
+        Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+        twitProvider.readCurrent().iterator().toList.map(out.collect(_))
+      } while( twitProvider.isRunning )
     }
+  }
 
 }
\ No newline at end of file
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index 8bb2997..549e048 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -23,35 +23,29 @@
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.runtime.state.filesystem.FsStateBackend
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.function.AllWindowFunction
-import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
 import org.apache.flink.streaming.connectors.fs.RollingSink
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema
 import org.apache.flink.util.Collector
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
 import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
 import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
 import org.apache.streams.flink.FlinkStreamingConfiguration
-import org.apache.streams.hdfs.HdfsConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
-import org.apache.streams.twitter.TwitterUserInformationConfiguration
-import org.apache.streams.twitter.pojo.{Tweet, User}
-import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.apache.streams.twitter.pojo.Tweet
+import org.apache.streams.twitter.provider.TwitterTimelineProvider
 import org.slf4j.{Logger, LoggerFactory}
-import org.apache.flink.api.scala._
 
 import scala.collection.JavaConversions._
 
+/**
+  * FlinkTwitterPostsPipeline collects recent posts from all profiles from a
+  * set of IDs, writing each post as a twitter:status in json format to dfs.
+  */
 object FlinkTwitterPostsPipeline extends FlinkBase {
 
   val STREAMS_ID: String = "FlinkTwitterPostsPipeline"
@@ -154,13 +148,13 @@
 
   class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
     override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
-        collectPosts(input, out)
+      collectPosts(input, out)
     }
     def collectPosts(id : String, out : Collector[StreamsDatum]) = {
       val twitterConfiguration = config.getTwitter
       val twitProvider: TwitterTimelineProvider =
         new TwitterTimelineProvider(
-          twitterConfiguration.withInfo(List(toProviderId(id))).withMaxItems(200l)
+          twitterConfiguration.withInfo(List(toProviderId(id)))
         )
       twitProvider.prepare(twitProvider)
       twitProvider.startStream()
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index 56d892b..dbb8a33 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -25,11 +25,12 @@
 import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.flink.api.common.functions.StoppableFunction
+import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
-import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.streaming.connectors.fs.RollingSink
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
 import org.apache.streams.core.StreamsDatum
@@ -38,13 +39,16 @@
 import org.apache.streams.flink.FlinkStreamingConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.apache.streams.twitter.TwitterStreamConfiguration
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat
 import org.apache.streams.twitter.provider.TwitterStreamProvider
 import org.slf4j.{Logger, LoggerFactory}
-import org.apache.flink.api.scala._
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat
 
 import scala.collection.JavaConversions._
 
+/**
+  * FlinkTwitterSpritzerPipeline opens a spritzer stream and writes
+  * each post received as a twitter:status in json format to dfs.
+  */
 object FlinkTwitterSpritzerPipeline extends FlinkBase {
 
   val STREAMS_ID: String = "FlinkTwitterSpritzerPipeline"
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 01425f6..c180089 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -22,15 +22,14 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.{Preconditions, Strings}
-import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction}
-import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows}
 import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.flink.api.common.functions.RichFlatMapFunction
-import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
-import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
 import org.apache.flink.api.scala._
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.function.WindowFunction
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
 import org.apache.flink.streaming.connectors.fs.RollingSink
 import org.apache.flink.util.Collector
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
@@ -45,6 +44,10 @@
 
 import scala.collection.JavaConversions._
 
+/**
+  * FlinkTwitterPostsPipeline collects the current user profile of a
+  * set of IDs, writing each as a twitter:user in json format to dfs.
+  */
 object FlinkTwitterUserInformationPipeline extends FlinkBase {
 
   val STREAMS_ID: String = "FlinkTwitterUserInformationPipeline"
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
index 6cf8d9d..be22b82 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
@@ -34,7 +34,7 @@
 import scala.io.Source
 
 /**
-  * Created by sblackmon on 3/13/16.
+  * FlinkTwitterFollowingPipelineFollowersIT is an integration test for FlinkTwitterFollowingPipeline.
   */
 class FlinkTwitterFollowingPipelineFollowersIT extends FlatSpec {
 
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
index 8ea7f9c..9829ebc 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
@@ -21,24 +21,20 @@
 import java.io.File
 import java.nio.file.{Files, Paths}
 
-import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
 import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
-import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
-import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline}
-import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
-import org.junit.Ignore
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
 import org.slf4j.{Logger, LoggerFactory}
 import org.testng.annotations.Test
 
 import scala.io.Source
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
-import org.scalatest.time.SpanSugar._
 
 /**
-  * Created by sblackmon on 3/13/16.
+  * FlinkTwitterFollowingPipelineFriendsIT is an integration test for FlinkTwitterFollowingPipeline.
   */
 class FlinkTwitterFollowingPipelineFriendsIT extends FlatSpec {
 
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
index 7113c4c..987e82d 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -20,27 +20,21 @@
 
 import java.io.File
 import java.nio.file.{Files, Paths}
-import java.util.concurrent.TimeUnit
 
-import com.google.common.util.concurrent.{Monitor, Uninterruptibles}
 import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.examples.flink.twitter.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration}
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
 import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
-import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
-import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterPostsPipeline, FlinkTwitterUserInformationPipeline}
-import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.io.Source
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
 import org.scalatest.FlatSpec
 import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
 import org.scalatest.time.SpanSugar._
+import org.slf4j.{Logger, LoggerFactory}
 import org.testng.annotations.Test
 
+import scala.io.Source
+
 /**
-  * Created by sblackmon on 3/13/16.
+  * FlinkTwitterPostsPipelineIT is an integration test for FlinkTwitterPostsPipeline.
   */
 class FlinkTwitterPostsPipelineIT extends FlatSpec  {
 
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
index 29625ca..7570bac 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -23,20 +23,18 @@
 
 import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
-import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
-import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterSpritzerPipeline, FlinkTwitterUserInformationPipeline}
-import org.apache.streams.examples.flink.twitter.{TwitterPostsPipelineConfiguration, TwitterSpritzerPipelineConfiguration}
+import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
 import org.slf4j.{Logger, LoggerFactory}
 import org.testng.annotations.Test
 
 import scala.io.Source
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
-import org.scalatest.time.SpanSugar._
 
 /**
-  * Created by sblackmon on 3/13/16.
+  * FlinkTwitterSpritzerPipelineIT is an integration test for FlinkTwitterSpritzerPipeline.
   */
 class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
 
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
index b4387f9..ab88d48 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -22,25 +22,19 @@
 import java.nio.file.{Files, Paths}
 
 import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.examples.flink.twitter.{TwitterSpritzerPipelineConfiguration, TwitterUserInformationPipelineConfiguration}
-import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
 import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
-import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
 import org.scalatest.FlatSpec
-import org.scalatest._
-import org.scalatest.junit.JUnitRunner
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.io.Source
-import org.scalatest.Ignore
 import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
 import org.scalatest.time.SpanSugar._
+import org.slf4j.{Logger, LoggerFactory}
 import org.testng.annotations.Test
 
+import scala.io.Source
+
 /**
-  * Created by sblackmon on 3/13/16.
+  * FlinkTwitterUserInformationPipelineIT is an integration test for FlinkTwitterUserInformationPipeline.
   */
 class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
 
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
index be79f4a..b859d60 100644
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
@@ -23,59 +23,54 @@
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
 import org.apache.streams.hdfs.WebHdfsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 
 import com.google.common.collect.Maps;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
 /**
- * Copies documents into a new index
+ * Copies documents from an elasticsearch index to new-line delimited json on dfs.
  */
 public class ElasticsearchHdfs implements Runnable {
 
-    public final static String STREAMS_ID = "ElasticsearchHdfs";
+  public final static String STREAMS_ID = "ElasticsearchHdfs";
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class);
 
-    ElasticsearchHdfsConfiguration config;
+  ElasticsearchHdfsConfiguration config;
 
-    public ElasticsearchHdfs() {
-       this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  public ElasticsearchHdfs() {
+    this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  }
 
-    }
+  public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) {
+    this.config = reindex;
+  }
 
-    public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) {
-        this.config = reindex;
-    }
+  public static void main(String[] args)
+  {
+    LOGGER.info(StreamsConfigurator.config.toString());
+    ElasticsearchHdfs backup = new ElasticsearchHdfs();
+    new Thread(backup).start();
+  }
 
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
+  @Override
+  public void run() {
 
-        ElasticsearchHdfs backup = new ElasticsearchHdfs();
+    ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+    WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination());
 
-        new Thread(backup).start();
+    LocalRuntimeConfiguration localRuntimeConfiguration =
+        StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+    StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
 
-    }
-
-    @Override
-    public void run() {
-
-        ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
-
-        WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination());
-
-        Map<String, Object> streamConfig = Maps.newHashMap();
-        streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
-        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
-        StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
-
-        builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
-        builder.addStreamsPersistWriter(WebHdfsPersistWriter.class.getCanonicalName(), hdfsPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
-        builder.start();
-    }
+    builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
+    builder.addStreamsPersistWriter(WebHdfsPersistWriter.class.getCanonicalName(), hdfsPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
+    builder.start();
+  }
 }
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
index 375665c..caf9cbc 100644
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
@@ -23,59 +23,54 @@
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
 import org.apache.streams.hdfs.WebHdfsPersistReader;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 
 import com.google.common.collect.Maps;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
 /**
- * Copies documents into a new index
+ * Copies documents from new-line delimited json on dfs to an elasticsearch index.
  */
 public class HdfsElasticsearch implements Runnable {
 
-    public final static String STREAMS_ID = "HdfsElasticsearch";
+  public final static String STREAMS_ID = "HdfsElasticsearch";
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class);
 
-    HdfsElasticsearchConfiguration config;
+  HdfsElasticsearchConfiguration config;
 
-    public HdfsElasticsearch() {
-       this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  public HdfsElasticsearch() {
+    this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  }
 
-    }
+  public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) {
+    this.config = reindex;
+  }
 
-    public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) {
-        this.config = reindex;
-    }
+  public static void main(String[] args)
+  {
+    LOGGER.info(StreamsConfigurator.config.toString());
+    HdfsElasticsearch restore = new HdfsElasticsearch();
+    new Thread(restore).start();
+  }
 
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
+  @Override
+  public void run() {
 
-        HdfsElasticsearch restore = new HdfsElasticsearch();
+    WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource());
+    ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
 
-        new Thread(restore).start();
+    LocalRuntimeConfiguration localRuntimeConfiguration =
+        StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+    StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
 
-    }
-
-    @Override
-    public void run() {
-
-        WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource());
-
-        ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
-
-        Map<String, Object> streamConfig = Maps.newHashMap();
-        streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
-        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000 * 1000);
-        StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
-
-        builder.newPerpetualStream(WebHdfsPersistReader.class.getCanonicalName(), webHdfsPersistReader);
-        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, WebHdfsPersistReader.class.getCanonicalName());
-        builder.start();
-    }
+    builder.newPerpetualStream(WebHdfsPersistReader.class.getCanonicalName(), webHdfsPersistReader);
+    builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, WebHdfsPersistReader.class.getCanonicalName());
+    builder.start();
+  }
 }
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
index 437ebf6..9b70440 100644
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
@@ -28,7 +28,6 @@
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -50,56 +49,54 @@
 import static org.testng.Assert.assertNotEquals;
 
 /**
- * Test copying documents between hdfs and elasticsearch
+ * ElasticsearchHdfsIT is an integration test for ElasticsearchHdfs.
  */
 public class ElasticsearchHdfsIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class);
 
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    protected ElasticsearchHdfsConfiguration testConfiguration;
-    protected Client testClient;
+  protected ElasticsearchHdfsConfiguration testConfiguration;
+  protected Client testClient;
 
-    private int count = 0;
+  private int count = 0;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/ElasticsearchHdfsIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/ElasticsearchHdfsIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertThat(indicesExistsResponse.isExists(), is(true));
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertThat(indicesExistsResponse.isExists(), is(true));
 
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
-                .setTypes(testConfiguration.getSource().getTypes().get(0));
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+        .setTypes(testConfiguration.getSource().getTypes().get(0));
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        count = (int)countResponse.getHits().getTotalHits();
+    count = (int)countResponse.getHits().getTotalHits();
 
-        assertNotEquals(count, 0);
-    }
+    assertNotEquals(count, 0);
+  }
 
-    @Test
-    public void ElasticsearchHdfsIT() throws Exception {
+  @Test
+  public void ElasticsearchHdfsIT() throws Exception {
 
-        ElasticsearchHdfs backup = new ElasticsearchHdfs(testConfiguration);
+    ElasticsearchHdfs backup = new ElasticsearchHdfs(testConfiguration);
+    backup.run();
 
-        backup.run();
-
-        // assert lines in file
-    }
+  }
 
 }
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
index a629025..d5f6a29 100644
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
@@ -28,7 +28,6 @@
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -52,59 +51,59 @@
 import static org.testng.AssertJUnit.assertTrue;
 
 /**
- * Test copying documents between hdfs and elasticsearch
+ * HdfsElasticsearchIT is an integration test for HdfsElasticsearch.
  */
 public class HdfsElasticsearchIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearchIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearchIT.class);
 
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    protected HdfsElasticsearchConfiguration testConfiguration;
-    protected Client testClient;
+  protected HdfsElasticsearchConfiguration testConfiguration;
+  protected Client testClient;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/HdfsElasticsearchIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/HdfsElasticsearchIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        if(indicesExistsResponse.isExists()) {
-            DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getDestination().getIndex());
-            DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
-            assertTrue(deleteIndexResponse.isAcknowledged());
-        };
-    }
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    if(indicesExistsResponse.isExists()) {
+      DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getDestination().getIndex());
+      DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+      assertTrue(deleteIndexResponse.isAcknowledged());
+    };
+  }
 
-    @Test
-    public void ElasticsearchHdfsIT() throws Exception {
+  @Test
+  public void ElasticsearchHdfsIT() throws Exception {
 
-        HdfsElasticsearch restore = new HdfsElasticsearch(testConfiguration);
+    HdfsElasticsearch restore = new HdfsElasticsearch(testConfiguration);
 
-        restore.run();
+    restore.run();
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertTrue(indicesExistsResponse.isExists());
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertTrue(indicesExistsResponse.isExists());
 
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getDestination().getIndex())
-                .setTypes(testConfiguration.getDestination().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getDestination().getIndex())
+        .setTypes(testConfiguration.getDestination().getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        assertEquals(countResponse.getHits().getTotalHits(), 89);
+    assertEquals(countResponse.getHits().getTotalHits(), 89);
 
-    }
+  }
 
 }
diff --git a/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf b/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
index 80ef53a..42cb01c 100644
--- a/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
+++ b/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
@@ -27,3 +27,4 @@
   path = "target/test-classes"
   writerPath = "elasticsearch_hdfs_it"
 }
+taskTimeoutMs = 60000
\ No newline at end of file
diff --git a/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf b/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
index 072a024..30be89b 100644
--- a/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
+++ b/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
@@ -28,4 +28,5 @@
   type = "activity"
   refresh = true
   forceUseConfig = true
-}
\ No newline at end of file
+}
+taskTimeoutMs = 60000
\ No newline at end of file
diff --git a/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
index 676a272..476e369 100644
--- a/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
+++ b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
@@ -23,59 +23,59 @@
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
 import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 
 import com.google.common.collect.Maps;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
 /**
- * Copies documents into a new index
+ * Copies documents from the source index to the destination index.
  */
 public class ElasticsearchReindex implements Runnable {
 
-    public final static String STREAMS_ID = "ElasticsearchReindex";
+  public final static String STREAMS_ID = "ElasticsearchReindex";
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindex.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindex.class);
 
-    ElasticsearchReindexConfiguration config;
+  ElasticsearchReindexConfiguration config;
 
-    public ElasticsearchReindex() {
-       this(new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  public ElasticsearchReindex() {
+    this(new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
 
-    }
+  }
 
-    public ElasticsearchReindex(ElasticsearchReindexConfiguration reindex) {
-        this.config = reindex;
-    }
+  public ElasticsearchReindex(ElasticsearchReindexConfiguration reindex) {
+    this.config = reindex;
+  }
 
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
+  public static void main(String[] args)
+  {
+    LOGGER.info(StreamsConfigurator.config.toString());
 
-        ElasticsearchReindex reindex = new ElasticsearchReindex();
+    ElasticsearchReindex reindex = new ElasticsearchReindex();
 
-        new Thread(reindex).start();
+    new Thread(reindex).start();
 
-    }
+  }
 
-    @Override
-    public void run() {
+  @Override
+  public void run() {
 
-        ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+    ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
 
-        ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+    ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
 
-        Map<String, Object> streamConfig = Maps.newHashMap();
-        streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
-        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
-        StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+    LocalRuntimeConfiguration localRuntimeConfiguration =
+        StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+    StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
 
-        builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
-        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
-        builder.start();
-    }
+    builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
+    builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
+    builder.start();
+  }
 }
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
index 3260949..04d7cd6 100644
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
@@ -28,7 +28,6 @@
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -50,64 +49,64 @@
 import static org.junit.Assert.assertNotEquals;
 
 /**
- * Test copying parent/child associated documents between two indexes on same cluster
+ * Test copying parent/child associated documents between two indexes on same cluster.
  */
 public class ElasticsearchReindexChildIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
 
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    protected ElasticsearchReindexConfiguration testConfiguration;
-    protected Client testClient;
+  protected ElasticsearchReindexConfiguration testConfiguration;
+  protected Client testClient;
 
-    private int count = 0;
+  private int count = 0;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/ElasticsearchReindexChildIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/ElasticsearchReindexChildIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertThat(indicesExistsResponse.isExists(), is(true));
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertThat(indicesExistsResponse.isExists(), is(true));
 
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
-                .setTypes(testConfiguration.getSource().getTypes().get(0));
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+        .setTypes(testConfiguration.getSource().getTypes().get(0));
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        count = (int)countResponse.getHits().getTotalHits();
+    count = (int)countResponse.getHits().getTotalHits();
 
-        assertNotEquals(count, 0);
+    assertNotEquals(count, 0);
 
-    }
+  }
 
-    @Test
-    public void testReindex() throws Exception {
+  @Test
+  public void testReindex() throws Exception {
 
-        ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+    ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
 
-        reindex.run();
+    reindex.run();
 
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getDestination().getIndex())
-                .setTypes(testConfiguration.getDestination().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    // assert lines in file
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getDestination().getIndex())
+        .setTypes(testConfiguration.getDestination().getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        assertThat((int)countResponse.getHits().getTotalHits(), is(count));
+    assertThat((int)countResponse.getHits().getTotalHits(), is(count));
 
-    }
+  }
 
 }
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java
index a324e24..6c69388 100644
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java
@@ -53,59 +53,59 @@
  */
 public class ElasticsearchReindexIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
 
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    protected ElasticsearchReindexConfiguration testConfiguration;
-    protected Client testClient;
+  protected ElasticsearchReindexConfiguration testConfiguration;
+  protected Client testClient;
 
-    private int count = 0;
+  private int count = 0;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/ElasticsearchReindexIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/ElasticsearchReindexIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertThat(indicesExistsResponse.isExists(), is(true));
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertThat(indicesExistsResponse.isExists(), is(true));
 
-        SearchRequestBuilder countRequest = testClient
-            .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
-            .setTypes(testConfiguration.getSource().getTypes().get(0));
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+        .setTypes(testConfiguration.getSource().getTypes().get(0));
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        count = (int)countResponse.getHits().getTotalHits();
+    count = (int)countResponse.getHits().getTotalHits();
 
-        assertThat(count, not(0));
+    assertThat(count, not(0));
 
-    }
+  }
 
-    @Test
-    public void testReindex() throws Exception {
+  @Test
+  public void testReindex() throws Exception {
 
-        ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+    ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
 
-        reindex.run();
+    reindex.run();
 
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-            .prepareSearch(testConfiguration.getDestination().getIndex())
-            .setTypes(testConfiguration.getDestination().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    // assert lines in file
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getDestination().getIndex())
+        .setTypes(testConfiguration.getDestination().getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        assertThat((int)countResponse.getHits().getTotalHits(), is(count));
+    assertThat((int)countResponse.getHits().getTotalHits(), is(count));
 
-    }
+  }
 }
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java
index 988852a..e53c057 100644
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java
@@ -30,7 +30,6 @@
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -59,68 +58,68 @@
  */
 public class ElasticsearchReindexParentIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
 
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    protected ElasticsearchReindexConfiguration testConfiguration;
-    protected Client testClient;
+  protected ElasticsearchReindexConfiguration testConfiguration;
+  protected Client testClient;
 
-    private int count = 0;
+  private int count = 0;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/ElasticsearchReindexParentIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/ElasticsearchReindexParentIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertTrue(indicesExistsResponse.isExists());
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertTrue(indicesExistsResponse.isExists());
 
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
-                .setTypes(testConfiguration.getSource().getTypes().get(0));
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+        .setTypes(testConfiguration.getSource().getTypes().get(0));
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        count = (int)countResponse.getHits().getTotalHits();
+    count = (int)countResponse.getHits().getTotalHits();
 
-        PutIndexTemplateRequestBuilder putTemplateRequestBuilder = testClient.admin().indices().preparePutTemplate("mappings");
-        URL templateURL = ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json");
-        ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class);
-        String templateSource = MAPPER.writeValueAsString(template);
-        putTemplateRequestBuilder.setSource(templateSource);
+    PutIndexTemplateRequestBuilder putTemplateRequestBuilder = testClient.admin().indices().preparePutTemplate("mappings");
+    URL templateURL = ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json");
+    ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class);
+    String templateSource = MAPPER.writeValueAsString(template);
+    putTemplateRequestBuilder.setSource(templateSource);
 
-        testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();
+    testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();
 
-        assertThat(count, not(0));
+    assertThat(count, not(0));
 
-    }
+  }
 
-    @Test
-    public void testReindex() throws Exception {
+  @Test
+  public void testReindex() throws Exception {
 
-        ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+    ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
 
-        reindex.run();
+    reindex.run();
 
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getDestination().getIndex())
-                .setTypes(testConfiguration.getDestination().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    // assert lines in file
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getDestination().getIndex())
+        .setTypes(testConfiguration.getDestination().getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        assertThat((int)countResponse.getHits().getTotalHits(), is(count));
+    assertThat((int)countResponse.getHits().getTotalHits(), is(count));
 
-    }
+  }
 
 }
diff --git a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
index 424d725..159b7d5 100644
--- a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
+++ b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
@@ -33,4 +33,5 @@
     "type": "activity",
     "forceUseConfig": true
   }
+  taskTimeoutMs = 60000
 }
diff --git a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
index 0062f0f..faac23e 100644
--- a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
+++ b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
@@ -33,4 +33,5 @@
     "type": "activity",
     "forceUseConfig": true
   }
+  taskTimeoutMs = 60000
 }
diff --git a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
index 424d725..159b7d5 100644
--- a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
+++ b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
@@ -33,4 +33,5 @@
     "type": "activity",
     "forceUseConfig": true
   }
+  taskTimeoutMs = 60000
 }
diff --git a/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java
index 4527a6b..2d994b9 100644
--- a/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java
+++ b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java
@@ -22,61 +22,56 @@
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.mongo.MongoPersistReader;
 
-import com.google.common.collect.Maps;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 /**
- * Copies documents into a new index
+ * Copies a mongo collection to an elasticsearch index.
  */
 public class MongoElasticsearchSync implements Runnable {
 
-    public final static String STREAMS_ID = "MongoElasticsearchSync";
+  public final static String STREAMS_ID = "MongoElasticsearchSync";
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSync.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSync.class);
 
-    MongoElasticsearchSyncConfiguration config;
+  MongoElasticsearchSyncConfiguration config;
 
-    public MongoElasticsearchSync() {
-        this(new ComponentConfigurator<MongoElasticsearchSyncConfiguration>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-    }
+  public MongoElasticsearchSync() {
+    this(new ComponentConfigurator<MongoElasticsearchSyncConfiguration>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  }
 
-    public MongoElasticsearchSync(MongoElasticsearchSyncConfiguration config) {
-        this.config = config;
-    }
+  public MongoElasticsearchSync(MongoElasticsearchSyncConfiguration config) {
+    this.config = config;
+  }
 
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
+  public static void main(String[] args)
+  {
+    LOGGER.info(StreamsConfigurator.config.toString());
 
-        MongoElasticsearchSync sync = new MongoElasticsearchSync();
+    MongoElasticsearchSync sync = new MongoElasticsearchSync();
 
-        new Thread(sync).start();
+    new Thread(sync).start();
 
-    }
+  }
 
-    @Override
-    public void run() {
+  @Override
+  public void run() {
 
-        MongoPersistReader mongoPersistReader = new MongoPersistReader(config.getSource());
+    MongoPersistReader mongoPersistReader = new MongoPersistReader(config.getSource());
 
-        ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+    ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
 
-        LocalRuntimeConfiguration localRuntimeConfiguration = new LocalRuntimeConfiguration();
-        localRuntimeConfiguration.setIdentifier(STREAMS_ID);
-        localRuntimeConfiguration.setTaskTimeoutMs((long)(60 * 1000));
-        localRuntimeConfiguration.setQueueSize((long)1000);
-        StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
+    LocalRuntimeConfiguration localRuntimeConfiguration =
+        StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+    StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
 
-        builder.newPerpetualStream(MongoPersistReader.class.getCanonicalName(), mongoPersistReader);
-        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, MongoPersistReader.class.getCanonicalName());
-        builder.start();
-    }
+    builder.newPerpetualStream(MongoPersistReader.class.getCanonicalName(), mongoPersistReader);
+    builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, MongoPersistReader.class.getCanonicalName());
+    builder.start();
+  }
 }
diff --git a/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java b/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java
index 02af293..84d0fba 100644
--- a/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java
+++ b/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java
@@ -28,7 +28,6 @@
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -51,55 +50,55 @@
 import static org.testng.Assert.assertTrue;
 
 /**
- * Test copying documents between two indexes on same cluster
+ * MongoElasticsearchSyncIT is an integration test for MongoElasticsearchSync.
  */
 public class MongoElasticsearchSyncIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSyncIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSyncIT.class);
 
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    protected MongoElasticsearchSyncConfiguration testConfiguration;
-    protected Client testClient;
+  protected MongoElasticsearchSyncConfiguration testConfiguration;
+  protected Client testClient;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/MongoElasticsearchSyncIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/MongoElasticsearchSyncIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertFalse(indicesExistsResponse.isExists());
-    }
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertFalse(indicesExistsResponse.isExists());
+  }
 
-    @Test
-    public void testSync() throws Exception {
+  @Test
+  public void testSync() throws Exception {
 
-        MongoElasticsearchSync sync = new MongoElasticsearchSync(testConfiguration);
+    MongoElasticsearchSync sync = new MongoElasticsearchSync(testConfiguration);
 
-        sync.run();
+    sync.run();
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertTrue(indicesExistsResponse.isExists());
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertTrue(indicesExistsResponse.isExists());
 
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getDestination().getIndex())
-                .setTypes(testConfiguration.getDestination().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    // assert lines in file
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getDestination().getIndex())
+        .setTypes(testConfiguration.getDestination().getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        assertEquals(89, (int)countResponse.getHits().getTotalHits());
+    assertEquals(89, (int)countResponse.getHits().getTotalHits());
 
-    }
+  }
 }
diff --git a/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf b/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf
index 61e61d7..86a41b6 100644
--- a/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf
+++ b/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf
@@ -30,4 +30,5 @@
     "type": "activity",
     "forceUseConfig": true
   }
+  taskTimeoutMs = 60000
 }
diff --git a/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
index 34ac8c4..5ffb6ed 100644
--- a/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
+++ b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
@@ -18,7 +18,6 @@
 
 package org.apache.streams.example;
 
-import com.google.common.collect.Lists;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.converter.ActivityConverterProcessor;
@@ -27,14 +26,17 @@
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.data.DocumentClassifier;
-import org.apache.streams.example.TwitterFollowNeo4jConfiguration;
 import org.apache.streams.graph.GraphHttpConfiguration;
 import org.apache.streams.graph.GraphHttpPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.twitter.TwitterFollowingConfiguration;
+import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
 import org.apache.streams.twitter.converter.TwitterFollowActivityConverter;
 import org.apache.streams.twitter.provider.TwitterFollowingProvider;
-import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
+
+import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,50 +46,53 @@
  */
 public class TwitterFollowNeo4j implements Runnable {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4j.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4j.class);
 
-    TwitterFollowNeo4jConfiguration config;
+  TwitterFollowNeo4jConfiguration config;
 
-    public TwitterFollowNeo4j() {
-        this(new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-    }
+  public TwitterFollowNeo4j() {
+    this(new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  }
 
-    public TwitterFollowNeo4j(TwitterFollowNeo4jConfiguration config) {
-        this.config = config;
-    }
+  public TwitterFollowNeo4j(TwitterFollowNeo4jConfiguration config) {
+    this.config = config;
+  }
 
-    public void run() {
+  public void run() {
 
-        TwitterFollowingConfiguration twitterFollowingConfiguration = config.getTwitter();
-        TwitterFollowingProvider followingProvider = new TwitterFollowingProvider(twitterFollowingConfiguration);
-        TypeConverterProcessor converter = new TypeConverterProcessor(String.class);
+    TwitterFollowingConfiguration twitterFollowingConfiguration = config.getTwitter();
+    TwitterFollowingProvider followingProvider = new TwitterFollowingProvider(twitterFollowingConfiguration);
+    TypeConverterProcessor converter = new TypeConverterProcessor(String.class);
 
-        ActivityConverterProcessorConfiguration activityConverterProcessorConfiguration =
-                new ActivityConverterProcessorConfiguration()
-                        .withClassifiers(Lists.newArrayList((DocumentClassifier) new TwitterDocumentClassifier()))
-                        .withConverters(Lists.newArrayList((ActivityConverter) new TwitterFollowActivityConverter()));
-        ActivityConverterProcessor activity = new ActivityConverterProcessor(activityConverterProcessorConfiguration);
+    ActivityConverterProcessorConfiguration activityConverterProcessorConfiguration =
+        new ActivityConverterProcessorConfiguration()
+            .withClassifiers(Lists.newArrayList((DocumentClassifier) new TwitterDocumentClassifier()))
+            .withConverters(Lists.newArrayList((ActivityConverter) new TwitterFollowActivityConverter()));
+    ActivityConverterProcessor activity = new ActivityConverterProcessor(activityConverterProcessorConfiguration);
 
-        GraphHttpConfiguration graphWriterConfiguration = config.getGraph();
-        GraphHttpPersistWriter graphPersistWriter = new GraphHttpPersistWriter(graphWriterConfiguration);
+    GraphHttpConfiguration graphWriterConfiguration = config.getGraph();
+    GraphHttpPersistWriter graphPersistWriter = new GraphHttpPersistWriter(graphWriterConfiguration);
 
-        StreamBuilder builder = new LocalStreamBuilder();
-        builder.newPerpetualStream(TwitterFollowingProvider.STREAMS_ID, followingProvider);
-        builder.addStreamsProcessor("converter", converter, 1, TwitterFollowingProvider.STREAMS_ID);
-        builder.addStreamsProcessor("activity", activity, 1, "converter");
-        builder.addStreamsPersistWriter("graph", graphPersistWriter, 1, "activity");
+    LocalRuntimeConfiguration localRuntimeConfiguration =
+        StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+    StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
 
-        builder.start();
-    }
+    builder.newPerpetualStream(TwitterFollowingProvider.class.getCanonicalName(), followingProvider);
+    builder.addStreamsProcessor(TypeConverterProcessor.class.getCanonicalName(), converter, 1, TwitterFollowingProvider.class.getCanonicalName());
+    builder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(), activity, 1, TypeConverterProcessor.class.getCanonicalName());
+    builder.addStreamsPersistWriter(GraphHttpPersistWriter.class.getCanonicalName(), graphPersistWriter, 1, ActivityConverterProcessor.class.getCanonicalName());
 
-    public static void main(String[] args) {
+    builder.start();
+  }
 
-        LOGGER.info(StreamsConfigurator.config.toString());
+  public static void main(String[] args) {
 
-        TwitterFollowNeo4j stream = new TwitterFollowNeo4j();
+    LOGGER.info(StreamsConfigurator.config.toString());
 
-        stream.run();
+    TwitterFollowNeo4j stream = new TwitterFollowNeo4j();
 
-    }
+    stream.run();
+
+  }
 
 }
\ No newline at end of file
diff --git a/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
index 51593b0..ac9362e 100644
--- a/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
+++ b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
@@ -18,15 +18,13 @@
 
 package org.apache.streams.example.test;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
 import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.example.TwitterFollowNeo4j;
 import org.apache.streams.example.TwitterFollowNeo4jConfiguration;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.BeforeClass;
@@ -35,35 +33,35 @@
 import java.io.File;
 
 /**
- * Example stream that populates elasticsearch with activities from twitter userstream in real-time
+ * TwitterFollowNeo4jIT is an integration test for TwitterFollowNeo4j.
  */
 public class TwitterFollowNeo4jIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4jIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4jIT.class);
 
-    protected TwitterFollowNeo4jConfiguration testConfiguration;
+  protected TwitterFollowNeo4jConfiguration testConfiguration;
 
-    private int count = 0;
+  private int count = 0;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/TwitterFollowNeo4jIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(typesafe);
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/TwitterFollowNeo4jIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(typesafe);
 
-    }
+  }
 
-    @Test
-    public void testTwitterFollowGraph() throws Exception {
+  @Test
+  public void testTwitterFollowGraph() throws Exception {
 
-        TwitterFollowNeo4j stream = new TwitterFollowNeo4j(testConfiguration);
+    TwitterFollowNeo4j stream = new TwitterFollowNeo4j(testConfiguration);
 
-        stream.run();
+    stream.run();
 
-    }
+  }
 
 }
diff --git a/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf
index d4b4aeb..346b111 100644
--- a/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf
+++ b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf
@@ -25,4 +25,5 @@
   port = ${neo4j.http.port}
   type = "neo4j"
   graph = "data"
-}
\ No newline at end of file
+}
+taskTimeoutMs = 60000
\ No newline at end of file
diff --git a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
index 7d87f36..60f3405 100644
--- a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
+++ b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
@@ -18,14 +18,17 @@
 
 package org.apache.streams.example;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.converter.ActivityConverterProcessor;
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.twitter.provider.TwitterTimelineProvider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,48 +37,48 @@
  *
  * Converts them to activities, and writes them in activity format to Elasticsearch.
  */
-
 public class TwitterHistoryElasticsearch implements Runnable {
 
-    public final static String STREAMS_ID = "TwitterHistoryElasticsearch";
+  public final static String STREAMS_ID = "TwitterHistoryElasticsearch";
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearch.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearch.class);
 
-    private static final ObjectMapper mapper = new ObjectMapper();
+  private static final ObjectMapper mapper = new ObjectMapper();
 
-    TwitterHistoryElasticsearchConfiguration config;
+  TwitterHistoryElasticsearchConfiguration config;
 
-    public TwitterHistoryElasticsearch() {
-        this(new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  public TwitterHistoryElasticsearch() {
+    this(new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  }
 
-    }
+  public TwitterHistoryElasticsearch(TwitterHistoryElasticsearchConfiguration config) {
+    this.config = config;
+  }
 
-    public TwitterHistoryElasticsearch(TwitterHistoryElasticsearchConfiguration config) {
-        this.config = config;
-    }
+  public static void main(String[] args)
+  {
+    LOGGER.info(StreamsConfigurator.config.toString());
 
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
+    TwitterHistoryElasticsearch history = new TwitterHistoryElasticsearch();
 
-        TwitterHistoryElasticsearch history = new TwitterHistoryElasticsearch();
+    new Thread(history).start();
 
-        new Thread(history).start();
-
-    }
+  }
 
 
-    public void run() {
+  public void run() {
 
-        TwitterTimelineProvider provider = new TwitterTimelineProvider(config.getTwitter());
-        ActivityConverterProcessor converter = new ActivityConverterProcessor();
-        ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config.getElasticsearch());
+    TwitterTimelineProvider provider = new TwitterTimelineProvider(config.getTwitter());
+    ActivityConverterProcessor converter = new ActivityConverterProcessor();
+    ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config.getElasticsearch());
 
-        StreamBuilder builder = new LocalStreamBuilder(500);
+    LocalRuntimeConfiguration localRuntimeConfiguration =
+        StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+    StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
 
-        builder.newPerpetualStream("provider", provider);
-        builder.addStreamsProcessor("converter", converter, 2, "provider");
-        builder.addStreamsPersistWriter("writer", writer, 1, "converter");
-        builder.start();
-    }
+    builder.newPerpetualStream(TwitterTimelineProvider.class.getCanonicalName(), provider);
+    builder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(), converter, 2, TwitterTimelineProvider.class.getCanonicalName());
+    builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), writer, 1, ActivityConverterProcessor.class.getCanonicalName());
+    builder.start();
+  }
 }
diff --git a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
index 07c1d88..0eb022b 100644
--- a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
+++ b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
@@ -26,7 +26,6 @@
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -45,62 +44,61 @@
 
 import java.io.File;
 
-import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.AssertJUnit.assertTrue;
 
 /**
- * Example stream that populates elasticsearch with activities from twitter userstream in real-time
+ * Example stream that populates elasticsearch with activities from twitter userstream in real-time.
  */
 public class TwitterHistoryElasticsearchIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearchIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearchIT.class);
 
-    protected TwitterHistoryElasticsearchConfiguration testConfiguration;
-    protected Client testClient;
+  protected TwitterHistoryElasticsearchConfiguration testConfiguration;
+  protected Client testClient;
 
-    private int count = 0;
+  private int count = 0;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/TwitterHistoryElasticsearchIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/TwitterHistoryElasticsearchIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client();
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        if(indicesExistsResponse.isExists()) {
-            DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex());
-            DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
-            assertTrue(deleteIndexResponse.isAcknowledged());
-        };
-    }
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    if(indicesExistsResponse.isExists()) {
+      DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex());
+      DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+      assertTrue(deleteIndexResponse.isAcknowledged());
+    };
+  }
 
-    @Test
-    public void testTwitterHistoryElasticsearch() throws Exception {
+  @Test
+  public void testTwitterHistoryElasticsearch() throws Exception {
 
-        TwitterHistoryElasticsearch stream = new TwitterHistoryElasticsearch(testConfiguration);
+    TwitterHistoryElasticsearch stream = new TwitterHistoryElasticsearch(testConfiguration);
 
-        stream.run();
+    stream.run();
 
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getElasticsearch().getIndex())
-                .setTypes(testConfiguration.getElasticsearch().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    // assert lines in file
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getElasticsearch().getIndex())
+        .setTypes(testConfiguration.getElasticsearch().getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        count = (int)countResponse.getHits().getTotalHits();
+    count = (int)countResponse.getHits().getTotalHits();
 
-        assertNotEquals(count, 0);
-    }
+    assertNotEquals(count, 0);
+  }
 
 }
diff --git a/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf b/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf
index 1a05e32..81e4903 100644
--- a/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf
+++ b/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf
@@ -26,4 +26,4 @@
   index = twitter_history_elasticsearch_it
   type = activity
   forceUseConfig = true
-}
\ No newline at end of file
+}
diff --git a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
index f1e776a..369ec0b 100644
--- a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
+++ b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
@@ -18,129 +18,129 @@
 
 package org.apache.streams.example;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.converter.ActivityConverterProcessor;
+import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter;
 import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
 import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
-import org.apache.streams.example.TwitterUserstreamElasticsearchConfiguration;
 import org.apache.streams.filters.VerbDefinitionDropFilter;
 import org.apache.streams.filters.VerbDefinitionKeepFilter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
-import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
 import org.apache.streams.twitter.provider.TwitterStreamProvider;
 import org.apache.streams.verbs.ObjectCombination;
 import org.apache.streams.verbs.VerbDefinition;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.elasticsearch.common.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.Map;
 
 /**
  * Example stream that populates elasticsearch with activities from twitter userstream in real-time
  */
 public class TwitterUserstreamElasticsearch implements Runnable {
 
-    public final static String STREAMS_ID = "TwitterUserstreamElasticsearch";
+  public final static String STREAMS_ID = "TwitterUserstreamElasticsearch";
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
 
-    /* this pattern will match any/only deletes */
-    private static VerbDefinition deleteVerbDefinition =
-            new VerbDefinition()
-            .withValue("delete")
-            .withObjects(Lists.newArrayList(new ObjectCombination()));
+  /* this pattern will match any/only deletes */
+  private static VerbDefinition deleteVerbDefinition =
+      new VerbDefinition()
+          .withValue("delete")
+          .withObjects(Lists.newArrayList(new ObjectCombination()));
 
-    TwitterUserstreamElasticsearchConfiguration config;
+  TwitterUserstreamElasticsearchConfiguration config;
 
-    public TwitterUserstreamElasticsearch() {
-        this(new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  public TwitterUserstreamElasticsearch() {
+    this(new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
 
+  }
+
+  public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration config) {
+    this.config = config;
+  }
+
+  public static void main(String[] args)
+  {
+    LOGGER.info(StreamsConfigurator.config.toString());
+
+    TwitterUserstreamElasticsearch userstream = new TwitterUserstreamElasticsearch();
+    new Thread(userstream).start();
+
+  }
+
+  @Override
+  public void run() {
+
+    TwitterStreamConfiguration twitterStreamConfiguration = config.getTwitter();
+    ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch();
+
+    TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration);
+    ActivityConverterProcessor converter = new ActivityConverterProcessor();
+    VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition));
+    ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration);
+    VerbDefinitionKeepFilter deleteOnlyProcessor = new VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition));
+    SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor();
+    ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration);
+
+    LocalRuntimeConfiguration localRuntimeConfiguration =
+        StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+    StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
+
+    builder.newPerpetualStream(TwitterStreamProvider.class.getCanonicalName(), stream);
+    builder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(), converter, 2, TwitterStreamProvider.class.getCanonicalName());
+    builder.addStreamsProcessor(VerbDefinitionDropFilter.class.getCanonicalName(), noDeletesProcessor, 1, ActivityConverterProcessor.class.getCanonicalName());
+    builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), writer, 1, VerbDefinitionDropFilter.class.getCanonicalName());
+    builder.addStreamsProcessor(VerbDefinitionKeepFilter.class.getCanonicalName(), deleteOnlyProcessor, 1, ActivityConverterProcessor.class.getCanonicalName());
+    builder.addStreamsProcessor(SetDeleteIdProcessor.class.getCanonicalName(), setDeleteIdProcessor, 1, VerbDefinitionKeepFilter.class.getCanonicalName());
+    builder.addStreamsPersistWriter(ElasticsearchPersistDeleter.class.getCanonicalName(), deleter, 1, SetDeleteIdProcessor.class.getCanonicalName());
+
+    builder.start();
+
+  }
+
+  protected class SetDeleteIdProcessor implements StreamsProcessor {
+
+    public String getId() {
+      return "TwitterUserstreamElasticsearch.SetDeleteIdProcessor";
     }
 
-    public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration config) {
-        this.config = config;
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+      Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+      String id = entry.getId();
+      // replace delete with post in id
+      // ensure ElasticsearchPersistDeleter will remove original post if present
+      id = Strings.replace(id, "delete", "post");
+      entry.setId(id);
+
+      return Lists.newArrayList(entry);
     }
 
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
+    @Override
+    public void prepare(Object configurationObject) {
 
-        TwitterUserstreamElasticsearch userstream = new TwitterUserstreamElasticsearch();
-        new Thread(userstream).start();
 
     }
 
     @Override
-    public void run() {
-
-        TwitterStreamConfiguration twitterStreamConfiguration = config.getTwitter();
-        ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch();
-
-        TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration);
-        ActivityConverterProcessor converter = new ActivityConverterProcessor();
-        VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition));
-        ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration);
-        VerbDefinitionKeepFilter deleteOnlyProcessor = new VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition));
-        SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor();
-        ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration);
-
-        Map<String, Object> streamConfig = Maps.newHashMap();
-        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 12 * 60 * 1000);
-        StreamBuilder builder = new LocalStreamBuilder(25, streamConfig);
-
-        builder.newPerpetualStream(TwitterStreamProvider.STREAMS_ID, stream);
-        builder.addStreamsProcessor("converter", converter, 2, TwitterStreamProvider.STREAMS_ID);
-        builder.addStreamsProcessor("NoDeletesProcessor", noDeletesProcessor, 1, "converter");
-        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, writer, 1, "NoDeletesProcessor");
-        builder.addStreamsProcessor("DeleteOnlyProcessor", deleteOnlyProcessor, 1, "converter");
-        builder.addStreamsProcessor("SetDeleteIdProcessor", setDeleteIdProcessor, 1, "DeleteOnlyProcessor");
-        builder.addStreamsPersistWriter("deleter", deleter, 1, "SetDeleteIdProcessor");
-
-        builder.start();
+    public void cleanUp() {
 
     }
-
-    protected class SetDeleteIdProcessor implements StreamsProcessor {
-
-        public String getId() {
-            return "TwitterUserstreamElasticsearch.SetDeleteIdProcessor";
-        }
-
-        @Override
-        public List<StreamsDatum> process(StreamsDatum entry) {
-
-            Preconditions.checkArgument(entry.getDocument() instanceof Activity);
-            String id = entry.getId();
-            // replace delete with post in id
-            // ensure ElasticsearchPersistDeleter will remove original post if present
-            id = Strings.replace(id, "delete", "post");
-            entry.setId(id);
-
-            return Lists.newArrayList(entry);
-        }
-
-        @Override
-        public void prepare(Object configurationObject) {
-
-
-        }
-
-        @Override
-        public void cleanUp() {
-
-        }
-    }
+  }
 
 }
diff --git a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
index 2fd26db..63dd8de 100644
--- a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
+++ b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
@@ -26,7 +26,6 @@
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -45,7 +44,6 @@
 
 import java.io.File;
 
-import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.AssertJUnit.assertTrue;
 
@@ -54,55 +52,55 @@
  */
 public class TwitterUserstreamElasticsearchIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class);
 
-    protected TwitterUserstreamElasticsearchConfiguration testConfiguration;
-    protected Client testClient;
+  protected TwitterUserstreamElasticsearchConfiguration testConfiguration;
+  protected Client testClient;
 
-    private int count = 0;
+  private int count = 0;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/TwitterUserstreamElasticsearchIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/TwitterUserstreamElasticsearchIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client();
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        if(indicesExistsResponse.isExists()) {
-            DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex());
-            DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
-            assertTrue(deleteIndexResponse.isAcknowledged());
-        };
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    if(indicesExistsResponse.isExists()) {
+      DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex());
+      DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+      assertTrue(deleteIndexResponse.isAcknowledged());
+    };
 
-    }
+  }
 
-    @Test
-    public void testUserstreamElasticsearch() throws Exception {
+  @Test
+  public void testUserstreamElasticsearch() throws Exception {
 
-        TwitterUserstreamElasticsearch stream = new TwitterUserstreamElasticsearch(testConfiguration);
+    TwitterUserstreamElasticsearch stream = new TwitterUserstreamElasticsearch(testConfiguration);
 
-        Thread thread = new Thread(stream);
-        thread.start();
-        thread.join(30000);
+    Thread thread = new Thread(stream);
+    thread.start();
+    thread.join(30000);
 
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getElasticsearch().getIndex())
-                .setTypes(testConfiguration.getElasticsearch().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    // assert lines in file
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getElasticsearch().getIndex())
+        .setTypes(testConfiguration.getElasticsearch().getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        count = (int)countResponse.getHits().getTotalHits();
+    count = (int)countResponse.getHits().getTotalHits();
 
-        assertNotEquals(count, 0);
-    }
+    assertNotEquals(count, 0);
+  }
 }
diff --git a/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf b/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf
index df9be4d..bca2d51 100644
--- a/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf
+++ b/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf
@@ -26,4 +26,5 @@
   index = twitter_userstream_elasticsearch_it
   type = activity
   forceUseConfig = true
-}
\ No newline at end of file
+}
+taskTimeoutMs = 60000
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index e369e36..384d71a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -263,10 +263,88 @@
     </dependencyManagement>
 
     <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.scalastyle</groupId>
+                <artifactId>scalastyle-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <artifactId>maven-enforcer-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <artifactId>maven-remote-resources-plugin</artifactId>
+            </plugin>
+        </plugins>
         <pluginManagement>
             <plugins>
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-checkstyle-plugin</artifactId>
+                    <version>${checkstyle.plugin.version}</version>
+                    <dependencies>
+                        <dependency>
+                            <groupId>com.puppycrawl.tools</groupId>
+                            <artifactId>checkstyle</artifactId>
+                            <version>7.2</version>
+                        </dependency>
+                    </dependencies>
+                    <executions>
+                        <execution>
+                            <id>validate</id>
+                            <phase>validate</phase>
+                            <configuration>
+                                <configLocation>http://streams.incubator.apache.org/site/${project.version}/streams-master/streams-java-checkstyle.xml</configLocation>
+                                <encoding>UTF-8</encoding>
+                                <consoleOutput>true</consoleOutput>
+                                <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                                <testSourceDirectory>${project.basedir}/src/test/java</testSourceDirectory>
+                                <failsOnError>false</failsOnError>
+                            </configuration>
+                            <goals>
+                                <goal>check</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+                <plugin>
+                    <groupId>org.scalastyle</groupId>
+                    <artifactId>scalastyle-maven-plugin</artifactId>
+                    <version>${scalastyle.plugin.version}</version>
+                    <executions>
+                        <execution>
+                            <id>validate</id>
+                            <phase>validate</phase>
+                            <configuration>
+                                <verbose>false</verbose>
+                                <failOnViolation>false</failOnViolation>
+                                <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                                <failOnWarning>false</failOnWarning>
+                                <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
+                                <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
+                                <!--<configLocation>https://raw.githubusercontent.com/databricks/sbt-databricks/master/scalastyle-config.xml</configLocation>-->
+                                <outputFile>${project.build.directory}/scalastyle-output.xml</outputFile>
+                                <outputEncoding>UTF-8</outputEncoding>
+                            </configuration>
+                            <goals>
+                                <goal>check</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-shade-plugin</artifactId>
                     <version>${shade.plugin.version}</version>
                     <configuration>