diff --git a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml
index 2d35035..4cf0b89 100644
--- a/flink/flink-twitter-collection/pom.xml
+++ b/flink/flink-twitter-collection/pom.xml
@@ -448,16 +448,24 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-failsafe-plugin</artifactId>
-                <version>2.12.4</version>
-                <executions>
-                    <execution>
-                        <id>integration-tests</id>
-                        <goals>
-                            <goal>integration-test</goal>
-                            <goal>verify</goal>
-                        </goals>
-                    </execution>
-                </executions>
+                <configuration>
+                    <!-- Run integration test suite rather than individual tests. -->
+                    <excludes>
+                        <exclude>**/*Test.java</exclude>
+                        <exclude>**/*Tests.java</exclude>
+                    </excludes>
+                    <includes>
+                        <include>**/*IT.java</include>
+                        <include>**/*ITs.java</include>
+                    </includes>
+                </configuration>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.maven.surefire</groupId>
+                        <artifactId>surefire-testng</artifactId>
+                        <version>${failsafe.plugin.version}</version>
+                    </dependency>
+                </dependencies>
             </plugin>
         </plugins>
     </build>
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 2fd9336..a20078e 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -126,7 +126,7 @@
 
         // if( test == true ) jsons.print();
 
-        env.execute("FlinkTwitterFollowingPipeline")
+        env.execute(STREAMS_ID)
     }
 
     class FollowingCollectorFlatMapFunction(
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index beea973..bb7d54c 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -134,7 +134,7 @@
 
     // if( test == true ) jsons.print();
 
-    env.execute("FlinkTwitterPostsPipeline")
+    env.execute(STREAMS_ID)
   }
 
   class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index b615806..d6ed3df 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -1,10 +1,12 @@
 package org.apache.streams.examples.flink.twitter.collection
 
+import java.io.Serializable
 import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.functions.StoppableFunction
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.streaming.api.TimeCharacteristic
@@ -21,6 +23,7 @@
 import org.apache.streams.twitter.provider.TwitterStreamProvider
 import org.slf4j.{Logger, LoggerFactory}
 import org.apache.flink.api.scala._
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat
 
 import scala.collection.JavaConversions._
 
@@ -82,6 +85,8 @@
 
   import FlinkTwitterSpritzerPipeline._
 
+  val spritzerSource = new SpritzerSource(config.getTwitter)
+
   override def run(): Unit = {
 
     val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
@@ -91,7 +96,7 @@
 
     val outPath = buildWriterPath(config.getDestination)
 
-    val streamSource : DataStream[String] = env.addSource(new SpritzerSource(config.getTwitter));
+    val streamSource : DataStream[String] = env.addSource(spritzerSource);
 
     if( config.getTest == false )
       streamSource.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
@@ -101,15 +106,23 @@
 
     // if( test == true ) jsons.print();
 
-    env.execute("FlinkTwitterPostsPipeline")
+    env.execute(STREAMS_ID)
+
   }
 
-  class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable {
+  def stop(): Unit = {
+    spritzerSource.stop()
+  }
+
+  class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable with StoppableFunction {
+
+    var mapper: ObjectMapper = _
 
     var twitProvider: TwitterStreamProvider = _
 
     @throws[Exception]
     override def open(parameters: Configuration): Unit = {
+      mapper = StreamsJacksonMapper.getInstance(TwitterDateTimeFormat.TWITTER_FORMAT)
       twitProvider = new TwitterStreamProvider( sourceConfig )
       twitProvider.prepare(twitProvider)
       twitProvider.startStream()
@@ -120,17 +133,16 @@
       do {
         Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
         iterator = twitProvider.readCurrent().iterator()
-        iterator.toList.map(datum => ctx.collect(datum.getDocument.asInstanceOf[String]))
+        iterator.toList.map(datum => ctx.collect(mapper.writeValueAsString(datum.getDocument)))
       } while( twitProvider.isRunning )
     }
 
     override def cancel(): Unit = {
-      twitProvider.cleanUp()
+      close()
     }
 
-    @throws[Exception]
-    override def close(): Unit = {
-      twitProvider.cleanUp()
+    override def stop(): Unit = {
+      close();
     }
   }
 
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 867255d..ad0315a 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -128,7 +128,7 @@
 
     LOGGER.info("StreamExecutionEnvironment: {}", env.toString )
 
-    env.execute("FlinkTwitterUserInformationPipeline")
+    env.execute(STREAMS_ID)
   }
 
   class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
index 259fe7f..1e59039 100644
--- a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
@@ -24,17 +24,17 @@
 Run (Local):
 ------------
 
-    java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
+    java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
 
 Run (Flink):
 ------------
 
-    flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file> 
+    flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline http://<location_of_config_file> 
 
 Run (YARN):
 -----------
 
-    flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file> 
+    flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline http://<location_of_config_file> 
 
 [JavaDocs](apidocs/index.html "JavaDocs")
 
diff --git a/flink/flink-twitter-collection/src/site/markdown/index.md b/flink/flink-twitter-collection/src/site/markdown/index.md
index 0f15603..24783be 100644
--- a/flink/flink-twitter-collection/src/site/markdown/index.md
+++ b/flink/flink-twitter-collection/src/site/markdown/index.md
@@ -16,11 +16,13 @@
 Streams:
 --------
 
-<a href="FlinkTwitterUserInformationPipeline.html" target="_self">FlinkTwitterUserInformationPipeline</a>
+<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a>
 
 <a href="FlinkTwitterPostsPipeline.html" target="_self">FlinkTwitterPostsPipeline</a>
 
-<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a>
+<a href="FlinkTwitterSpritzerPipeline.html" target="_self">FlinkTwitterSpritzerPipeline</a>
+
+<a href="FlinkTwitterUserInformationPipeline.html" target="_self">FlinkTwitterUserInformationPipeline</a>
 
 Test:
 -----
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot
new file mode 100644
index 0000000..ba5e60d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //source
+  source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+  //providers
+  TwitterFollowingProvider [label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java"];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+  //stream
+  TwitterFollowingProvider -> source [dir=back,style=dashed];
+  TwitterFollowingProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot
new file mode 100644
index 0000000..1092ff4
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //source
+  source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+  //providers
+  TwitterTimelineProvider [label="TwitterTimelineProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java"];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+  //stream
+  TwitterTimelineProvider -> source [dir=back,style=dashed];
+  TwitterTimelineProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot
new file mode 100644
index 0000000..5a57595
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //providers
+  TwitterStreamProvider [label="TwitterStreamProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java"];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination [label="hdfs://${host}:${port}/${path}/${writerPath}",shape=box];
+
+  //stream
+  TwitterStreamProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot
new file mode 100644
index 0000000..4a37234
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //source
+  source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+  //providers
+  TwitterUserInformationProvider [label="TwitterUserInformationProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java"];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+  //stream
+  TwitterUserInformationProvider -> source [dir=back,style=dashed];
+  TwitterUserInformationProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
index 87057be..3e922ab 100644
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
@@ -10,7 +10,11 @@
   path = "target/test-classes"
   writerPath = "FlinkTwitterFollowingPipelineFollowersIT"
 }
-twitter.endpoint = friends
+twitter {
+  endpoint = followers
+  ids_only = true
+  max_items = 5000
+}
 providerWaitMs = 1000
 local = true
 test = true
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
index b5212ed..038a8dc 100644
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
@@ -10,7 +10,10 @@
   path = "target/test-classes"
   writerPath = "FlinkTwitterFollowingPipelineFriendsIT"
 }
-twitter.endpoint = friends
+twitter {
+  endpoint = friends
+  ids_only = true
+}
 providerWaitMs = 1000
 local = true
 test = true
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf
new file mode 100644
index 0000000..fec4769
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf
@@ -0,0 +1,15 @@
+destination {
+  fields = ["DOC"]
+  scheme = file
+  path = "target/test-classes"
+  writerPath = "FlinkTwitterSpritzerPipelineIT"
+}
+twitter {
+  endpoint = sample
+  track = [
+    "data"
+  ]
+}
+providerWaitMs = 1000
+local = true
+test = true
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
index 342a850..d3663fe 100644
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
@@ -2,7 +2,7 @@
   fields = ["ID"]
   scheme = file
   path = "target/test-classes"
-  readerPath = "asf.txt"
+  readerPath = "1000twitterids.txt"
 }
 destination {
   fields = ["DOC"]
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
new file mode 100644
index 0000000..f38ad92
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
@@ -0,0 +1,55 @@
+package org.apache.streams.examples.flink.twitter.test
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import org.slf4j.{Logger, LoggerFactory}
+import org.testng.annotations.Test
+
+import scala.io.Source
+
+/**
+  * Created by sblackmon on 3/13/16.
+  */
+class FlinkTwitterFollowingPipelineFollowersIT extends FlatSpec {
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFollowersIT])
+
+  import FlinkTwitterFollowingPipeline._
+
+  @Test
+  def flinkTwitterFollowersPipelineFollowersIT = {
+
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf")
+    assert(conf_file.exists())
+    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
+
+    val job = new FlinkTwitterFollowingPipeline(config = testConfig)
+    val jobThread = new Thread(job)
+    jobThread.start
+    jobThread.join
+
+    eventually (timeout(60 seconds), interval(1 seconds)) {
+      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
+      assert(
+        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
+          > 4000)
+    }
+
+  }
+
+}
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
similarity index 63%
rename from flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
rename to flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
index e6294f6..464e743 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
@@ -9,6 +9,7 @@
 import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
 import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline}
 import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.junit.Ignore
 import org.slf4j.{Logger, LoggerFactory}
 import org.testng.annotations.Test
 
@@ -21,44 +22,16 @@
 /**
   * Created by sblackmon on 3/13/16.
   */
-class FlinkTwitterFollowingPipelineIT extends FlatSpec {
+class FlinkTwitterFollowingPipelineFriendsIT extends FlatSpec {
 
-  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineIT])
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFriendsIT])
 
   import FlinkTwitterFollowingPipeline._
 
-  @Test(enabled = false)
+  @Test
   def flinkTwitterFollowersPipelineFriendsIT = {
 
     val reference: Config = ConfigFactory.load()
-    val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf")
-    assert(conf_file.exists())
-    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
-    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
-    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
-    val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
-
-    setup(testConfig)
-
-    val job = new FlinkTwitterFollowingPipeline(config = testConfig)
-    val jobThread = new Thread(job)
-    jobThread.start
-    jobThread.join
-
-    eventually (timeout(60 seconds), interval(1 seconds)) {
-      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
-      assert(
-        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
-          > 90)
-    }
-
-  }
-
-  @Test(enabled = false)
-  def flinkTwitterFollowersPipelineFollowersIT = {
-
-    val reference: Config = ConfigFactory.load()
     val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFriendsIT.conf")
     assert(conf_file.exists())
     val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
@@ -78,7 +51,7 @@
       assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
       assert(
         Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
-          > 500)
+          > 90)
     }
 
   }
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
index f083f65..2e2e9b1 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -26,7 +26,7 @@
 
   import FlinkTwitterSpritzerPipeline._
 
-  @Test(enabled = false)
+  @Test
   def flinkTwitterSpritzerPipelineIT = {
 
     val reference: Config = ConfigFactory.load()
@@ -43,13 +43,14 @@
     val job = new FlinkTwitterSpritzerPipeline(config = testConfig)
     val jobThread = new Thread(job)
     jobThread.start
-    jobThread.join
+    jobThread.join(30000)
+    job.stop()
 
-    eventually (timeout(30 seconds), interval(1 seconds)) {
+    eventually (timeout(60 seconds), interval(1 seconds)) {
       assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
       assert(
         Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
-          >= 200)
+          >= 10)
     }
 
   }
