all five flink examples passing
diff --git a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml
index 2d35035..4cf0b89 100644
--- a/flink/flink-twitter-collection/pom.xml
+++ b/flink/flink-twitter-collection/pom.xml
@@ -448,16 +448,24 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
- <version>2.12.4</version>
- <executions>
- <execution>
- <id>integration-tests</id>
- <goals>
- <goal>integration-test</goal>
- <goal>verify</goal>
- </goals>
- </execution>
- </executions>
+ <configuration>
+ <!-- Run integration test suite rather than individual tests. -->
+ <excludes>
+ <exclude>**/*Test.java</exclude>
+ <exclude>**/*Tests.java</exclude>
+ </excludes>
+ <includes>
+ <include>**/*IT.java</include>
+ <include>**/*ITs.java</include>
+ </includes>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven.surefire</groupId>
+ <artifactId>surefire-testng</artifactId>
+ <version>${failsafe.plugin.version}</version>
+ </dependency>
+ </dependencies>
</plugin>
</plugins>
</build>
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 2fd9336..a20078e 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -126,7 +126,7 @@
// if( test == true ) jsons.print();
- env.execute("FlinkTwitterFollowingPipeline")
+ env.execute(STREAMS_ID)
}
class FollowingCollectorFlatMapFunction(
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index beea973..bb7d54c 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -134,7 +134,7 @@
// if( test == true ) jsons.print();
- env.execute("FlinkTwitterPostsPipeline")
+ env.execute(STREAMS_ID)
}
class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index b615806..d6ed3df 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -1,10 +1,12 @@
package org.apache.streams.examples.flink.twitter.collection
+import java.io.Serializable
import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.{Preconditions, Strings}
import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.functions.StoppableFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
@@ -21,6 +23,7 @@
import org.apache.streams.twitter.provider.TwitterStreamProvider
import org.slf4j.{Logger, LoggerFactory}
import org.apache.flink.api.scala._
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat
import scala.collection.JavaConversions._
@@ -82,6 +85,8 @@
import FlinkTwitterSpritzerPipeline._
+ val spritzerSource = new SpritzerSource(config.getTwitter)
+
override def run(): Unit = {
val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
@@ -91,7 +96,7 @@
val outPath = buildWriterPath(config.getDestination)
- val streamSource : DataStream[String] = env.addSource(new SpritzerSource(config.getTwitter));
+ val streamSource : DataStream[String] = env.addSource(spritzerSource);
if( config.getTest == false )
streamSource.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
@@ -101,15 +106,23 @@
// if( test == true ) jsons.print();
- env.execute("FlinkTwitterPostsPipeline")
+ env.execute(STREAMS_ID)
+
}
- class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable {
+ def stop(): Unit = {
+ spritzerSource.stop()
+ }
+
+ class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable with StoppableFunction {
+
+ var mapper: ObjectMapper = _
var twitProvider: TwitterStreamProvider = _
@throws[Exception]
override def open(parameters: Configuration): Unit = {
+ mapper = StreamsJacksonMapper.getInstance(TwitterDateTimeFormat.TWITTER_FORMAT)
twitProvider = new TwitterStreamProvider( sourceConfig )
twitProvider.prepare(twitProvider)
twitProvider.startStream()
@@ -120,17 +133,16 @@
do {
Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
iterator = twitProvider.readCurrent().iterator()
- iterator.toList.map(datum => ctx.collect(datum.getDocument.asInstanceOf[String]))
+ iterator.toList.map(datum => ctx.collect(mapper.writeValueAsString(datum.getDocument)))
} while( twitProvider.isRunning )
}
override def cancel(): Unit = {
- twitProvider.cleanUp()
+ close()
}
- @throws[Exception]
- override def close(): Unit = {
- twitProvider.cleanUp()
+ override def stop(): Unit = {
+ close();
}
}
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 867255d..ad0315a 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -128,7 +128,7 @@
LOGGER.info("StreamExecutionEnvironment: {}", env.toString )
- env.execute("FlinkTwitterUserInformationPipeline")
+ env.execute(STREAMS_ID)
}
class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
index 259fe7f..1e59039 100644
--- a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
@@ -24,17 +24,17 @@
Run (Local):
------------
- java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
+ java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
Run (Flink):
------------
- flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file>
+ flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline http://<location_of_config_file>
Run (YARN):
-----------
- flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file>
+ flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline http://<location_of_config_file>
[JavaDocs](apidocs/index.html "JavaDocs")
diff --git a/flink/flink-twitter-collection/src/site/markdown/index.md b/flink/flink-twitter-collection/src/site/markdown/index.md
index 0f15603..24783be 100644
--- a/flink/flink-twitter-collection/src/site/markdown/index.md
+++ b/flink/flink-twitter-collection/src/site/markdown/index.md
@@ -16,11 +16,13 @@
Streams:
--------
-<a href="FlinkTwitterUserInformationPipeline.html" target="_self">FlinkTwitterUserInformationPipeline</a>
+<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a>
<a href="FlinkTwitterPostsPipeline.html" target="_self">FlinkTwitterPostsPipeline</a>
-<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a>
+<a href="FlinkTwitterSpritzerPipeline.html" target="_self">FlinkTwitterSpritzerPipeline</a>
+
+<a href="FlinkTwitterUserInformationPipeline.html" target="_self">FlinkTwitterUserInformationPipeline</a>
Test:
-----
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot
new file mode 100644
index 0000000..ba5e60d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+ //source
+ source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+ //providers
+ TwitterFollowingProvider [label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java"];
+
+ //persisters
+ RollingFileSink [label="RollingFileSink",shape=ellipse];
+
+ //data
+ destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+ //stream
+ TwitterFollowingProvider -> source [dir=back,style=dashed];
+ TwitterFollowingProvider -> RollingFileSink [label="String"];
+ RollingFileSink -> destination;
+}
\ No newline at end of file
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot
new file mode 100644
index 0000000..1092ff4
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+ //source
+ source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+ //providers
+ TwitterTimelineProvider [label="TwitterTimelineProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java"];
+
+ //persisters
+ RollingFileSink [label="RollingFileSink",shape=ellipse];
+
+ //data
+ destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+ //stream
+ TwitterTimelineProvider -> source [dir=back,style=dashed];
+ TwitterTimelineProvider -> RollingFileSink [label="String"];
+ RollingFileSink -> destination;
+}
\ No newline at end of file
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot
new file mode 100644
index 0000000..5a57595
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+ //providers
+ TwitterStreamProvider [label="TwitterStreamProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java"];
+
+ //persisters
+ RollingFileSink [label="RollingFileSink",shape=ellipse];
+
+ //data
+ destination [label="hdfs://${host}:${port}/${path}/${writerPath}",shape=box];
+
+ //stream
+ TwitterStreamProvider -> RollingFileSink [label="String"];
+ RollingFileSink -> destination;
+}
\ No newline at end of file
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot
new file mode 100644
index 0000000..4a37234
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+ //source
+ source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+ //providers
+ TwitterUserInformationProvider [label="TwitterUserInformationProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java"];
+
+ //persisters
+ RollingFileSink [label="RollingFileSink",shape=ellipse];
+
+ //data
+ destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+ //stream
+ TwitterUserInformationProvider -> source [dir=back,style=dashed];
+ TwitterUserInformationProvider -> RollingFileSink [label="String"];
+ RollingFileSink -> destination;
+}
\ No newline at end of file
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
index 87057be..3e922ab 100644
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
@@ -10,7 +10,11 @@
path = "target/test-classes"
writerPath = "FlinkTwitterFollowingPipelineFollowersIT"
}
-twitter.endpoint = friends
+twitter {
+ endpoint = followers
+ ids_only = true
+ max_items = 5000
+}
providerWaitMs = 1000
local = true
test = true
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
index b5212ed..038a8dc 100644
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
@@ -10,7 +10,10 @@
path = "target/test-classes"
writerPath = "FlinkTwitterFollowingPipelineFriendsIT"
}
-twitter.endpoint = friends
+twitter {
+ endpoint = friends
+ ids_only = true
+}
providerWaitMs = 1000
local = true
test = true
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf
new file mode 100644
index 0000000..fec4769
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf
@@ -0,0 +1,15 @@
+destination {
+ fields = ["DOC"]
+ scheme = file
+ path = "target/test-classes"
+ writerPath = "FlinkTwitterSpritzerPipelineIT"
+}
+twitter {
+ endpoint = sample
+ track = [
+ "data"
+ ]
+}
+providerWaitMs = 1000
+local = true
+test = true
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
index 342a850..d3663fe 100644
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
@@ -2,7 +2,7 @@
fields = ["ID"]
scheme = file
path = "target/test-classes"
- readerPath = "asf.txt"
+ readerPath = "1000twitterids.txt"
}
destination {
fields = ["DOC"]
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
new file mode 100644
index 0000000..f38ad92
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
@@ -0,0 +1,55 @@
+package org.apache.streams.examples.flink.twitter.test
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import org.slf4j.{Logger, LoggerFactory}
+import org.testng.annotations.Test
+
+import scala.io.Source
+
+/**
+ * Created by sblackmon on 3/13/16.
+ */
+class FlinkTwitterFollowingPipelineFollowersIT extends FlatSpec {
+
+ private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFollowersIT])
+
+ import FlinkTwitterFollowingPipeline._
+
+ @Test
+ def flinkTwitterFollowersPipelineFollowersIT = {
+
+ val reference: Config = ConfigFactory.load()
+ val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf")
+ assert(conf_file.exists())
+ val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+ val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+ val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+ setup(testConfig)
+
+ val job = new FlinkTwitterFollowingPipeline(config = testConfig)
+ val jobThread = new Thread(job)
+ jobThread.start
+ jobThread.join
+
+ eventually (timeout(60 seconds), interval(1 seconds)) {
+ assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
+ assert(
+ Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
+ > 4000)
+ }
+
+ }
+
+}
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
similarity index 63%
rename from flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
rename to flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
index e6294f6..464e743 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
@@ -9,6 +9,7 @@
import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline}
import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.junit.Ignore
import org.slf4j.{Logger, LoggerFactory}
import org.testng.annotations.Test
@@ -21,44 +22,16 @@
/**
* Created by sblackmon on 3/13/16.
*/
-class FlinkTwitterFollowingPipelineIT extends FlatSpec {
+class FlinkTwitterFollowingPipelineFriendsIT extends FlatSpec {
- private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineIT])
+ private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFriendsIT])
import FlinkTwitterFollowingPipeline._
- @Test(enabled = false)
+ @Test
def flinkTwitterFollowersPipelineFriendsIT = {
val reference: Config = ConfigFactory.load()
- val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf")
- assert(conf_file.exists())
- val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
- val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
- val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
- val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
-
- setup(testConfig)
-
- val job = new FlinkTwitterFollowingPipeline(config = testConfig)
- val jobThread = new Thread(job)
- jobThread.start
- jobThread.join
-
- eventually (timeout(60 seconds), interval(1 seconds)) {
- assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
- assert(
- Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
- > 90)
- }
-
- }
-
- @Test(enabled = false)
- def flinkTwitterFollowersPipelineFollowersIT = {
-
- val reference: Config = ConfigFactory.load()
val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFriendsIT.conf")
assert(conf_file.exists())
val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
@@ -78,7 +51,7 @@
assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
assert(
Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
- > 500)
+ > 90)
}
}
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
index f083f65..2e2e9b1 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -26,7 +26,7 @@
import FlinkTwitterSpritzerPipeline._
- @Test(enabled = false)
+ @Test
def flinkTwitterSpritzerPipelineIT = {
val reference: Config = ConfigFactory.load()
@@ -43,13 +43,14 @@
val job = new FlinkTwitterSpritzerPipeline(config = testConfig)
val jobThread = new Thread(job)
jobThread.start
- jobThread.join
+ jobThread.join(30000)
+ job.stop()
- eventually (timeout(30 seconds), interval(1 seconds)) {
+ eventually (timeout(60 seconds), interval(1 seconds)) {
assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
assert(
Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
- >= 200)
+ >= 10)
}
}