/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples

import org.apache.beam.examples.kotlin.WindowedWordCount
import org.apache.beam.examples.kotlin.common.ExampleUtils
import org.apache.beam.examples.kotlin.common.WriteOneFilePerWindow.PerWindowFiles
import org.apache.beam.sdk.PipelineResult
import org.apache.beam.sdk.io.FileBasedSink
import org.apache.beam.sdk.io.FileSystems
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.options.StreamingOptions
import org.apache.beam.sdk.testing.SerializableMatcher
import org.apache.beam.sdk.testing.StreamingIT
import org.apache.beam.sdk.testing.TestPipeline
import org.apache.beam.sdk.testing.TestPipelineOptions
import org.apache.beam.sdk.transforms.windowing.IntervalWindow
import org.apache.beam.sdk.util.ExplicitShardedFile
import org.apache.beam.sdk.util.FluentBackoff
import org.apache.beam.sdk.util.NumberedShardedFile
import org.apache.beam.sdk.util.ShardedFile
import org.apache.beam.sdk.util.Sleeper
import org.hamcrest.Description
import org.hamcrest.Matchers.equalTo
import org.hamcrest.TypeSafeMatcher
import org.joda.time.Duration
import org.joda.time.Instant
import org.junit.BeforeClass
import org.junit.Rule
import org.junit.Test
import org.junit.experimental.categories.Category
import org.junit.rules.TestName
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import java.util.Date
import java.util.SortedMap
import java.util.TreeMap
import java.util.concurrent.ThreadLocalRandom

/** End-to-end integration test of [WindowedWordCount]. */
@RunWith(JUnit4::class)
class WindowedWordCountITKotlin {

    // JUnit requires rule fields to be public; @JvmField exposes the Kotlin property as one.
    @Rule
    @JvmField
    val testName = TestName()

    /** Options for the [WindowedWordCount] Integration Test. */
    interface WindowedWordCountITOptions : WindowedWordCount.Options, TestPipelineOptions, StreamingOptions
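
    // Dynamic sharding (numShards == null) lets the runner choose how many shards to
    // write per window; static sharding pins the count. Streaming currently needs a
    // static shard count (see the BEAM-1438 note below).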

    @Test
    @Throws(Exception::class)
    fun testWindowedWordCountInBatchDynamicSharding() {
        val options = batchOptions()
        // This is the default value, but make it explicit.
        options.numShards = null
        testWindowedWordCountPipeline(options)
    }

    @Test
    @Throws(Exception::class)
    fun testWindowedWordCountInBatchStaticSharding() {
        val options = batchOptions()
        options.numShards = 3
        testWindowedWordCountPipeline(options)
    }

    // TODO: add a test with streaming and dynamic sharding after resolving
    // https://issues.apache.org/jira/browse/BEAM-1438
    @Test
    @Category(StreamingIT::class)
    @Throws(Exception::class)
    fun testWindowedWordCountInStreamingStaticSharding() {
        val options = streamingOptions()
        options.numShards = 3
        testWindowedWordCountPipeline(options)
    }

    @Throws(Exception::class)
    private fun defaultOptions(): WindowedWordCountITOptions {
        val options = TestPipeline.testingPipelineOptions().`as`(WindowedWordCountITOptions::class.java)
        options.inputFile = DEFAULT_INPUT
        options.testTimeoutSeconds = 1200L
        options.minTimestampMillis = 0L
        options.maxTimestampMillis = Duration.standardHours(1).millis
        options.windowSize = 10
        options.output = FileSystems.matchNewResource(options.tempRoot, true)
            .resolve(
                String.format(
                    "WindowedWordCountITKotlin.%s-%tFT%<tH:%<tM:%<tS.%<tL+%s",
                    testName.methodName, Date(), ThreadLocalRandom.current().nextInt()),
                StandardResolveOptions.RESOLVE_DIRECTORY)
            .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY)
            .resolve("results", StandardResolveOptions.RESOLVE_FILE)
            .toString()
        return options
    }
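
    // The generated output prefix has this shape (illustrative, not from an actual run):
    //   <tempRoot>/WindowedWordCountITKotlin.<testName>-<date>T<time>+<random>/output/results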

    @Throws(Exception::class)
    private fun streamingOptions(): WindowedWordCountITOptions {
        val options = defaultOptions()
        options.isStreaming = true
        return options
    }

    @Throws(Exception::class)
    private fun batchOptions(): WindowedWordCountITOptions {
        val options = defaultOptions()
        // This is the default value, but make it explicit.
        options.isStreaming = false
        return options
    }
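
    // With element timestamps spanning [minTimestampMillis, maxTimestampMillis) -- one
    // hour -- and a 10-minute window size, the pipeline produces six windowed outputs,
    // one per window starting at minutes 0, 10, 20, 30, 40, and 50.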
    @Throws(Exception::class)
    private fun testWindowedWordCountPipeline(options: WindowedWordCountITOptions) {
        val output = FileBasedSink.convertToFileResourceIfPossible(options.output)
        val filenamePolicy = PerWindowFiles(output)

        val expectedOutputFiles = ArrayList<ShardedFile>(6)
        for (startMinute in listOf(0, 10, 20, 30, 40, 50)) {
            val windowStart =
                Instant(options.minTimestampMillis).plus(Duration.standardMinutes(startMinute.toLong()))
            val filePrefix = filenamePolicy.filenamePrefixForWindow(
                IntervalWindow(windowStart, windowStart.plus(Duration.standardMinutes(10))))
            expectedOutputFiles.add(
                NumberedShardedFile(
                    output
                        .currentDirectory
                        .resolve(filePrefix, StandardResolveOptions.RESOLVE_FILE)
                        .toString() + "*"))
        }

        val inputFile = ExplicitShardedFile(setOf(options.inputFile))

        // For this integration test, input is tiny, so we can build the expected counts in memory.
        val expectedWordCounts = TreeMap<String, Long>()
        for (line in inputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff())) {
            for (word in line.split(ExampleUtils.TOKENIZER_PATTERN.toRegex())) {
                if (word.isNotEmpty()) {
                    expectedWordCounts[word] = (expectedWordCounts[word] ?: 0L) + 1L
                }
            }
        }
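
        // The test framework applies onSuccessMatcher to the PipelineResult once the
        // pipeline completes, failing the test if the written counts do not match.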
        options.onSuccessMatcher = WordCountsMatcher(expectedWordCounts, expectedOutputFiles)

        WindowedWordCount.runWindowedWordCount(options)
    }

    /**
     * A matcher that bakes in expected word counts, so they can be read directly via some other
     * mechanism, and compares the sharded output files against them.
     */
    private class WordCountsMatcher(
        private val expectedWordCounts: SortedMap<String, Long>,
        private val outputFiles: List<ShardedFile>
    ) : TypeSafeMatcher<PipelineResult>(), SerializableMatcher<PipelineResult> {

        private var actualCounts: SortedMap<String, Long>? = null

        public override fun matchesSafely(pipelineResult: PipelineResult): Boolean {
            try {
                // Load output data.
                val outputLines = ArrayList<String>()
                for (outputFile in outputFiles) {
                    outputLines.addAll(
                        outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()))
                }
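
                // Each output line has the form "<word>: <count>" (the format produced by
                // the word-count pipeline), so split on the first ": " and sum per word.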
                // Since the windowing is nondeterministic, we only check the per-word sums.
                val counts = TreeMap<String, Long>()
                for (line in outputLines) {
                    val (word, count) = line.split(": ", limit = 2)
                    counts.merge(word, count.toLong()) { a, b -> a + b }
                }
                actualCounts = counts
                return counts == expectedWordCounts
            } catch (e: Exception) {
                throw RuntimeException(
                    String.format("Failed to read from sharded output %s", outputFiles), e)
            }
        }

        override fun describeTo(description: Description) {
            equalTo(expectedWordCounts).describeTo(description)
        }

        public override fun describeMismatchSafely(pResult: PipelineResult, description: Description) {
            equalTo(expectedWordCounts).describeMismatch(actualCounts, description)
        }
    }

    companion object {
        private const val DEFAULT_INPUT = "gs://apache-beam-samples/shakespeare/sonnets.txt"
        private const val MAX_READ_RETRIES = 4
        private val DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L)

        internal val BACK_OFF_FACTORY: FluentBackoff = FluentBackoff.DEFAULT
            .withInitialBackoff(DEFAULT_SLEEP_DURATION)
            .withMaxRetries(MAX_READ_RETRIES)
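
        // readFilesWithRetries retries each read up to MAX_READ_RETRIES times, starting
        // from a DEFAULT_SLEEP_DURATION backoff that grows exponentially between attempts.

        // Registering TestPipelineOptions up front lets PipelineOptionsFactory recognize
        // test-specific options (such as tempRoot) when parsing pipeline arguments.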
        @BeforeClass
        @JvmStatic // JUnit's @BeforeClass must resolve to a static method on the JVM.
        fun setUp() {
            PipelineOptionsFactory.register(TestPipelineOptions::class.java)
        }
    }
}