// blob: 2e2e9b1781abb62b53c31bb6a00babdbe15004a6 [file] [log] [blame]
package org.apache.streams.examples.flink.twitter.test
import java.io.File
import java.nio.file.{Files, Paths}
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterSpritzerPipeline, FlinkTwitterUserInformationPipeline}
import org.apache.streams.examples.flink.twitter.{TwitterPostsPipelineConfiguration, TwitterSpritzerPipelineConfiguration}
import org.slf4j.{Logger, LoggerFactory}
import org.testng.annotations.Test
import scala.io.Source
import org.scalatest.FlatSpec
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.{Seconds, Span}
import org.scalatest.time.SpanSugar._
/**
 * Integration test for [[FlinkTwitterSpritzerPipeline]].
 *
 * Reads the test configuration from
 * `target/test-classes/FlinkTwitterSpritzerPipelineIT.conf`, runs the pipeline on a
 * background thread, stops it, and then checks that the configured destination file
 * exists and holds at least 10 lines.
 *
 * Created by sblackmon on 3/13/16.
 */
class FlinkTwitterSpritzerPipelineIT extends FlatSpec {

  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterSpritzerPipelineIT])

  import FlinkTwitterSpritzerPipeline._

  /**
   * Runs the spritzer pipeline against the test configuration and verifies its output.
   *
   * The return type is declared as `Unit` on purpose: left to inference it would be
   * whatever `eventually` yields, and TestNG by default does not run `@Test` methods
   * that return a value.
   */
  @Test
  def flinkTwitterSpritzerPipelineIT: Unit = {

    // Layer the test resource file over the default (reference) configuration.
    val reference: Config = ConfigFactory.load()
    val conf_file: File = new File("target/test-classes/FlinkTwitterSpritzerPipelineIT.conf")
    assert(conf_file.exists())
    val testResourceConfig: Config =
      ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false))
    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()

    // NOTE(review): `streams` is not read below; the call is kept in case
    // detectConfiguration has side effects — confirm before removing.
    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
    val testConfig =
      new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(typesafe)

    setup(testConfig)

    // Run the job on its own thread, wait up to 30 seconds for it, then ask it to stop.
    val job = new FlinkTwitterSpritzerPipeline(config = testConfig)
    val jobThread = new Thread(job)
    jobThread.start()
    jobThread.join(30000)
    job.stop()

    // Location of the file the pipeline is configured to write.
    val outputPath: String =
      testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath

    // Poll once per second for up to 60 seconds until the output is present and large enough.
    eventually(timeout(Span(60, Seconds)), interval(Span(1, Seconds))) {
      assert(Files.exists(Paths.get(outputPath)))

      // Close the Source on every attempt so retries do not leak file handles.
      val source = Source.fromFile(outputPath, "UTF-8")
      val lineCount: Int =
        try source.getLines().size
        finally source.close()

      assert(lineCount >= 10)
    }
  }
}