blob: c89974cb6ea541577bc1a6e97e96acfac73472ed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import groovy.json.JsonOutput
plugins {
  id('org.apache.beam.module')
}

// Java build setup; the only option passed is the automatic module name.
applyJavaNature(automaticModuleName: 'org.apache.beam.runners.prism')

// Human-readable project metadata.
description = "Apache Beam :: Runners :: Prism :: Java"
ext.summary = "Support for executing a pipeline on Prism."
dependencies {
  // Beam projects used by the main sources.
  implementation(project(path: ":sdks:java:core", configuration: "shadow"))
  implementation(project(":runners:portability:java"))

  // Third-party libraries used by the main sources.
  implementation(library.java.joda_time)
  implementation(library.java.slf4j_api)
  implementation(library.java.vendored_guava_32_1_2_jre)

  // Declared for compilation only by this line.
  compileOnly(library.java.hamcrest)

  // Libraries used by the unit tests.
  testImplementation(library.java.junit)
  testImplementation(library.java.mockito_core)
  testImplementation(library.java.truth)
}
tasks.test {
  // Task.dependsOn returns the task it was called on (this test task), not the
  // ':runners:prism:build' task, so 'buildTarget' is looked up through this project.
  // NOTE(review): 'buildTarget' is presumably inherited from the parent ':runners:prism'
  // project via Gradle's property lookup -- confirm.
  def thisTestTask = dependsOn(':runners:prism:build')
  def prismBuildTarget = thisTestTask.project.property('buildTarget').toString()

  // Hand the value to the tests as the 'prism.buildTarget' system property.
  systemProperty 'prism.buildTarget', prismBuildTarget
}
// Below is configuration to support running the Java Validates Runner tests.
// The 'validatesRunner' configuration named here is populated in the dependencies block
// that follows and is used as the classpath of the ValidatesRunner test tasks in this file.
configurations {
validatesRunner
}
dependencies {
  // The shared main/test dependencies (":sdks:java:core" shadow, joda_time, slf4j_api,
  // vendored_guava_32_1_2_jre, junit, mockito_core) are already declared in the first
  // dependencies block of this file, so they are not repeated here.

  // hamcrest is declared for the main sources and explicitly permitted to be reported
  // as unused by the dependency analysis.
  implementation library.java.hamcrest
  permitUnusedDeclared library.java.hamcrest

  // Additional test-only libraries.
  testImplementation library.java.hamcrest
  testImplementation library.java.slf4j_jdk14

  // Classpath of the ValidatesRunner test tasks: the SDK core test classes, the
  // runners core test runtime, and this project's own test runtime.
  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
  validatesRunner project(path: ":runners:core-java", configuration: "testRuntimeMigration")
  validatesRunner project(path: project.path, configuration: "testRuntimeMigration")
}
// Evaluate these projects before this one: the tasks below read the test source set
// output of ":sdks:java:core" and a configuration of ":runners:core-java".
[":sdks:java:core", ":runners:core-java"].each { requiredProjectPath ->
  project.evaluationDependsOn(requiredProjectPath)
}
// Test name patterns for ValidatesRunner tests that are known to fail on Prism.
// The ValidatesRunner tasks created in this file exclude these patterns, and the
// validatesRunnerSickbay task includes exactly these patterns.
def sickbayTests = [
// PortableMetrics doesn't implement "getCommittedOrNull" from Metrics,
// preventing Prism from passing these tests.
// In particular, it doesn't subclass MetricResult with an override, and
// it explicitly passes "false" for committed support in create.
//
// There is not currently a category for excluding these _only_ in committed mode.
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testAllCommittedMetrics',
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedCounterMetrics',
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedDistributionMetrics',
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedStringSetMetrics',
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedGaugeMetrics',
// Instead of 42, Prism got 84, which suggests two early panes of 42 are fired.
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
// A regression introduced when we use the number of pending elements rather than the watermark
// to determine the bundle readiness of a stateless stage.
// Currently, Prism processes a bundle of [100, ..., 1000] when the watermark is set to 100,
// and then a second bundle of [1, ... 99] when the watermark is set to +inf.
// As a result, it yields an output of [-999, 1, 1...], where -999 comes from the difference between 1000 and 1.
// According to https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html,
// the stateful DoFn with the `RequiresTimeSortedInput` annotation should buffer an element until the element's timestamp + allowed_lateness.
// This stateful DoFn feature is not yet supported in Prism.
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
// Triggered Side Inputs not yet implemented in Prism.
// https://github.com/apache/beam/issues/31438
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',
// Prism doesn't support multiple TestStreams.
'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams',
// GroupIntoBatchesTest tests that fail:
// Wrong number of elements in windows after GroupIntoBatches.
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode',
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInFixedWindow',
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInGlobalWindow',
// ShardedKey not yet implemented.
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
// Some tests failed when using TestStream with keyed elements.
// https://github.com/apache/beam/issues/36984
'org.apache.beam.sdk.transforms.ParDoTest$BundleFinalizationTests.testBundleFinalizationWithState',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testMapStateNoReadOnComputeIfAbsentAndPutIfAbsentInsertsElement',
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestamp',
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestampWithProcessingTime',
// Technically these tests "succeed";
// the test is just complaining that an AssertionException isn't a RuntimeException.
//
// java.lang.RuntimeException: test error in finalize
'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests.testParDoWithErrorInFinishBatch',
// java.lang.RuntimeException: test error in process
'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests.testParDoWithErrorInProcessElement',
// java.lang.RuntimeException: test error in initialize
'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests.testParDoWithErrorInStartBatch',
// Only known window fns are supported, not general window merging.
// Custom window fns not yet implemented in Prism.
// https://github.com/apache/beam/issues/31921
'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows',
'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection',
'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWithoutCustomWindowTypes',
// Possibly a different error being hidden behind the main error.
// org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow cannot be cast to class java.lang.String
// TODO(https://github.com/apache/beam/issues/29973)
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata',
// TODO(https://github.com/apache/beam/issues/31231)
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata',
// Prism isn't handling Java's side input views properly, likely related to triggered side inputs.
// https://github.com/apache/beam/issues/32932
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
// Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
// java.util.NoSuchElementException: Empty PCollection accessed as a singleton view.
'org.apache.beam.sdk.transforms.ViewTest.testDiscardingNonSingletonSideInput',
// java.lang.IllegalArgumentException: Duplicate values for a
'org.apache.beam.sdk.transforms.MapViewTest.testMapSideInputWithNullValuesCatchesDuplicates',
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view....
'org.apache.beam.sdk.transforms.ViewTest.testNonSingletonSideInput',
// java.util.NoSuchElementException: Empty PCollection accessed as a singleton view.
'org.apache.beam.sdk.transforms.ViewTest.testEmptySingletonSideInput',
// Prism side encoding error.
// java.lang.IllegalStateException: java.io.EOFException
'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables',
// Missing output due to processing time timer skew.
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew',
// Filtered by PortableRunner tests.
// Teardown not called in exceptions
// https://github.com/apache/beam/issues/20372
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful',
]
/**
 * Creates a Test task that runs the Java ValidatesRunner tests against the Prism Runner.
 *
 * @param name name of the task to create
 * @param environmentType value passed to the pipeline as --defaultEnvironmentType
 * @return the created task
 */
def createPrismValidatesRunnerTask = { name, environmentType ->
  return tasks.create(name: name, type: Test, group: "Verification") {
    description "PrismRunner Java ${environmentType} ValidatesRunner suite"
    classpath = configurations.validatesRunner

    // dependsOn returns the task it is called on, so 'buildTarget' is resolved through
    // that task's project.
    def createdTask = dependsOn(':runners:prism:build')
    def prismLocation = createdTask.project.property('buildTarget').toString()

    // Pipeline options handed to the tests as a JSON-encoded list.
    def pipelineOptions = [
      "--runner=TestPrismRunner",
      "--experiments=beam_fn_api",
      "--defaultEnvironmentType=${environmentType}",
      "--prismLogLevel=warn",
      "--prismLocation=${prismLocation}",
      "--enableWebUI=false",
    ]
    systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions)

    // The test classes come from the SDK core project's test source set.
    testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)

    useJUnit {
      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
      // Should be run only in a properly configured SDK harness environment
      excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService'
      excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
      // Not supported in Portable Java SDK yet.
      // https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState
      excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
      // Not yet supported in Prism.
      excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics'
    }

    // Skip every test listed in sickbayTests.
    filter {
      sickbayTests.each { sickbayPattern ->
        excludeTestsMatching sickbayPattern
      }
    }
  }
}
// Runs only the tests listed in sickbayTests against the Prism runner.
tasks.register("validatesRunnerSickbay", Test) {
  group = "Verification"
  description "Validates Prism local runner (Sickbay Tests)"

  // dependsOn returns the task it is called on, so 'buildTarget' is resolved through
  // that task's project.
  def registeredTask = dependsOn(':runners:prism:build')
  def prismLocation = registeredTask.project.property('buildTarget').toString()

  // Pipeline options handed to the tests as a JSON-encoded list.
  def pipelineOptions = [
    "--runner=TestPrismRunner",
    "--experiments=beam_fn_api",
    "--enableWebUI=false",
    "--prismLogLevel=warn",
    "--prismLocation=${prismLocation}"
  ]
  systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions)

  classpath = configurations.validatesRunner
  // The test classes come from the SDK core project's test source set.
  testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)

  // Select exactly the sickbay tests.
  filter {
    sickbayTests.each { sickbayPattern ->
      includeTestsMatching sickbayPattern
    }
  }
}
// Entry-point task for the ValidatesRunner suite using the DOCKER environment type.
task prismDockerValidatesRunner {
  Task dockerTests = createPrismValidatesRunnerTask("prismDockerValidatesRunnerTests", "DOCKER")
  // The suite depends on the ':docker' task of the Java SDK container project for the
  // current Java version.
  dockerTests.dependsOn ":sdks:java:container:${project.ext.currentJavaVersion}:docker"
  // Wire the suite into this task so that invoking it actually runs the tests, in the
  // same way prismLoopbackValidatesRunner depends on its suite. Without this the task
  // only created the suite and did nothing when executed.
  dependsOn dockerTests
}
// Entry-point task for the ValidatesRunner suite using the LOOPBACK environment type.
task prismLoopbackValidatesRunner {
  Task loopbackTests = createPrismValidatesRunnerTask("prismLoopbackValidatesRunnerTests", "LOOPBACK")
  // Running this task runs the created suite.
  dependsOn loopbackTests
}