/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import groovy.json.JsonOutput
plugins { id 'org.apache.beam.module' }
applyJavaNature(
automaticModuleName: 'org.apache.beam.runners.dataflow',
classesTriggerCheckerBugs: [
'PrimitiveParDoSingleFactory': 'https://github.com/typetools/checker-framework/issues/3791',
// TODO(https://github.com/apache/beam/issues/21068): This currently crashes with checkerframework 3.10.0
// when compiling :runners:google-cloud-dataflow-java:compileJava with:
// message: class file for com.google.api.services.bigquery.model.TableRow not found
// ; The Checker Framework crashed. Please report the crash.
// Compilation unit: /usr/local/google/home/lcwik/git/beam/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
// Last visited tree at line 57 column 1:
// @AutoService(CoderCloudObjectTranslatorRegistrar.class)
// Exception: com.sun.tools.javac.code.Symbol$CompletionFailure: class file for com.google.api.services.bigquery.model.TableRow not found; com.sun.tools.javac.code.Symbol$CompletionFailure: class file for com.google.api.services.bigquery.model.TableRow not found
'DefaultCoderCloudObjectTranslatorRegistrar': 'TODO(https://github.com/apache/beam/issues/21068): Report the crash if still occurring on newest version',
],
)
description = "Apache Beam :: Runners :: Google Cloud Dataflow"
/*
 * We manually specify these evaluationDependsOn calls to ensure that the following projects
 * are evaluated before this project, because we reference their properties (such as
 * "sourceSets.test.output") directly.
 */
evaluationDependsOn(":sdks:java:io:google-cloud-platform")
evaluationDependsOn(":sdks:java:core")
evaluationDependsOn(":examples:java")
evaluationDependsOn(":runners:google-cloud-dataflow-java:worker")
evaluationDependsOn(":sdks:java:container:java8")
evaluationDependsOn(":sdks:java:container:java11")
processResources {
filter org.apache.tools.ant.filters.ReplaceTokens, tokens: [
'dataflow.legacy_environment_major_version' : '8',
'dataflow.fnapi_environment_major_version' : '8',
'dataflow.legacy_container_version' : 'beam-master-20230112',
'dataflow.fnapi_container_version' : 'beam-master-20230112',
'dataflow.container_base_repository' : 'gcr.io/cloud-dataflow/v1beta3',
]
}
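// As a sketch of the substitution above: ReplaceTokens rewrites @-delimited keys in the copied
// resources, so a (hypothetical) properties entry such as
//   legacy.environment.major.version=@dataflow.legacy_environment_major_version@
// would come out of processResources as
//   legacy.environment.major.version=8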
// Exclude tests that need a runner
test {
systemProperty "beamTestPipelineOptions", ""
systemProperty "beamUseDummyRunner", "true"
useJUnit {
excludeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
}
}
configurations {
validatesRunner
coreSDKJavaIntegrationTest
examplesJavaIntegrationTest
googleCloudPlatformIntegrationTest
}
dependencies {
implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
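// permitUnusedDeclared (provided by the dependency-analysis support that applyJavaNature
// applies) keeps the dependency on the compile classpath while exempting it from the
// unused-declared-dependency check.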
permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
implementation library.java.vendored_guava_26_0_jre
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:io:kafka")
implementation project(":sdks:java:io:google-cloud-platform")
implementation project(":runners:core-construction-java")
implementation library.java.avro
implementation library.java.bigdataoss_util
implementation library.java.commons_codec
// Avoids conflicts with bigdataoss_util (BEAM-11010)
implementation library.java.flogger_system_backend
permitUnusedDeclared library.java.flogger_system_backend
implementation library.java.google_api_client
// Ensures SequencedMessage availability for Spotless
implementation library.java.proto_google_cloud_pubsublite_v1
permitUnusedDeclared library.java.proto_google_cloud_pubsublite_v1
implementation library.java.google_api_services_clouddebugger
implementation library.java.google_api_services_dataflow
implementation library.java.google_api_services_storage
permitUnusedDeclared library.java.google_api_services_storage // BEAM-11761
implementation library.java.google_auth_library_credentials
implementation library.java.google_auth_library_oauth2_http
implementation library.java.google_http_client
implementation library.java.google_http_client_jackson2
permitUnusedDeclared library.java.google_http_client_jackson2 // BEAM-11761
implementation library.java.hamcrest
implementation library.java.jackson_annotations
implementation library.java.jackson_core
implementation library.java.jackson_databind
implementation library.java.joda_time
implementation library.java.slf4j_api
implementation library.java.vendored_grpc_1_48_1
testImplementation library.java.guava_testlib
testImplementation library.java.junit
testImplementation project(path: ":sdks:java:io:google-cloud-platform", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntimeMigration")
testImplementation project(path: ":runners:core-construction-java", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:extensions:python", configuration: "testRuntimeMigration")
testImplementation library.java.google_cloud_dataflow_java_proto_library_all
testImplementation library.java.jackson_dataformat_yaml
testImplementation library.java.mockito_core
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: project.path, configuration: "testRuntimeMigration")
validatesRunner library.java.hamcrest
coreSDKJavaIntegrationTest project(project.path)
coreSDKJavaIntegrationTest project(path: ":sdks:java:core", configuration: "shadowTest")
examplesJavaIntegrationTest project(project.path)
examplesJavaIntegrationTest project(path: ":examples:java", configuration: "testRuntimeMigration")
googleCloudPlatformIntegrationTest project(project.path)
googleCloudPlatformIntegrationTest project(path: ":sdks:java:io:google-cloud-platform", configuration: "testRuntimeMigration")
}
def dataflowProject = project.findProperty('dataflowProject') ?: 'apache-beam-testing'
def dataflowRegion = project.findProperty('dataflowRegion') ?: 'us-central1'
def dataflowValidatesTempRoot = project.findProperty('dataflowTempRoot') ?: 'gs://temp-storage-for-validates-runner-tests'
def dataflowPostCommitTempRoot = project.findProperty('dataflowTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests'
def dataflowPostCommitTempRootKms = project.findProperty('dataflowTempRootKms') ?: 'gs://temp-storage-for-end-to-end-tests-cmek'
def dataflowUploadTemp = project.findProperty('dataflowTempRoot') ?: 'gs://temp-storage-for-upload-tests'
def testFilesToStage = project.findProperty('filesToStage') ?: 'test.txt'
def dataflowLegacyWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
def dataflowKmsKey = project.findProperty('dataflowKmsKey') ?: "projects/apache-beam-testing/locations/global/keyRings/beam-it/cryptoKeys/test"
def firestoreDb = project.findProperty('firestoreDb') ?: 'firestoredb'
def dockerImageRoot = project.findProperty('dockerImageRoot') ?: "us.gcr.io/${dataflowProject}/java-postcommit-it"
def dockerJavaImageContainer = "${dockerImageRoot}/java"
def dockerPythonImageContainer = "${dockerImageRoot}/python"
def dockerTag = new Date().format('yyyyMMddHHmmss')
ext.dockerJavaImageName = "${dockerJavaImageContainer}:${dockerTag}"
ext.dockerPythonImageName = "${dockerPythonImageContainer}:${dockerTag}"
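// Each findProperty default above can be overridden from the command line, e.g. (with a
// hypothetical project and region):
//   ./gradlew :runners:google-cloud-dataflow-java:postCommit \
//       -PdataflowProject=my-gcp-project -PdataflowRegion=us-west1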
def legacyPipelineOptions = [
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--region=${dataflowRegion}",
"--tempRoot=${dataflowValidatesTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
]
def runnerV2PipelineOptions = [
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--region=${dataflowRegion}",
"--tempRoot=${dataflowValidatesTempRoot}",
"--sdkContainerImage=${dockerJavaImageContainer}:${dockerTag}",
"--experiments=use_unified_worker,use_runner_v2",
"--firestoreDb=${firestoreDb}",
]
def commonLegacyExcludeCategories = [
// Should be run only in a properly configured SDK harness environment
'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment',
'org.apache.beam.sdk.testing.LargeKeys$Above10MB',
'org.apache.beam.sdk.testing.UsesAttemptedMetrics',
'org.apache.beam.sdk.testing.UsesJavaExpansionService',
'org.apache.beam.sdk.testing.UsesPythonExpansionService',
'org.apache.beam.sdk.testing.UsesDistributionMetrics',
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
'org.apache.beam.sdk.testing.UsesMultimapState',
'org.apache.beam.sdk.testing.UsesTestStream',
'org.apache.beam.sdk.testing.UsesParDoLifecycle',
'org.apache.beam.sdk.testing.UsesMetricsPusher',
'org.apache.beam.sdk.testing.UsesBundleFinalizer',
]
def commonRunnerV2ExcludeCategories = [
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
'org.apache.beam.sdk.testing.UsesSetState',
'org.apache.beam.sdk.testing.UsesMapState',
'org.apache.beam.sdk.testing.UsesMultimapState',
'org.apache.beam.sdk.testing.UsesMetricsPusher',
'org.apache.beam.sdk.testing.UsesOrderedListState',
'org.apache.beam.sdk.testing.UsesTestStream',
'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime',
'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput',
]
// For the following test tasks that use the legacy worker, workerHarnessContainerImage is set
// to empty so that Dataflow picks up the non-versioned container image, which handles a staged
// worker jar.
def createLegacyWorkerValidatesRunnerTest = { Map args ->
def name = args.name
def pipelineOptions = args.pipelineOptions ?: legacyPipelineOptions
def excludedTests = args.excludedTests ?: []
def excludedCategories = args.excludedCategories ?: []
return tasks.create(name: name, type: Test, group: "Verification") {
dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions)
// Increase test parallelism up to the number of Gradle workers. By default this is equal
// to the number of CPU cores, but can be increased by setting --max-workers=N.
maxParallelForks Integer.MAX_VALUE
classpath = configurations.validatesRunner
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) +
files(project(project.path).sourceSets.test.output.classesDirs)
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
commonLegacyExcludeCategories.each {
excludeCategories it
}
excludedCategories.each {
excludeCategories it
}
filter {
excludedTests.each {
excludeTestsMatching it
}
}
}
}
}
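// A minimal (hypothetical) use of the factory; the real call sites are the validatesRunner*
// tasks below:
//   createLegacyWorkerValidatesRunnerTest(
//       name: 'validatesRunnerCustomTest',
//       excludedCategories: ['org.apache.beam.sdk.testing.UsesTestStream'])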
def createRunnerV2ValidatesRunnerTest = { Map args ->
def name = args.name
def pipelineOptions = args.pipelineOptions ?: runnerV2PipelineOptions
def excludedTests = args.excludedTests ?: []
def excludedCategories = args.excludedCategories ?: []
return tasks.create(name: name, type: Test, group: "Verification") {
dependsOn buildAndPushDockerJavaContainer
systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions)
// Increase test parallelism up to the number of Gradle workers. By default this is equal
// to the number of CPU cores, but can be increased by setting --max-workers=N.
maxParallelForks Integer.MAX_VALUE
classpath = configurations.validatesRunner
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) +
files(project(project.path).sourceSets.test.output.classesDirs)
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
commonRunnerV2ExcludeCategories.each {
excludeCategories it
}
excludedCategories.each {
excludeCategories it
}
filter {
excludedTests.each {
excludeTestsMatching it
}
}
}
}
}
// Push docker images to a container registry for use within tests.
// NB: Tasks which consume docker images from the registry should depend on this
// task directly ('dependsOn buildAndPushDockerJavaContainer'). This ensures the correct
// task ordering such that the registry doesn't get cleaned up prior to task completion.
def buildAndPushDockerJavaContainer = tasks.register("buildAndPushDockerJavaContainer") {
def javaVer = "java8"
if (project.hasProperty('compileAndRunTestsWithJava17')) {
javaVer = "java17"
} else if (project.hasProperty('compileAndRunTestsWithJava11')) {
javaVer = "java11"
}
dependsOn ":sdks:java:container:${javaVer}:docker"
def defaultDockerImageName = containerImageName(
name: "${project.docker_image_default_repo_prefix}${javaVer}_sdk",
root: "apache",
tag: project.sdk_version)
doLast {
exec {
commandLine "docker", "tag", "${defaultDockerImageName}", "${dockerJavaImageName}"
}
exec {
commandLine "gcloud", "docker", "--", "push", "${dockerJavaImageName}"
}
}
}
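// The task can also be run on its own to stage a Java SDK container for manual testing,
// e.g. against a hypothetical registry root:
//   ./gradlew :runners:google-cloud-dataflow-java:buildAndPushDockerJavaContainer \
//       -PdockerImageRoot=us.gcr.io/my-project/java-postcommit-it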
// Clean up built Java images
def cleanUpDockerJavaImages = tasks.register("cleanUpDockerJavaImages") {
doLast {
exec {
commandLine "docker", "rmi", "--force", "${dockerJavaImageName}"
}
exec {
commandLine "gcloud", "--quiet", "container", "images", "untag", "${dockerJavaImageName}"
}
exec {
commandLine "./scripts/cleanup_untagged_gcr_images.sh", "${dockerJavaImageContainer}"
}
}
}
// Push docker images to a container registry for use within tests.
// NB: Tasks which consume docker images from the registry should depend on this
// task directly ('dependsOn buildAndPushDockerPythonContainer'). This ensures the correct
// task ordering such that the registry doesn't get cleaned up prior to task completion.
def buildAndPushDockerPythonContainer = tasks.create("buildAndPushDockerPythonContainer") {
if (!project.project(":sdks:python").buildFile.exists()) {
System.err.println 'Python build file not found. Skipping buildAndPushDockerPythonContainer task.'
return
}
project.evaluationDependsOn(":sdks:python")
def pythonVer = project.project(':sdks:python').pythonVersion
dependsOn ":sdks:python:container:py"+pythonVer.replace('.', '')+":docker"
def defaultDockerImageName = containerImageName(
name: "${project.docker_image_default_repo_prefix}python${pythonVer}_sdk",
root: "apache",
tag: project.sdk_version)
doLast {
exec {
commandLine "docker", "tag", "${defaultDockerImageName}", "${dockerPythonImageName}"
}
exec {
commandLine "gcloud", "docker", "--", "push", "${dockerPythonImageName}"
}
}
}
// Clean up built Python images
def cleanUpDockerPythonImages = tasks.register("cleanUpDockerPythonImages") {
doLast {
exec {
commandLine "docker", "rmi", "--force", "${dockerPythonImageName}"
}
exec {
commandLine "gcloud", "--quiet", "container", "images", "untag", "${dockerPythonImageName}"
}
exec {
commandLine "./scripts/cleanup_untagged_gcr_images.sh", "${dockerPythonImageContainer}"
}
}
}
afterEvaluate {
// Ensure all tasks which use published docker images run before they are cleaned up
tasks.each { t ->
if (t.dependsOn.contains(buildAndPushDockerJavaContainer) && !t.name.equalsIgnoreCase('printRunnerV2PipelineOptions')) {
t.finalizedBy cleanUpDockerJavaImages
}
if (t.dependsOn.contains(buildAndPushDockerPythonContainer)) {
t.finalizedBy cleanUpDockerPythonImages
}
}
}
task printRunnerV2PipelineOptions {
dependsOn buildAndPushDockerJavaContainer
doLast {
println "To run a Dataflow job with runner V2, add the following pipeline options to your command-line:"
println runnerV2PipelineOptions.join(' ')
println "Please delete your image upon completion with the following command:"
println "docker rmi ${dockerJavaImageName}; gcloud container images delete --force-delete-tags ${dockerJavaImageName}"
}
}
task validatesRunner {
group = "Verification"
description "Validates Dataflow runner"
dependsOn(createLegacyWorkerValidatesRunnerTest(
name: 'validatesRunnerLegacyWorkerTest',
excludedTests: [
// TODO(https://github.com/apache/beam/issues/21472)
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
]
))
}
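// Typical invocation (defaults target the apache-beam-testing project):
//   ./gradlew :runners:google-cloud-dataflow-java:validatesRunner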
task validatesRunnerStreaming {
group = "Verification"
description "Validates Dataflow runner forcing streaming mode"
dependsOn(createLegacyWorkerValidatesRunnerTest(
name: 'validatesRunnerLegacyWorkerTestStreaming',
pipelineOptions: legacyPipelineOptions + ['--streaming'],
excludedCategories: [
'org.apache.beam.sdk.testing.UsesCommittedMetrics',
'org.apache.beam.sdk.testing.UsesMapState',
'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput',
'org.apache.beam.sdk.testing.UsesSetState',
],
excludedTests: [
// TODO(https://github.com/apache/beam/issues/21472)
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
// GroupIntoBatches.withShardedKey not supported on streaming runner v1
// https://github.com/apache/beam/issues/22592
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
]
))
}
def setupXVR = tasks.register("setupXVR") {
dependsOn buildAndPushDockerJavaContainer
dependsOn buildAndPushDockerPythonContainer
dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
}
def cleanupXVR = tasks.register("cleanupXVR") {
finalizedBy cleanUpDockerJavaImages
finalizedBy cleanUpDockerPythonImages
}
createCrossLanguageValidatesRunnerTask(
startJobServer: setupXVR,
cleanupJobServer: cleanupXVR,
classpath: configurations.validatesRunner,
numParallelTests: Integer.MAX_VALUE,
needsSdkLocation: true,
semiPersistDir: "/var/opt/google",
pythonPipelineOptions: [
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--region=${dataflowRegion}",
"--sdk_harness_container_image_overrides=.*java.*,${dockerJavaImageContainer}:${dockerTag}",
],
javaPipelineOptions: [
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--region=${dataflowRegion}",
"--tempRoot=${dataflowValidatesTempRoot}",
"--sdkContainerImage=${dockerJavaImageContainer}:${dockerTag}",
"--sdkHarnessContainerImageOverrides=.*python.*,${dockerPythonImageContainer}:${dockerTag}",
],
pytestOptions: [
"--capture=no",
"--numprocesses=8",
"--timeout=4500",
"--log-cli-level=INFO",
],
goScriptOptions: [
"--runner dataflow",
"--project ${dataflowProject}",
"--dataflow_project ${dataflowProject}",
"--region ${dataflowRegion}",
"--tests \"./test/integration/xlang ./test/integration/io/xlang/...\"",
"--sdk_overrides \".*java.*,${dockerJavaImageContainer}:${dockerTag}\"",
],
)
task validatesRunnerV2 {
group = "Verification"
description = "Runs the ValidatesRunner tests on Dataflow Runner V2"
dependsOn(createRunnerV2ValidatesRunnerTest(
name: 'validatesRunnerV2Test',
pipelineOptions: runnerV2PipelineOptions,
excludedCategories: [
'org.apache.beam.sdk.testing.UsesOnWindowExpiration',
'org.apache.beam.sdk.testing.UsesStatefulParDo',
'org.apache.beam.sdk.testing.UsesTimersInParDo',
'org.apache.beam.sdk.testing.UsesUnboundedPCollections',
'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo',
],
excludedTests: [
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',
// TODO(https://github.com/apache/beam/issues/18592): respect ParDo lifecycle.
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testFnCallSequenceStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful',
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testCombiningAccumulatingProcessingTime',
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testLargeKeys100MB',
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testLargeKeys10MB',
// TODO(https://github.com/apache/beam/issues/20931): Identify whether it's a bug or a feature gap.
'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testRewindowWithTimestampCombiner',
// TODO(https://github.com/apache/beam/issues/21472)
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
]
))
}
task validatesRunnerV2Streaming {
group = "Verification"
description = "Runs the ValidatesRunner tests on Dataflow Runner V2 forcing streaming mode"
dependsOn(createRunnerV2ValidatesRunnerTest(
name: 'validatesRunnerV2TestStreaming',
pipelineOptions: runnerV2PipelineOptions + ['--streaming', '--experiments=enable_streaming_engine'],
excludedCategories: [
'org.apache.beam.sdk.testing.LargeKeys$Above10KB',
'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo',
'org.apache.beam.sdk.testing.UsesCommittedMetrics',
],
excludedTests: [
// TestStream is only partially supported on the unified worker (UW)
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',
// TestStream with processing time control not supported
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
'org.apache.beam.sdk.transforms.GroupByKeyTest.testCombiningAccumulatingProcessingTime',
// TODO(https://github.com/apache/beam/issues/18592): respect ParDo lifecycle.
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful',
// TODO(https://github.com/apache/beam/issues/20931): Identify whether it's a bug or a feature gap.
'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testRewindowWithTimestampCombiner',
// TODO(https://github.com/apache/beam/issues/21424)
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew',
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testOnWindowTimestampSkew',
// TODO(https://github.com/apache/beam/issues/21472)
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
]
))
}
task copyGoogleCloudPlatformTestResources(type: Copy) {
from project(':sdks:java:io:google-cloud-platform').fileTree("src/test/resources")
into "$buildDir/resources/test/"
}
task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test, dependsOn: copyGoogleCloudPlatformTestResources) {
group = "Verification"
dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--region=${dataflowRegion}",
"--tempRoot=${dataflowPostCommitTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
"--firestoreDb=${firestoreDb}",
])
include '**/*IT.class'
exclude '**/BigQueryIOReadIT.class'
exclude '**/BigQueryIOStorageReadTableRowIT.class'
exclude '**/PubsubReadIT.class'
exclude '**/FhirIOReadIT.class'
exclude '**/gcp/spanner/changestreams/it/*.class'
maxParallelForks 4
classpath = configurations.googleCloudPlatformIntegrationTest
testClassesDirs = files(project(":sdks:java:io:google-cloud-platform").sourceSets.test.output.classesDirs)
useJUnit {
excludeCategories "org.apache.beam.sdk.testing.UsesKms"
}
}
task googleCloudPlatformLegacyWorkerKmsIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--region=${dataflowRegion}",
"--tempRoot=${dataflowPostCommitTempRootKms}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
"--dataflowKmsKey=${dataflowKmsKey}",
"--firestoreDb=${firestoreDb}",
])
include '**/*IT.class'
exclude '**/BigQueryKmsKeyIT.class' // Only needs to run on the direct runner.
maxParallelForks 4
classpath = configurations.googleCloudPlatformIntegrationTest
testClassesDirs = files(project(":sdks:java:io:google-cloud-platform").sourceSets.test.output.classesDirs)
useJUnit {
includeCategories "org.apache.beam.sdk.testing.UsesKms"
}
}
task googleCloudPlatformRunnerV2IntegrationTest(type: Test) {
group = "Verification"
dependsOn buildAndPushDockerJavaContainer
systemProperty "beamTestPipelineOptions", JsonOutput.toJson(runnerV2PipelineOptions)
include '**/*IT.class'
exclude '**/BigQueryIOStorageReadTableRowIT.class'
exclude '**/BigQueryIOStorageWriteIT.class'
exclude '**/SpannerWriteIT.class'
exclude '**/*KmsKeyIT.class'
exclude '**/FhirIOReadIT.class'
exclude '**/FhirIOWriteIT.class'
exclude '**/FhirIOLROIT.class'
exclude '**/FhirIOSearchIT.class'
exclude '**/FhirIOPatientEverythingIT.class'
maxParallelForks 4
classpath = configurations.googleCloudPlatformIntegrationTest
testClassesDirs = files(project(":sdks:java:io:google-cloud-platform").sourceSets.test.output.classesDirs)
useJUnit { }
}
task examplesJavaLegacyWorkerIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--region=${dataflowRegion}",
"--tempRoot=${dataflowPostCommitTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
])
// The examples/java preCommit task already covers running WordCountIT/WindowedWordCountIT, so
// this postCommit integration test excludes them.
include '**/*IT.class'
exclude '**/WordCountIT.class'
exclude '**/WindowedWordCountIT.class'
maxParallelForks 4
classpath = configurations.examplesJavaIntegrationTest
testClassesDirs = files(project(":examples:java").sourceSets.test.output.classesDirs)
useJUnit { }
}
task examplesJavaRunnerV2IntegrationTest(type: Test) {
group = "Verification"
dependsOn buildAndPushDockerJavaContainer
systemProperty "beamTestPipelineOptions", JsonOutput.toJson(runnerV2PipelineOptions)
// The examples/java preCommit task already covers running WordCountIT/WindowedWordCountIT, so
// this postCommit integration test excludes them.
include '**/*IT.class'
exclude '**/WordCountIT.class'
exclude '**/WindowedWordCountIT.class'
exclude '**/TopWikipediaSessionsIT.class'
exclude '**/AutoCompleteIT.class'
// TODO(https://github.com/apache/beam/issues/20593): test times out.
exclude '**/FhirIOReadIT.class'
maxParallelForks 4
classpath = configurations.examplesJavaIntegrationTest
testClassesDirs = files(project(":examples:java").sourceSets.test.output.classesDirs)
useJUnit { }
}
task coreSDKJavaLegacyWorkerIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--region=${dataflowRegion}",
"--tempRoot=${dataflowPostCommitTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
])
include '**/*IT.class'
// TODO[BEAM-4684]: Support @RequiresStableInput on Dataflow in a more intelligent way
exclude '**/RequiresStableInputIT.class'
maxParallelForks 4
classpath = configurations.coreSDKJavaIntegrationTest
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
useJUnit { }
}
task coreSDKJavaRunnerV2IntegrationTest(type: Test) {
group = "Verification"
dependsOn buildAndPushDockerJavaContainer
systemProperty "beamTestPipelineOptions", JsonOutput.toJson(runnerV2PipelineOptions)
include '**/*IT.class'
// TODO[BEAM-4684]: Support @RequiresStableInput on Dataflow in a more intelligent way
exclude '**/RequiresStableInputIT.class'
maxParallelForks 4
classpath = configurations.coreSDKJavaIntegrationTest
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
useJUnit { }
}
task postCommit {
group = "Verification"
description = "Various integration tests using the Dataflow runner."
dependsOn googleCloudPlatformLegacyWorkerIntegrationTest
dependsOn googleCloudPlatformLegacyWorkerKmsIntegrationTest
dependsOn examplesJavaLegacyWorkerIntegrationTest
dependsOn coreSDKJavaLegacyWorkerIntegrationTest
}
task postCommitRunnerV2 {
group = "Verification"
description = "Various integration tests using the Dataflow runner V2."
dependsOn googleCloudPlatformRunnerV2IntegrationTest
dependsOn examplesJavaRunnerV2IntegrationTest
dependsOn coreSDKJavaRunnerV2IntegrationTest
}
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcpRegion = project.findProperty('gcpRegion') ?: 'us-central1'
def gcsBucket = project.findProperty('gcsBucket') ?: 'temp-storage-for-release-validation-tests/nightly-snapshot-validation'
def bqDataset = project.findProperty('bqDataset') ?: 'beam_postrelease_mobile_gaming'
def pubsubTopic = project.findProperty('pubsubTopic') ?: 'java_mobile_gaming_topic'
// Generates :runners:google-cloud-dataflow-java:runQuickstartJavaDataflow
createJavaExamplesArchetypeValidationTask(type: 'Quickstart',
runner: 'Dataflow',
gcpProject: gcpProject,
gcpRegion: gcpRegion,
gcsBucket: gcsBucket)
// Generates :runners:google-cloud-dataflow-java:runMobileGamingJavaDataflow
createJavaExamplesArchetypeValidationTask(type: 'MobileGaming',
runner: 'Dataflow',
gcpProject: gcpProject,
gcpRegion: gcpRegion,
gcsBucket: gcsBucket,
bqDataset: bqDataset,
pubsubTopic: pubsubTopic)
// Generates :runners:google-cloud-dataflow-java:runMobileGamingJavaDataflowBom
createJavaExamplesArchetypeValidationTask(type: 'MobileGaming',
runner: 'DataflowBom',
gcpProject: gcpProject,
gcpRegion: gcpRegion,
gcsBucket: gcsBucket,
bqDataset: bqDataset,
pubsubTopic: pubsubTopic)
// Standalone task for testing GCS upload; use with -PfilesToStage and -PdataflowTempRoot
// (see the example below).
task GCSUpload(type: JavaExec) {
mainClass = 'org.apache.beam.runners.dataflow.util.GCSUploadMain'
classpath = sourceSets.test.runtimeClasspath
args "--stagingLocation=${dataflowUploadTemp}/staging",
"--filesToStage=${testFilesToStage}"
}
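// Example (hypothetical jar and bucket):
//   ./gradlew :runners:google-cloud-dataflow-java:GCSUpload \
//       -PfilesToStage=/tmp/myArtifact.jar -PdataflowTempRoot=gs://my-bucket/tmp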