| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import groovy.json.JsonOutput |
| |
// Beam's module convention plugin; presumably the source of applyJavaNature(...)
// and the `library` dependency catalog used throughout this script.
plugins { id 'org.apache.beam.module' }

// Configure this project as a Java module with an explicit automatic module name.
applyJavaNature(automaticModuleName: 'org.apache.beam.runners.prism')

// Human-readable project metadata.
description = "Apache Beam :: Runners :: Prism :: Java"
ext.summary = "Support for executing a pipeline on Prism."
| |
// Dependencies of the runner itself and of its unit tests.
dependencies {
  // Beam projects.
  implementation project(path: ":sdks:java:core", configuration: "shadow")
  implementation project(":runners:portability:java")

  // Third-party libraries used by main sources.
  implementation library.java.joda_time
  implementation library.java.slf4j_api
  implementation library.java.vendored_guava_32_1_2_jre
  // Compile-time only here; the ValidatesRunner section further down also
  // declares hamcrest as an implementation dependency.
  compileOnly library.java.hamcrest

  // Unit-test libraries.
  testImplementation library.java.junit
  testImplementation library.java.mockito_core
  testImplementation library.java.truth
}
| |
// Unit tests require the Prism build output: run the Prism build first and pass
// its location to the tests through the 'prism.buildTarget' system property.
tasks.test {
  dependsOn ':runners:prism:build'
  // Task.dependsOn(...) returns the task it is invoked on (this test task), not
  // the Prism build task, so the value has always been read from this project.
  // Gradle's property lookup falls back to parent projects; 'buildTarget' is
  // presumably published by :runners:prism -- confirm against that build script.
  systemProperty 'prism.buildTarget', project.property('buildTarget').toString()
}
| |
| // Below is configuration to support running the Java Validates Runner tests. |
| |
configurations {
  // Classpath used by the ValidatesRunner Test tasks defined in this script.
  validatesRunner
}
| |
// Additional dependencies needed only for the ValidatesRunner support.
// The Beam core shadow jar, joda_time, slf4j_api, vendored guava, junit and
// mockito_core are already declared in the first dependencies block of this
// script, so they are not repeated here.
dependencies {
  // Hamcrest is declared as an implementation dependency; permitUnusedDeclared
  // is registered for the same artifact alongside it.
  implementation library.java.hamcrest
  permitUnusedDeclared library.java.hamcrest

  testImplementation library.java.hamcrest
  testImplementation library.java.slf4j_jdk14

  // Test classes and test runtime of the SDK core, runners-core and this project
  // make up the classpath of the ValidatesRunner Test tasks.
  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
  validatesRunner project(path: ":runners:core-java", configuration: "testRuntimeMigration")
  validatesRunner project(path: project.path, configuration: "testRuntimeMigration")
}
| |
// Evaluate these projects before this one: the ValidatesRunner tasks below read
// project(":sdks:java:core").sourceSets and reference configurations of both.
[":sdks:java:core", ":runners:core-java"].each { String projectPath ->
  project.evaluationDependsOn(projectPath)
}
| |
// Tests known to fail on Prism. The ValidatesRunner tasks exclude these patterns,
// while the validatesRunnerSickbay task runs only these patterns.
def sickbayTests = [
  // PortableMetrics doesn't implement "getCommittedOrNull" from Metrics,
  // preventing Prism from passing these tests.
  // In particular, it doesn't subclass MetricResult with an override, and
  // it explicitly passes "false" for committed-supported in create.
  //
  // There is not currently a category for excluding these _only_ in committed mode.
  'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testAllCommittedMetrics',
  'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedCounterMetrics',
  'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedDistributionMetrics',
  'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedStringSetMetrics',
  'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedGaugeMetrics',

  // Instead of 42, Prism got 84, which suggests two early panes of 42 are fired.
  'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',

  // A regression introduced when we use number of pending elements rather than watermark to determine
  // the bundle readiness of a stateless stage.
  // Currently, Prism processes a bundle of [100, ..., 1000] when watermark is set to 100,
  // and then a second bundle of [1, ... 99] when the watermark is set to +inf.
  // As a result, it yields an output of [-999, 1, 1...], where -999 comes from the difference between 1000 and 1.
  // According to https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html,
  // the stateful dofn with `RequiresTimeSortedInput` annotation should buffer an element until the element's timestamp + allowed_lateness.
  // This stateful dofn feature is not yet supported in Prism.
  'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',

  // Triggered Side Inputs not yet implemented in Prism.
  // https://github.com/apache/beam/issues/31438
  'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
  'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',

  // Prism doesn't support multiple TestStreams.
  'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams',

  // GroupIntoBatchesTest tests that fail:
  // Wrong number of elements in windows after GroupIntoBatches.
  'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode',
  'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInFixedWindow',
  'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInGlobalWindow',
  // ShardedKey not yet implemented.
  'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',

  // Some tests failed when using TestStream with keyed elements.
  // https://github.com/apache/beam/issues/36984
  'org.apache.beam.sdk.transforms.ParDoTest$BundleFinalizationTests.testBundleFinalizationWithState',
  'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testMapStateNoReadOnComputeIfAbsentAndPutIfAbsentInsertsElement',
  'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestamp',
  'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestampWithProcessingTime',

  // Technically these tests "succeed";
  // the test is just complaining that an AssertionException isn't a RuntimeException.
  //
  // java.lang.RuntimeException: test error in finalize
  'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests.testParDoWithErrorInFinishBatch',
  // java.lang.RuntimeException: test error in process
  'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests.testParDoWithErrorInProcessElement',
  // java.lang.RuntimeException: test error in initialize
  'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests.testParDoWithErrorInStartBatch',

  // Only known window fns supported, not general window merging.
  // Custom window fns not yet implemented in Prism.
  // https://github.com/apache/beam/issues/31921
  'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows',
  'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection',
  'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWithoutCustomWindowTypes',

  // Possibly a different error being hidden behind the main error.
  // org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow cannot be cast to class java.lang.String
  // TODO(https://github.com/apache/beam/issues/29973)
  'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata',
  // TODO(https://github.com/apache/beam/issues/31231)
  'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata',

  // Prism isn't handling Java's side input views properly, likely related to triggered side inputs.
  // https://github.com/apache/beam/issues/32932
  // java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
  // Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
  // java.util.NoSuchElementException: Empty PCollection accessed as a singleton view.
  'org.apache.beam.sdk.transforms.ViewTest.testDiscardingNonSingletonSideInput',
  // java.lang.IllegalArgumentException: Duplicate values for a
  'org.apache.beam.sdk.transforms.MapViewTest.testMapSideInputWithNullValuesCatchesDuplicates',
  // java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view....
  'org.apache.beam.sdk.transforms.ViewTest.testNonSingletonSideInput',
  // java.util.NoSuchElementException: Empty PCollection accessed as a singleton view.
  'org.apache.beam.sdk.transforms.ViewTest.testEmptySingletonSideInput',
  // Prism side encoding error.
  // java.lang.IllegalStateException: java.io.EOFException
  'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables',

  // Missing output due to processing time timer skew.
  'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew',

  // Filtered by PortableRunner tests.
  // Teardown not called in exceptions
  // https://github.com/apache/beam/issues/20372
  'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
  'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
  'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
  'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful',
  'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup',
  'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful',
  'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle',
  'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful',
]
| |
/**
 * Creates a Test task that runs the Java ValidatesRunner tests against the
 * Prism Runner with the specified environment type.
 *
 * Test classes come from :sdks:java:core's test output and run on the
 * validatesRunner classpath. Every pattern in sickbayTests is excluded.
 *
 * @param name name of the Test task to create
 * @param environmentType value passed as --defaultEnvironmentType
 *        (callers in this script use "DOCKER" and "LOOPBACK")
 * @return the created Test task
 */
def createPrismValidatesRunnerTask = { name, environmentType ->
  Task vrTask = tasks.create(name: name, type: Test, group: "Verification") {
    description "PrismRunner Java $environmentType ValidatesRunner suite"
    classpath = configurations.validatesRunner

    // The Prism build must run first; its output location is handed to the runner.
    dependsOn ':runners:prism:build'
    // Task.dependsOn(...) returns this Test task rather than the Prism build task,
    // so 'buildTarget' has always been resolved through this project. Gradle's
    // property lookup falls back to parent projects; the value is presumably
    // published by :runners:prism -- confirm against that build script.
    def prismLocation = project.property('buildTarget').toString()
    systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
      "--runner=TestPrismRunner",
      "--experiments=beam_fn_api",
      "--defaultEnvironmentType=${environmentType}",
      "--prismLogLevel=warn",
      "--prismLocation=${prismLocation}",
      "--enableWebUI=false",
    ])
    testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
    useJUnit {
      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
      // Should be run only in a properly configured SDK harness environment
      excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService'
      excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'

      // Not supported in Portable Java SDK yet.
      // https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState
      excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'

      // Not yet supported in Prism.
      excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics'
    }
    filter {
      // Skip every test listed in the sickbay.
      sickbayTests.each { String sickbayTest ->
        excludeTestsMatching sickbayTest
      }
    }
  }
  return vrTask
}
| |
// Runs only the tests listed in sickbayTests against Prism, using the same test
// classes (:sdks:java:core test output) and classpath as the ValidatesRunner tasks.
tasks.register("validatesRunnerSickbay", Test) {
  group = "Verification"
  description "Validates Prism local runner (Sickbay Tests)"

  // The Prism build must run first; its output location is handed to the runner.
  dependsOn ':runners:prism:build'
  // Task.dependsOn(...) returns this Test task rather than the Prism build task,
  // so 'buildTarget' has always been resolved through this project. Gradle's
  // property lookup falls back to parent projects; the value is presumably
  // published by :runners:prism -- confirm against that build script.
  def prismLocation = project.property('buildTarget').toString()
  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
    "--runner=TestPrismRunner",
    "--experiments=beam_fn_api",
    "--enableWebUI=false",
    "--prismLogLevel=warn",
    "--prismLocation=${prismLocation}"
  ])

  classpath = configurations.validatesRunner
  testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)

  filter {
    // Select exactly the sickbay entries.
    sickbayTests.each { String sickbayTest ->
      includeTestsMatching sickbayTest
    }
  }
}
| |
// Aggregate task: runs the Prism ValidatesRunner suite with the DOCKER environment.
task prismDockerValidatesRunner {
  Task vrTask = createPrismValidatesRunnerTask("prismDockerValidatesRunnerTests", "DOCKER")
  // The Test task depends on the Java SDK container's docker task for the
  // current Java version.
  vrTask.dependsOn ":sdks:java:container:${project.ext.currentJavaVersion}:docker"
  // Previously this task never depended on the Test task it created, so invoking
  // it ran no tests; wire it up the same way as prismLoopbackValidatesRunner.
  dependsOn vrTask
}
| |
// Aggregate task: runs the Prism ValidatesRunner suite with the LOOPBACK environment.
task prismLoopbackValidatesRunner {
  Task vrTask = createPrismValidatesRunnerTask("prismLoopbackValidatesRunnerTests", "LOOPBACK")
  dependsOn vrTask
}