/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import groovy.json.JsonOutput
plugins { id 'org.apache.beam.module' }
applyJavaNature()
description = "Apache Beam :: Runners :: Google Cloud Dataflow"
/*
* We manually specify these evaluationDependsOn statements to ensure that the
* following projects are evaluated before this project. This is because we
* reference their properties (such as "sourceSets.test.output") directly.
*/
evaluationDependsOn(":beam-sdks-java-io-google-cloud-platform")
evaluationDependsOn(":beam-sdks-java-core")
evaluationDependsOn(":beam-examples-java")
evaluationDependsOn(":beam-runners-google-cloud-dataflow-java-legacy-worker")
evaluationDependsOn(":beam-runners-google-cloud-dataflow-java-fn-api-worker")
evaluationDependsOn(":beam-sdks-java-container")
processResources {
filter org.apache.tools.ant.filters.ReplaceTokens, tokens: [
'dataflow.legacy_environment_major_version' : '7',
'dataflow.fnapi_environment_major_version' : '7',
'dataflow.container_version' : 'beam-2.13.0'
]
}
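// To illustrate the ReplaceTokens filter above: any Ant-style @token@ markers in files under
// src/main/resources are substituted at build time. A minimal sketch, assuming a hypothetical
// properties resource (file name and key are illustrative, not part of this build):
//
//   # before processResources:  container.version=@dataflow.container_version@
//   # after processResources:   container.version=beam-2.13.0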
// Exclude tests that need a runner
test {
systemProperty "beamTestPipelineOptions", ""
systemProperty "beamUseDummyRunner", "true"
}
configurations {
validatesRunner
coreSDKJavaIntegrationTest
examplesJavaIntegrationTest
googleCloudPlatformIntegrationTest
}
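// Each of these custom configurations carries the classpath for one of the test tasks defined
// below; those tasks set 'classpath = configurations.<name>' instead of reusing the default
// test classpath.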
dependencies {
shadow library.java.vendored_guava_20_0
shadow project(path: ":beam-model-pipeline", configuration: "shadow")
shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
shadow project(path: ":beam-sdks-java-extensions-google-cloud-platform-core", configuration: "shadow")
shadow project(path: ":beam-sdks-java-io-google-cloud-platform", configuration: "shadow")
shadow project(path: ":beam-runners-core-construction-java", configuration: "shadow")
shadow library.java.vendored_grpc_1_13_1
shadow library.java.google_api_client
shadow library.java.google_http_client
shadow library.java.google_http_client_jackson2
shadow library.java.google_api_services_dataflow
shadow library.java.google_api_services_clouddebugger
shadow library.java.google_api_services_storage
shadow library.java.google_auth_library_credentials
shadow library.java.google_auth_library_oauth2_http
shadow library.java.bigdataoss_util
shadow library.java.avro
shadow library.java.joda_time
shadow library.java.jackson_core
shadow library.java.jackson_annotations
shadow library.java.jackson_databind
shadow library.java.slf4j_api
shadowTest library.java.hamcrest_core
shadowTest library.java.junit
shadowTest project(path: ":beam-sdks-java-io-google-cloud-platform", configuration: "shadowTest")
shadowTest project(path: ":beam-sdks-java-core", configuration: "shadowTest")
shadowTest project(path: ":beam-sdks-java-extensions-google-cloud-platform-core", configuration: "shadowTest")
shadowTest library.java.guava_testlib
shadowTest library.java.slf4j_jdk14
shadowTest library.java.mockito_core
shadowTest library.java.google_cloud_dataflow_java_proto_library_all
shadowTest library.java.datastore_v1_protos
shadowTest library.java.jackson_dataformat_yaml
validatesRunner project(path: ":beam-sdks-java-core", configuration: "shadowTest")
validatesRunner project(path: project.path, configuration: "shadow")
validatesRunner library.java.hamcrest_core
validatesRunner library.java.hamcrest_library
coreSDKJavaIntegrationTest project(path: project.path, configuration: "shadow")
coreSDKJavaIntegrationTest project(path: ":beam-sdks-java-core", configuration: "shadowTest")
examplesJavaIntegrationTest project(path: project.path, configuration: "shadow")
examplesJavaIntegrationTest project(path: ":beam-examples-java", configuration: "shadowTest")
googleCloudPlatformIntegrationTest project(path: project.path, configuration: "shadow")
googleCloudPlatformIntegrationTest project(path: ":beam-sdks-java-io-google-cloud-platform", configuration: "shadowTest")
}
def dataflowProject = project.findProperty('dataflowProject') ?: 'apache-beam-testing'
def dataflowValidatesTempRoot = project.findProperty('dataflowTempRoot') ?: 'gs://temp-storage-for-validates-runner-tests'
def dataflowPostCommitTempRoot = project.findProperty('dataflowTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests'
def dataflowUploadTemp = project.findProperty('dataflowTempRoot') ?: 'gs://temp-storage-for-upload-tests'
def testFilesToStage = project.findProperty('filesToStage') ?: 'test.txt'
def dataflowLegacyWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":beam-runners-google-cloud-dataflow-java-legacy-worker").shadowJar.archivePath
def dataflowFnApiWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":beam-runners-google-cloud-dataflow-java-fn-api-worker").shadowJar.archivePath
def dataflowKmsKey = project.findProperty('dataflowKmsKey') ?: "projects/apache-beam-testing/locations/global/keyRings/beam-it/cryptoKeys/test"
def dockerImageRoot = project.findProperty('dockerImageRoot') ?: "us.gcr.io/${dataflowProject}/java-postcommit-it"
def dockerImageContainer = "${dockerImageRoot}/java"
def dockerTag = new Date().format('yyyyMMddHHmmss')
// If -PuseExecutableStage is set, the use_executable_stage_bundle_execution experiment will be enabled.
def fnapiExperiments = project.findProperty('useExecutableStage') ? 'beam_fn_api,use_executable_stage_bundle_execution' : "beam_fn_api"
ext.dockerImageName = "${dockerImageContainer}:${dockerTag}"
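// For reference, with the defaults above the fully qualified image name resolves to something
// like the following (the timestamp tag is illustrative):
//   us.gcr.io/apache-beam-testing/java-postcommit-it/java:20190601120000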
def fnApiPipelineOptions = [
"--dataflowWorkerJar=${dataflowFnApiWorkerJar}",
"--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}",
"--experiments=${fnapiExperiments}",
]
def commonExcludeCategories = [
'org.apache.beam.sdk.testing.LargeKeys$Above10MB',
'org.apache.beam.sdk.testing.UsesAttemptedMetrics',
'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms',
'org.apache.beam.sdk.testing.UsesDistributionMetrics',
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
'org.apache.beam.sdk.testing.UsesSetState',
'org.apache.beam.sdk.testing.UsesMapState',
'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs',
'org.apache.beam.sdk.testing.UsesUnboundedPCollections',
'org.apache.beam.sdk.testing.UsesTestStream',
'org.apache.beam.sdk.testing.UsesParDoLifecycle',
'org.apache.beam.sdk.testing.UsesMetricsPusher',
]
def fnApiWorkerExcludeCategories = [
'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo',
'org.apache.beam.sdk.testing.UsesCustomWindowMerging',
'org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported',
'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders',
'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo',
'org.apache.beam.sdk.testing.UsesSchema',
]
// For the following test tasks that use the legacy worker, workerHarnessContainerImage is set to
// empty so that Dataflow picks up the non-versioned container image, which can handle a staged worker jar.
task validatesRunnerLegacyWorkerTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-legacy-worker:shadowJar"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowValidatesTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
])
// Increase test parallelism up to the number of Gradle workers. By default this is equal
// to the number of CPU cores, but can be increased by setting --max-workers=N.
maxParallelForks Integer.MAX_VALUE
classpath = configurations.validatesRunner
testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
commonExcludeCategories.each {
excludeCategories it
}
}
}
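// A sketch of a command-line invocation, assuming GCP credentials are configured and using
// placeholder project/bucket names:
//   ./gradlew :beam-runners-google-cloud-dataflow-java:validatesRunnerLegacyWorkerTest \
//       -PdataflowProject=my-gcp-project -PdataflowTempRoot=gs://my-temp-bucket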
task validatesRunnerLegacyWorkerJava11Test(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-legacy-worker:shadowJar"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowValidatesTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
])
systemProperty "java.specification.version", "11"
// Increase test parallelism up to the number of Gradle workers. By default this is equal
// to the number of CPU cores, but can be increased by setting --max-workers=N.
maxParallelForks Integer.MAX_VALUE
classpath = configurations.validatesRunner
testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
commonExcludeCategories.each {
excludeCategories it
}
}
}
// Push docker images to a container registry for use within tests.
// NB: Tasks which consume docker images from the registry should depend on this
// task directly ('dependsOn buildAndPushDockerContainer'). This ensures the correct
// task ordering so that the pushed images aren't cleaned up before those tasks complete.
task buildAndPushDockerContainer() {
dependsOn ":beam-sdks-java-container:docker"
finalizedBy 'cleanUpDockerImages'
def defaultDockerImageName = containerImageName(name: "java")
doLast {
exec {
commandLine "docker", "tag", "${defaultDockerImageName}", "${dockerImageName}"
}
exec {
commandLine "gcloud", "docker", "--", "push", "${dockerImageContainer}"
}
}
}
afterEvaluate {
// Ensure all tasks which use published docker images run before they are cleaned up
tasks.each { t ->
if (t.dependsOn.contains(buildAndPushDockerContainer)) {
cleanUpDockerImages.mustRunAfter t
}
}
}
task printFnApiPipelineOptions {
group = "Help"
description = "Prints to the console extra pipeline options needed to run a Dataflow pipeline using portability"
dependsOn ":beam-runners-google-cloud-dataflow-java-fn-api-worker:shadowJar"
dependsOn buildAndPushDockerContainer
doLast {
println "To run a Dataflow job with portability, add the following pipeline options to your command-line:"
println fnApiPipelineOptions.join(' ')
}
}
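// Example invocation:
//   ./gradlew :beam-runners-google-cloud-dataflow-java:printFnApiPipelineOptions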
task validatesRunnerFnApiWorkerTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-fn-api-worker:shadowJar"
dependsOn buildAndPushDockerContainer
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowPostCommitTempRoot}"] + fnApiPipelineOptions
)
// Increase test parallelism up to the number of Gradle workers. By default this is equal
// to the number of CPU cores, but can be increased by setting --max-workers=N.
maxParallelForks Integer.MAX_VALUE
classpath = configurations.validatesRunner
testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
commonExcludeCategories.each {
excludeCategories it
}
fnApiWorkerExcludeCategories.each {
excludeCategories it
}
}
}
task validatesRunnerFnApiWorkerExecutableStageTest(type: Test) {
group = "Verification"
description "Validates Dataflow PortabilityApi runner"
dependsOn ":beam-runners-google-cloud-dataflow-java-fn-api-worker:shadowJar"
dependsOn buildAndPushDockerContainer
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowPostCommitTempRoot}",
"--dataflowWorkerJar=${dataflowFnApiWorkerJar}",
"--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}",
"--experiments=beam_fn_api,use_executable_stage_bundle_execution"]
)
// Increase test parallelism up to the number of Gradle workers. By default this is equal
// to the number of CPU cores, but can be increased by setting --max-workers=N.
maxParallelForks Integer.MAX_VALUE
classpath = configurations.validatesRunner
testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
commonExcludeCategories.each {
excludeCategories it
}
fnApiWorkerExcludeCategories.each {
excludeCategories it
}
// TODO(BEAM-6233): Support timer and state.
excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo'
}
}
task validatesRunner {
group = "Verification"
description "Validates Dataflow runner"
dependsOn validatesRunnerLegacyWorkerTest
}
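// Example invocation (placeholder project name; validatesJava11Runner below is run the same way):
//   ./gradlew :beam-runners-google-cloud-dataflow-java:validatesRunner -PdataflowProject=my-gcp-project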
task validatesJava11Runner {
group = 'Verification'
description 'Verifies Dataflow runner with Java 11 container harness'
dependsOn validatesRunnerLegacyWorkerJava11Test
}
task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-legacy-worker:shadowJar"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowPostCommitTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
])
include '**/*IT.class'
exclude '**/BigQueryIOReadIT.class'
exclude '**/BigQueryIOStorageReadTableRowIT.class'
exclude '**/PubsubReadIT.class'
exclude '**/*KmsKeyIT.class'
maxParallelForks 4
classpath = configurations.googleCloudPlatformIntegrationTest
testClassesDirs = files(project(":beam-sdks-java-io-google-cloud-platform").sourceSets.test.output.classesDirs)
useJUnit { }
}
task googleCloudPlatformLegacyWorkerKmsIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-legacy-worker:shadowJar"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowPostCommitTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
"--dataflowKmsKey=${dataflowKmsKey}",
])
include '**/*KmsKeyIT.class'
exclude '**/BigQueryKmsKeyIT.class' // Only needs to run on the direct runner.
maxParallelForks 4
classpath = configurations.googleCloudPlatformIntegrationTest
testClassesDirs = files(project(":beam-sdks-java-io-google-cloud-platform").sourceSets.test.output.classesDirs)
useJUnit { }
}
task googleCloudPlatformFnApiWorkerIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-fn-api-worker:shadowJar"
dependsOn buildAndPushDockerContainer
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowPostCommitTempRoot}"] + fnApiPipelineOptions
)
include '**/*IT.class'
exclude '**/BigQueryIOReadIT.class'
exclude '**/BigQueryIOStorageQueryIT.class'
exclude '**/BigQueryIOStorageReadIT.class'
exclude '**/BigQueryIOStorageReadTableRowIT.class'
exclude '**/PubsubReadIT.class'
exclude '**/SpannerReadIT.class'
exclude '**/BigtableReadIT.class'
exclude '**/V1ReadIT.class'
exclude '**/SpannerWriteIT.class'
exclude '**/BigQueryNestedRecordsIT.class'
exclude '**/SplitQueryFnIT.class'
exclude '**/*KmsKeyIT.class'
maxParallelForks 4
classpath = configurations.googleCloudPlatformIntegrationTest
testClassesDirs = files(project(":beam-sdks-java-io-google-cloud-platform").sourceSets.test.output.classesDirs)
useJUnit {
excludeCategories 'org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported'
}
}
task examplesJavaLegacyWorkerIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-legacy-worker:shadowJar"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowPostCommitTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
])
// The examples/java preCommit task already covers running WordCountIT/WindowedWordCountIT so
// this postCommit integration test excludes them.
include '**/*IT.class'
exclude '**/WordCountIT.class'
exclude '**/WindowedWordCountIT.class'
maxParallelForks 4
classpath = configurations.examplesJavaIntegrationTest
testClassesDirs = files(project(":beam-examples-java").sourceSets.test.output.classesDirs)
useJUnit { }
}
// For the fn-api worker, run only the ITs that currently pass.
// More ITs should be supported in the future.
task examplesJavaFnApiWorkerIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-fn-api-worker:shadowJar"
dependsOn buildAndPushDockerContainer
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowPostCommitTempRoot}"] + fnApiPipelineOptions
)
// The examples/java preCommit task already covers running WordCountIT/WindowedWordCountIT so
// this postCommit integration test excludes them.
include '**/*IT.class'
exclude '**/WordCountIT.class'
exclude '**/WindowedWordCountIT.class'
exclude '**/TopWikipediaSessionsIT.class'
exclude '**/TfIdfIT.class'
exclude '**/AutoCompleteIT.class'
exclude '**/TrafficMaxLaneFlowIT.class'
exclude '**/TrafficRoutesIT.class'
maxParallelForks 4
classpath = configurations.examplesJavaIntegrationTest
testClassesDirs = files(project(":beam-examples-java").sourceSets.test.output.classesDirs)
useJUnit { }
}
task coreSDKJavaLegacyWorkerIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-legacy-worker:shadowJar"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowPostCommitTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
])
include '**/*IT.class'
// TODO(BEAM-4684): Support @RequiresStableInput on Dataflow in a more intelligent way
exclude '**/RequiresStableInputIT.class'
maxParallelForks 4
classpath = configurations.coreSDKJavaIntegrationTest
testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
useJUnit { }
}
task coreSDKJavaFnApiWorkerIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-fn-api-worker:shadowJar"
dependsOn buildAndPushDockerContainer
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowPostCommitTempRoot}"] + fnApiPipelineOptions
)
include '**/*IT.class'
// TODO(BEAM-4684): Support @RequiresStableInput on Dataflow in a more intelligent way
exclude '**/RequiresStableInputIT.class'
maxParallelForks 4
classpath = configurations.coreSDKJavaIntegrationTest
testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
useJUnit { }
}
task postCommit {
group = "Verification"
description = "Various integration tests using the Dataflow runner."
dependsOn googleCloudPlatformLegacyWorkerIntegrationTest
dependsOn googleCloudPlatformLegacyWorkerKmsIntegrationTest
dependsOn examplesJavaLegacyWorkerIntegrationTest
dependsOn coreSDKJavaLegacyWorkerIntegrationTest
}
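// Example invocation (placeholder project name; postCommitPortabilityApi below runs the
// fn-api equivalents):
//   ./gradlew :beam-runners-google-cloud-dataflow-java:postCommit -PdataflowProject=my-gcp-project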
task postCommitPortabilityApi {
group = "Verification"
description = "Various integration tests using the Dataflow FnApi runner."
dependsOn buildAndPushDockerContainer
dependsOn googleCloudPlatformFnApiWorkerIntegrationTest
dependsOn examplesJavaFnApiWorkerIntegrationTest
dependsOn coreSDKJavaFnApiWorkerIntegrationTest
}
// Clean up built images
task cleanUpDockerImages() {
doLast {
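// Remove the locally tagged image first, then delete the pushed image (and any remaining
// tags) from the remote registry.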
exec {
commandLine "docker", "rmi", "${dockerImageName}"
}
exec {
commandLine "gcloud", "--quiet", "container", "images", "delete", "--force-delete-tags", "${dockerImageName}"
}
}
}
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcsBucket = project.findProperty('gcsBucket') ?: 'temp-storage-for-release-validation-tests/nightly-snapshot-validation'
def bqDataset = project.findProperty('bqDataset') ?: 'beam_postrelease_mobile_gaming'
def pubsubTopic = project.findProperty('pubsubTopic') ?: 'java_mobile_gaming_topic'
// Generates :beam-runners-google-cloud-dataflow-java:runQuickstartJavaDataflow
createJavaExamplesArchetypeValidationTask(type: 'Quickstart',
runner: 'Dataflow',
gcpProject: gcpProject,
gcsBucket: gcsBucket)
// Generates :beam-runners-google-cloud-dataflow-java:runMobileGamingJavaDataflow
createJavaExamplesArchetypeValidationTask(type: 'MobileGaming',
runner: 'Dataflow',
gcpProject: gcpProject,
gcsBucket: gcsBucket,
bqDataset: bqDataset,
pubsubTopic: pubsubTopic)
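// A sketch of invoking one of the generated tasks above, with placeholder project and bucket:
//   ./gradlew :beam-runners-google-cloud-dataflow-java:runQuickstartJavaDataflow \
//       -PgcpProject=my-gcp-project -PgcsBucket=my-bucket/temp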
// Standalone task for testing GCS upload; use with -PfilesToStage and -PdataflowTempRoot.
task GCSUpload(type: JavaExec) {
main = 'org.apache.beam.runners.dataflow.util.GCSUploadMain'
classpath = sourceSets.test.runtimeClasspath
args "--stagingLocation=${dataflowUploadTemp}/staging",
"--filesToStage=${testFilesToStage}"
}
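// Example invocation, with placeholder file and bucket:
//   ./gradlew :beam-runners-google-cloud-dataflow-java:GCSUpload \
//       -PfilesToStage=/tmp/test.txt -PdataflowTempRoot=gs://my-temp-bucket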