/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import static org.apache.beam.gradle.BeamModulePlugin.getSupportedJavaVersion

import org.apache.tools.ant.taskdefs.condition.Os
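
// This script wires up the portable-runner test suites for the Python SDK:
// compatibility matrices, trigger-transcript tests, example suites, and
// cross-language ITs against the Flink, Samza, Spark, and Prism runners.
// Illustrative invocation (the exact project path depends on the py version
// suite this script is applied to):
//   ./gradlew :sdks:python:test-suites:portable:py<ver>:flinkValidatesRunner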
def pythonRootDir = "${rootDir}/sdks/python"
def pythonVersionSuffix = project.ext.pythonVersion.replace('.', '')
def latestFlinkVersion = project.ext.latestFlinkVersion
def currentJavaVersion = project.ext.currentJavaVersion
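// Shared handle on the Python SDK container image build; every suite below
// that uses the DOCKER environment type depends on this task.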
ext {
  pythonContainerTask = ":sdks:python:container:py${pythonVersionSuffix}:docker"
}
def createFlinkRunnerTestTask(String workerType) {
  def taskName = "flinkCompatibilityMatrix${workerType}"
  // `project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath` is not resolvable until runtime, so hard-code it here.
  def jobServerJar = "${rootDir}/runners/flink/${latestFlinkVersion}/job-server/build/libs/beam-runners-flink-${latestFlinkVersion}-job-server-${version}.jar"
  def options = "--flink_job_server_jar=${jobServerJar} --environment_type=${workerType}"
  if (workerType == 'PROCESS') {
    options += " --environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh"
  }
  def task = toxTask(taskName, 'flink-runner-test', options)
  // Through the Flink job server, we transitively add dependencies on the expansion services needed in tests.
  task.configure {
    dependsOn ":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
    // The Java SDK worker is required to execute external transforms.
    def suffix = getSupportedJavaVersion()
    dependsOn ":sdks:java:container:${suffix}:docker"
    if (workerType == 'DOCKER') {
      dependsOn pythonContainerTask
    } else if (workerType == 'PROCESS') {
      dependsOn createProcessWorker
    }
  }
  return task
}
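// One task per environment type: flinkCompatibilityMatrixDOCKER,
// flinkCompatibilityMatrixPROCESS, and flinkCompatibilityMatrixLOOPBACK.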
createFlinkRunnerTestTask('DOCKER')
createFlinkRunnerTestTask('PROCESS')
createFlinkRunnerTestTask('LOOPBACK')
tasks.register("flinkValidatesRunner") {
  dependsOn 'flinkCompatibilityMatrixLOOPBACK'
}
// TODO(https://github.com/apache/beam/issues/19962): Enable on pre-commit.
tasks.register("flinkTriggerTranscript") {
  dependsOn 'setupVirtualenv'
  dependsOn ":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
  doLast {
    exec {
      executable 'sh'
      args '-c', """
          . ${envdir}/bin/activate \\
          && cd ${pythonRootDir} \\
          && pip install -e .[test] \\
          && pytest \\
              apache_beam/transforms/trigger_test.py::WeakTestStreamTranscriptTest \\
              --test-pipeline-options='--runner=FlinkRunner --environment_type=LOOPBACK --flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server:").shadowJar.archivePath}'
          """
    }
  }
}
// Verifies BEAM-10702.
tasks.register("portableLocalRunnerJuliaSetWithSetupPy") {
  dependsOn 'setupVirtualenv'
  dependsOn ":sdks:python:container:py${pythonVersionSuffix}:docker"
  doLast {
    exec {
      executable 'sh'
      args '-c', """
          . ${envdir}/bin/activate \\
          && cd ${pythonRootDir} \\
          && pip install -e . \\
          && cd apache_beam/examples/complete/juliaset \\
          && python juliaset_main.py \\
              --runner=PortableRunner \\
              --job_endpoint=embed \\
              --requirements_file=./requirements.txt \\
              --coordinate_output=/tmp/juliaset \\
              --grid_size=1
          """
    }
  }
}
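// Writes build/sdk_worker.sh for the PROCESS-environment suites: it activates
// the virtualenv, then execs the Go boot launcher. The generated script looks
// roughly like:
//   #!/bin/sh
//   . <envdir>/bin/activate
//   exec <container build dir>/target/launcher/<os>_amd64/boot "$@"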
def createProcessWorker = tasks.register("createProcessWorker") {
  dependsOn ':sdks:python:container:build'
  dependsOn 'setupVirtualenv'
  def sdkWorkerFile = file("${buildDir}/sdk_worker.sh")
  def osType = 'linux'
  if (Os.isFamily(Os.FAMILY_MAC)) {
    osType = 'darwin'
  }
  def workerScript = "${project(":sdks:python:container:").buildDir.absolutePath}/target/launcher/${osType}_amd64/boot"
  def sdkWorkerFileCode = """#!/bin/sh
. ${envdir}/bin/activate
exec ${workerScript} "\$@"
"""
  outputs.file sdkWorkerFile
  doLast {
    sdkWorkerFile.write sdkWorkerFileCode
    exec {
      commandLine('sh', '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test]")
    }
    exec {
      commandLine('chmod', '+x', sdkWorkerFile)
    }
  }
}
// The requirements file is created at runtime.
tasks.register("portableLocalRunnerTestWithRequirementsFile") {
  dependsOn 'setupVirtualenv'
  dependsOn ":sdks:python:container:py${pythonVersionSuffix}:docker"
  doLast {
    exec {
      executable 'sh'
      args '-c', """
          . ${envdir}/bin/activate \\
          && cd ${pythonRootDir} \\
          && pip install -e . \\
          && cd apache_beam/runners/portability \\
          && python requirements_cache_it_test.py \\
              --runner=PortableRunner \\
              --job_endpoint=embed \\
              --environment_type="DOCKER"
          """
    }
  }
}
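// The Samza, Spark, and Prism suites below follow the same compatibility-matrix
// pattern as the Flink suites above.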
def createSamzaRunnerTestTask(String workerType) {
  def taskName = "samzaCompatibilityMatrix${workerType}"
  def jobServerJar = "${rootDir}/runners/samza/job-server/build/libs/beam-runners-samza-job-server-${version}.jar"
  def options = "--samza_job_server_jar=${jobServerJar} --environment_type=${workerType}"
  if (workerType == 'PROCESS') {
    options += " --environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh"
  }
  def task = toxTask(taskName, 'samza-runner-test', options)
  task.configure {
    dependsOn ':runners:samza:job-server:shadowJar'
    if (workerType == 'DOCKER') {
      dependsOn pythonContainerTask
    } else if (workerType == 'PROCESS') {
      dependsOn createProcessWorker
    }
  }
  return task
}
createSamzaRunnerTestTask('DOCKER')
createSamzaRunnerTestTask('PROCESS')
createSamzaRunnerTestTask('LOOPBACK')
tasks.register("samzaValidatesRunner") {
  dependsOn 'samzaCompatibilityMatrixLOOPBACK'
}
def createSparkRunnerTestTask(String workerType) {
  def taskName = "sparkCompatibilityMatrix${workerType}"
  // `project(':runners:spark:3:job-server').shadowJar.archivePath` is not resolvable until runtime, so hard-code it here.
  def jobServerJar = "${rootDir}/runners/spark/3/job-server/build/libs/beam-runners-spark-3-job-server-${version}.jar"
  def options = "--spark_job_server_jar=${jobServerJar} --environment_type=${workerType}"
  if (workerType == 'PROCESS') {
    options += " --environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh"
  }
  def task = toxTask(taskName, 'spark-runner-test', options)
  task.configure {
    dependsOn ':runners:spark:3:job-server:shadowJar'
    if (workerType == 'DOCKER') {
      dependsOn pythonContainerTask
    } else if (workerType == 'PROCESS') {
      dependsOn createProcessWorker
    }
  }
  return task
}
createSparkRunnerTestTask('DOCKER')
createSparkRunnerTestTask('PROCESS')
createSparkRunnerTestTask('LOOPBACK')
tasks.register("sparkValidatesRunner") {
dependsOn 'sparkCompatibilityMatrixLOOPBACK'
}
def createPrismRunnerTestTask(String workerType) {
  def taskName = "prismCompatibilityMatrix${workerType}"
  def prismBin = "${rootDir}/runners/prism/build/tmp/prism"
  def options = "--prism_bin=${prismBin} --environment_type=${workerType}"
  if (workerType == 'PROCESS') {
    options += " --environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh"
  }
  def task = toxTask(taskName, 'prism-runner-test', options)
  task.configure {
    dependsOn ':runners:prism:build'
    // The Java SDK worker is required to execute external transforms.
    def suffix = getSupportedJavaVersion()
    dependsOn ":sdks:java:container:${suffix}:docker"
    if (workerType == 'DOCKER') {
      dependsOn pythonContainerTask
    } else if (workerType == 'PROCESS') {
      dependsOn createProcessWorker
    }
  }
  return task
}
createPrismRunnerTestTask('DOCKER')
createPrismRunnerTestTask('PROCESS')
createPrismRunnerTestTask('LOOPBACK')
tasks.register("prismValidatesRunner") {
dependsOn 'prismCompatibilityMatrixLOOPBACK'
}
tasks.register("prismTriggerTranscript") {
dependsOn 'setupVirtualenv'
dependsOn ':runners:prism:build'
def prismBin = "${rootDir}/runners/prism/build/tmp/prism"
doLast {
exec {
executable 'sh'
args '-c', """
. ${envdir}/bin/activate \\
&& cd ${pythonRootDir} \\
&& pip install -e .[test] \\
&& pytest \\
apache_beam/transforms/trigger_test.py::WeakTestStreamTranscriptTest \\
--test-pipeline-options='--runner=PrismRunner --environment_type=LOOPBACK --prism_location=${prismBin}'
"""
}
}
}
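// Aggregate pre- and post-commit suites for this Python version.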
project.tasks.register("preCommitPy${pythonVersionSuffix}") {
dependsOn = [":sdks:python:container:py${pythonVersionSuffix}:docker",
":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
'portableWordCountFlinkRunnerBatch',
'portableWordCountFlinkRunnerStreaming']
}
project.tasks.register("postCommitPy${pythonVersionSuffix}") {
dependsOn = ['setupVirtualenv',
"postCommitPy${pythonVersionSuffix}IT",
':runners:spark:3:job-server:shadowJar',
'portableLocalRunnerJuliaSetWithSetupPy',
'portableWordCountSparkRunnerBatch',
'portableLocalRunnerTestWithRequirementsFile'
]
}
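// The *Examples tasks run the example pipelines marked examples_postcommit
// (minus each runner's sickbay exclusions) via scripts/run_integration_test.sh.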
project.tasks.register("flinkExamples") {
dependsOn = [
'setupVirtualenv',
'installGcpTest',
":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
]
doLast {
def testOpts = [
"--log-cli-level=INFO",
]
def flink_conf_dir = "${rootDir}/runners/flink/src/test/resources/"
def pipelineOpts = [
"--runner=FlinkRunner",
"--project=apache-beam-testing",
"--environment_type=LOOPBACK",
"--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
"--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
"--flink_conf_dir=${flink_conf_dir}",
'--sdk_harness_log_level_overrides=' +
// suppress info level flink.runtime log flood
'{\\"org.apache.flink.runtime\\":\\"WARN\\",' +
// suppress full __metricscontainers log printed in FlinkPipelineRunner.createPortablePipelineResult
'\\"org.apache.beam.runners.flink.FlinkPipelineRunner\\":\\"WARN\\"}'
]
def cmdArgs = mapToArgString([
"test_opts": testOpts,
"suite": "postCommitExamples-flink-py${pythonVersionSuffix}",
"pipeline_opts": pipelineOpts.join(" "),
"collect": "examples_postcommit and not sickbay_flink"
])
exec {
executable 'sh'
args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh $cmdArgs"
}
}
}
project.tasks.register("sparkExamples") {
dependsOn = [
'setupVirtualenv',
'installGcpTest',
':runners:spark:3:job-server:shadowJar'
]
doLast {
def testOpts = [
"--log-cli-level=INFO",
]
def jobServerJar = "${rootDir}/runners/spark/3/job-server/build/libs/beam-runners-spark-3-job-server-${version}.jar"
def pipelineOpts = [
"--runner=SparkRunner",
"--project=apache-beam-testing",
"--environment_type=LOOPBACK",
"--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
"--spark_job_server_jar=${jobServerJar}",
]
def cmdArgs = mapToArgString([
"test_opts": testOpts,
"suite": "postCommitExamples-spark-py${pythonVersionSuffix}",
"pipeline_opts": pipelineOpts.join(" "),
"collect": "examples_postcommit and not sickbay_spark"
])
exec {
executable 'sh'
args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh $cmdArgs"
}
}
}
project.tasks.register("prismExamples") {
dependsOn = [
'setupVirtualenv',
'installGcpTest',
':runners:prism:build',
]
def prismBin = "${rootDir}/runners/prism/build/tmp/prism"
doLast {
def testOpts = [
"--log-cli-level=INFO",
]
def pipelineOpts = [
"--runner=PrismRunner",
"--project=apache-beam-testing",
"--environment_type=LOOPBACK",
"--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
"--prism_location=${prismBin}",
]
def cmdArgs = mapToArgString([
"test_opts": testOpts,
"suite": "postCommitExamples-prism-py${pythonVersionSuffix}",
"pipeline_opts": pipelineOpts.join(" "),
"collect": "examples_postcommit and not sickbay_prism"
])
exec {
executable 'sh'
args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh $cmdArgs"
}
}
}
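// Cross-language post-commit ITs against Flink. DebeziumIO requires Java 17+,
// so on an older build JVM the forked worker can point at a newer JDK, e.g.
// (illustrative path):
//   ./gradlew postCommitPy<ver>IT -Pjava17Home=/path/to/jdk-17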
project.tasks.register("postCommitPy${pythonVersionSuffix}IT") {
String fork_java_home = null
String fork_java_version = currentJavaVersion
// DebeziumIO needs Java17+
if (JavaVersion.current() < JavaVersion.VERSION_17) {
if (project.hasProperty("java17Home")) {
fork_java_version = 'java17'
fork_java_home = project.getProperty("java17Home")
} else if (project.hasProperty("java21Home")) {
fork_java_version = 'java21'
fork_java_home = project.getProperty("java21Home")
}
}
dependsOn = [
'setupVirtualenv',
'installGcpTest',
":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
":sdks:java:container:${fork_java_version}:docker",
':sdks:java:testing:kafka-service:buildTestKafkaServiceJar',
':sdks:java:io:expansion-service:shadowJar',
':sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
':sdks:java:io:amazon-web-services2:expansion-service:shadowJar',
':sdks:java:extensions:schemaio-expansion-service:shadowJar',
':sdks:java:io:debezium:expansion-service:shadowJar'
]
doLast {
def tests = [
"apache_beam/io/gcp/bigquery_read_it_test.py",
"apache_beam/io/external/xlang_kafkaio_it_test.py",
"apache_beam/io/external/xlang_kinesisio_it_test.py",
"apache_beam/io/external/xlang_debeziumio_it_test.py",
]
def testOpts = ["${tests.join(' ')}"] + ["--log-cli-level=INFO"]
def flink_conf_dir = "${rootDir}/runners/flink/src/test/resources/"
def pipelineOpts = [
"--runner=FlinkRunner",
"--project=apache-beam-testing",
"--environment_type=LOOPBACK",
"--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
"--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
"--flink_conf_dir=${flink_conf_dir}",
'--sdk_harness_log_level_overrides=' +
// suppress info level flink.runtime log flood
'{\\"org.apache.flink.runtime\\":\\"WARN\\",' +
// suppress full __metricscontainers log printed in FlinkPipelineRunner.createPortablePipelineResult
'\\"org.apache.beam.runners.flink.FlinkPipelineRunner\\":\\"WARN\\",' +
// suppress metric name collision warning logs
'\\"org.apache.flink.runtime.metrics.groups\\":\\"ERROR\\"}'
]
def cmdArgs = mapToArgString([
"test_opts": testOpts,
"suite": "postCommitIT-flink-py${pythonVersionSuffix}",
"pipeline_opts": pipelineOpts.join(" "),
])
def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath
exec {
if (fork_java_home != null) {
environment "JAVA_HOME", fork_java_home
}
environment "LOCAL_KAFKA_JAR", kafkaJar
executable 'sh'
args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh $cmdArgs"
}
}
}
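// Cross-language SpannerIO IT against Flink; mirrors the post-commit IT wiring above.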
project.tasks.register("xlangSpannerIOIT") {
dependsOn = [
'setupVirtualenv',
'installGcpTest',
":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
":sdks:java:container:${currentJavaVersion}:docker",
':sdks:java:io:expansion-service:shadowJar',
':sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
':sdks:java:io:amazon-web-services2:expansion-service:shadowJar',
':sdks:java:extensions:schemaio-expansion-service:shadowJar',
':sdks:java:io:debezium:expansion-service:shadowJar'
]
doLast {
def tests = [
"apache_beam/io/gcp/tests/xlang_spannerio_it_test.py",
]
def testOpts = ["${tests.join(' ')}"] + ["--log-cli-level=INFO"]
def pipelineOpts = [
"--runner=FlinkRunner",
"--project=apache-beam-testing",
"--environment_type=LOOPBACK",
"--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
"--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
'--sdk_harness_log_level_overrides=' +
// suppress info level flink.runtime log flood
'{\\"org.apache.flink.runtime\\":\\"WARN\\",' +
// suppress full __metricscontainers log printed in FlinkPipelineRunner.createPortablePipelineResult
'\\"org.apache.beam.runners.flink.FlinkPipelineRunner\\":\\"WARN\\",' +
// suppress metric name collision warning logs
'\\"org.apache.flink.runtime.metrics.groups\\":\\"ERROR\\"}'
]
def cmdArgs = mapToArgString([
"test_opts": testOpts,
"suite": "postCommitIT-flink-py${pythonVersionSuffix}",
"pipeline_opts": pipelineOpts.join(" "),
])
exec {
executable 'sh'
args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh $cmdArgs"
}
}
}
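// Smoke-tests job submission from a self-contained job-server jar via the
// test_pipeline_jar.sh harness.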
def addTestJavaJarCreator(String runner, Task jobServerJarTask) {
  project.tasks.register("testJavaJarCreator${runner}") {
    dependsOn jobServerJarTask
    dependsOn pythonContainerTask
    doLast {
      exec {
        executable "sh"
        def options = [
          "--runner ${runner}",
          "--job_server_jar ${jobServerJarTask.archivePath}",
          "--env_dir ${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}",
          "--python_root_dir ${project.rootDir}/sdks/python",
          "--python_version ${project.ext.pythonVersion}",
          "--python_container_image ${project.docker_image_default_repo_root}/"
              + "${project.docker_image_default_repo_prefix}"
              + "python${project.ext.pythonVersion}_sdk:${project.sdk_version}",
        ]
        args "-c", "${project.rootDir}/runners/portability/test_pipeline_jar.sh ${options.join(' ')}"
      }
    }
  }
}
// TODO(BEAM-11333) Update and test multiple Flink versions.
addTestJavaJarCreator("FlinkRunner", tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:shadowJar"))
addTestJavaJarCreator("SparkRunner", tasks.getByPath(":runners:spark:3:job-server:shadowJar"))
def addTestFlinkUberJar(boolean saveMainSession) {
  project.tasks.register("testUberJarFlinkRunner${saveMainSession ? 'SaveMainSession' : ''}") {
    dependsOn ":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
    dependsOn ":runners:flink:${latestFlinkVersion}:job-server:miniCluster"
    dependsOn pythonContainerTask
    doLast {
      exec {
        executable "sh"
        def options = [
          "--flink_job_server_jar ${tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:shadowJar").archivePath}",
          "--flink_mini_cluster_jar ${tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:miniCluster").archivePath}",
          "--env_dir ${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}",
          "--python_root_dir ${project.rootDir}/sdks/python",
          "--python_version ${project.ext.pythonVersion}",
          "--python_container_image ${project.docker_image_default_repo_root}/"
              + "${project.docker_image_default_repo_prefix}"
              + "python${project.ext.pythonVersion}_sdk:${project.sdk_version}",
        ]
        if (saveMainSession) {
          options.add('--save_main_session')
        }
        args "-c", "${project.rootDir}/runners/portability/test_flink_uber_jar.sh ${options.join(' ')}"
      }
    }
  }
}
addTestFlinkUberJar(false)
addTestFlinkUberJar(true)
tasks.register("testPipelineJarSparkRunner") {
dependsOn testJavaJarCreatorSparkRunner
}
tasks.register("testPipelineJarFlinkRunner") {
dependsOn testJavaJarCreatorFlinkRunner
dependsOn testUberJarFlinkRunner
dependsOn testUberJarFlinkRunnerSaveMainSession
}