blob: 6a0777bd667c58733f82c7956512d58eb8657d10 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Configuration of this project reads outputs of the Dataflow Java worker and
// the cross-language test-suite projects, so force them to evaluate first.
evaluationDependsOn(':runners:google-cloud-dataflow-java:worker')
evaluationDependsOn(':sdks:python:test-suites:xlang')
enablePythonPerformanceTest()

// E.g. pythonVersion "3.12" -> pythonVersionNumber "312" -> suffix "-py312".
String pythonVersionNumber = project.ext.pythonVersion.replace('.', '')
String pythonVersionSuffix = ''
if (project.ext.pythonVersion) {
  pythonVersionSuffix = "-py${pythonVersionNumber}"
}
// Baseline pytest options shared by IT suites launched from CI.
def basicTestOpts = [
    "--capture=no",        // stream stdout immediately instead of capturing it
    "--numprocesses=8",    // pytest-xdist: run tests in parallel
    "--timeout=4500",      // timeout (seconds) for the whole command execution
    "--color=yes",         // force colored console output
    "--log-cli-level=INFO" // surface INFO-level logging live
]
dependencies {
  // Source-distribution tarball of the Python SDK; consumed as sdk_location
  // by tasks that cannot use a wheel (e.g. tensorRTtests, vllmTests).
  distTarBall project(path: ":sdks:python", configuration: "distTarBall")
}
// Prepares everything needed to submit a Dataflow job: installs the GCP test
// environment and builds an SDK distribution, exporting its path through
// project.ext.sdkLocation. Note the timing difference between the two
// branches: the wheel path is only known at execution time (doLast), while
// the tarball path is known already at configuration time.
task initializeForDataflowJob {
  // Dataflow worker containers are linux/amd64; a wheel built on another
  // architecture would not run on the workers.
  def wheelCompatible = "amd64".equalsIgnoreCase(System.getProperty("os.arch"))
  if (!wheelCompatible && project.hasProperty('useWheelDistribution')) {
    throw new GradleException('-PuseWheelDistribution is set for the task but the ' +
        'host system platform is not compatible with Dataflow worker container image.')
  }
  dependsOn 'installGcpTest'
  if (project.hasProperty('useWheelDistribution')) {
    dependsOn ":sdks:python:bdistPy${pythonVersionNumber}linux"
    doLast {
      // Locate the single manylinux wheel built for this Python version;
      // singleFile fails loudly if zero or multiple wheels match.
      def collection = project.fileTree(project.project(':sdks:python').buildDir){
        include "**/apache_beam-*cp${pythonVersionNumber}*manylinux*.whl"
      }
      // sdkLocation ext is set at execution time
      String packageFilename = collection.singleFile.toString()
      project.ext.sdkLocation = packageFilename
      logger.info('Use wheel {} for sdk_location.', packageFilename)
    }
  } else {
    dependsOn ':sdks:python:sdist'
    // sdkLocation ext is available at config time
    String packageFilename = files(configurations.distTarBall.files).singleFile
    project.ext.sdkLocation = packageFilename
    logger.info('Use tarball {} for sdk_location.', packageFilename)
  }
}
// Location of the Python SDK's integration-test driver scripts
// (run_integration_test.sh lives here).
def runScriptsDir = "${rootDir}/sdks/python/scripts"
// Baseline pytest options for ITs running in CI. Unlike basicTestOpts this
// list has no "--numprocesses"; suites append xdist options where safe.
def basicPytestOpts = [
    "--capture=no",         // stream stdout immediately instead of capturing it
    "--timeout=4500",       // timeout (seconds) for the whole command execution
    "--color=yes",          // force colored console output
    "--log-cli-level=INFO", // surface INFO-level logging live
]
// Registers one minimal PreCommit IT task per invocation:
// "preCommitIT_batch" (streaming=false) or "preCommitIT_streaming" (streaming=true).
// Each runs a single wordcount smoke test on Dataflow.
def preCommitIT(String runScriptsDir, String envdir, Boolean streaming, String pythonSuffix) {
  def mode = streaming ? "_streaming" : "_batch"
  task "preCommitIT${mode}" {
    dependsOn 'initializeForDataflowJob'
    doLast {
      // One smoke test per mode.
      def smokeTest = streaming
          ? "apache_beam/examples/streaming_wordcount_it_test.py::StreamingWordCountIT::test_streaming_wordcount_it"
          : "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it"
      def pytestArgs = [
          "${smokeTest}",
          "--capture=no",     // Print stdout instantly
          "--numprocesses=2", // Number of tests running in parallel
          "--timeout=1800",   // Timeout of whole command execution
      ]
      def argMap = [
          "test_opts"   : pytestArgs,
          "sdk_location": project.ext.sdkLocation,
          "suite"       : "preCommitIT-df${pythonSuffix}",
      ]
      if (streaming) {
        argMap["streaming"] = "true"
      }
      def cmdArgs = mapToArgString(argMap)
      exec {
        executable 'sh'
        args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
      }
    }
  }
}
// Instantiate both PreCommit flavors, plus an umbrella task running both.
preCommitIT(runScriptsDir, envdir, false, pythonVersionSuffix)
preCommitIT(runScriptsDir, envdir, true, pythonVersionSuffix)
task preCommitIT {
  dependsOn preCommitIT_batch, preCommitIT_streaming
}
// Full post-commit integration suite (tests marked "it_postcommit").
task postCommitIT {
  dependsOn 'initializeForDataflowJob'
  doLast {
    // --dist=loadfile keeps tests from one module on the same xdist worker.
    def pytestArgs = basicPytestOpts + ["--numprocesses=8", "--dist=loadfile"]
    def cmdArgs = mapToArgString([
        "test_opts"   : pytestArgs,
        "sdk_location": project.ext.sdkLocation,
        "suite"       : "postCommitIT-df${pythonVersionSuffix}",
        "collect"     : "it_postcommit",
    ])
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
    }
  }
}
// Post-commit integration suite executed against ARM worker containers.
// Uses the script-level pythonVersionNumber instead of recomputing the
// dotted-version digits locally (consistency with the rest of the file).
task postCommitArmIT {
  dependsOn 'initializeForDataflowJob'
  // The ARM run needs the Python SDK container image for this version.
  dependsOn ":sdks:python:container:py${pythonVersionNumber}:docker"
  doLast {
    def testOpts = basicPytestOpts + ["--numprocesses=8", "--dist=loadfile"]
    def argMap = [
        "test_opts": testOpts,
        "sdk_location": project.ext.sdkLocation,
        "suite": "postCommitIT-df${pythonVersionSuffix}",
        "collect": "it_postcommit",
        "py_version": project.ext.pythonVersion,
        "arch": "ARM"
    ]
    def cmdArgs = mapToArgString(argMap)
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
    }
  }
}
// Post-commit tests that are known-failing ("sickbayed") on Dataflow.
task postCommitSickbay {
  dependsOn 'initializeForDataflowJob'
  doLast {
    def cmdArgs = mapToArgString([
        "test_opts"   : basicPytestOpts + ["--numprocesses=8", "--dist=loadfile"],
        "sdk_location": project.ext.sdkLocation,
        "suite"       : "postCommitIT-df${pythonVersionSuffix}",
        "collect"     : "it_postcommit_sickbay"
    ])
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
    }
  }
}
// Spanner IO integration tests (tests marked "spannerio_it").
task spannerioIT {
  dependsOn 'initializeForDataflowJob'
  doLast {
    def cmdArgs = mapToArgString([
        "test_opts"   : basicPytestOpts + ["--numprocesses=8", "--dist=loadfile"],
        "sdk_location": project.ext.sdkLocation,
        "suite"       : "postCommitIT-df${pythonVersionSuffix}",
        "collect"     : "spannerio_it"
    ])
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
    }
  }
}
// Example-pipeline post-commit suite. xdist-compatible examples run in
// parallel first (doFirst), then examples marked "no_xdist" run without
// parallelism (doLast).
task examples {
  dependsOn 'initializeForDataflowJob'
  def sharedOpts = basicPytestOpts
  // Execute tests with xdists
  doFirst {
    def cmdArgs = mapToArgString([
        "test_opts": sharedOpts + ["--numprocesses=8", "--dist=loadfile"],
        "sdk_location": project.ext.sdkLocation,
        "suite": "postCommitIT-df${pythonVersionSuffix}-xdist",
        "collect": "examples_postcommit and not no_xdist and not sickbay_dataflow"
    ])
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
    }
  }
  // Execute tests that fail with xdists
  doLast {
    def cmdArgs = mapToArgString([
        "test_opts": sharedOpts,
        "sdk_location": project.ext.sdkLocation,
        "suite": "postCommitIT-df${pythonVersionSuffix}-no-xdist",
        "collect": "examples_postcommit and no_xdist and not sickbay_dataflow"
    ])
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
    }
  }
}
// ValidatesRunner suite (batch mode), excluding batch-sickbayed tests.
task validatesRunnerBatchTests {
  dependsOn 'initializeForDataflowJob'
  doLast {
    def cmdArgs = mapToArgString([
        "test_opts"   : basicPytestOpts + ["--numprocesses=8"],
        "sdk_location": project.ext.sdkLocation,
        "suite"       : "validatesRunnerBatchTests-df${pythonVersionSuffix}",
        "collect"     : "it_validatesrunner and not no_sickbay_batch"
    ])
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
    }
  }
}
// ValidatesRunner suite (streaming mode). xdist-compatible tests run in
// parallel first (doFirst); tests marked "no_xdist" run afterwards (doLast).
task validatesRunnerStreamingTests {
  dependsOn 'initializeForDataflowJob'
  // TODO(BEAM-3544,https://github.com/apache/beam/issues/19012): Disable tests with 'sickbay-streaming' tag.
  // Execute tests with xdists
  doFirst {
    def cmdArgs = mapToArgString([
        "test_opts": basicPytestOpts + ["--numprocesses=8"],
        "streaming": "true",
        "sdk_location": project.ext.sdkLocation,
        "suite": "validatesRunnerStreamingTests-df${pythonVersionSuffix}-xdist",
        "collect": "it_validatesrunner and not no_sickbay_streaming and not no_xdist",
    ])
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
    }
  }
  // Execute tests that fail with xdists
  doLast {
    def cmdArgs = mapToArgString([
        "test_opts": basicPytestOpts,
        "streaming": "true",
        "sdk_location": project.ext.sdkLocation,
        "suite": "validatesRunnerStreamingTests-df${pythonVersionSuffix}-noxdist",
        "collect": "it_validatesrunner and not no_sickbay_streaming and no_xdist",
    ])
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
    }
  }
}
// Runs a single performance-test module supplied via -Ptest, forwarding
// -Ptest-pipeline-options plus the built SDK as --sdk_location.
task runPerformanceTest {
  dependsOn 'initializeForDataflowJob'
  def test = project.findProperty('test')
  def suite = "runPerformanceTest-df${pythonVersionSuffix}"
  def xUnitFile = "pytest-${suite}.xml"
  doLast {
    // Default to '' when the property is absent: Groovy's null + String would
    // otherwise embed the literal text "null" in the pipeline options.
    def testOpts = project.findProperty('test-pipeline-options') ?: ''
    testOpts += " --sdk_location=${project.ext.sdkLocation}"
    exec {
      workingDir "${project.rootDir}/sdks/python"
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && pytest -o junit_suite_name=${suite}" +
          " ${test} --test-pipeline-options=\"${testOpts}\" --junitxml=${xUnitFile} --timeout=3600"
    }
  }
}
// MongoDB IO integration test; extra pipeline options come in via -Popts.
task mongodbioIT {
  dependsOn 'initializeForDataflowJob'
  doLast {
    // Default to '' when -Popts is absent: String.format would otherwise
    // embed the literal text "null" in the command line.
    def opts = findProperty('opts') ?: ''
    opts = String.format("%s %s", opts, "--sdk_location=${project.ext.sdkLocation}")
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && python -m apache_beam.io.mongodbio_it_test ${opts}"
    }
  }
}
// Installs the pip requirements for the Chicago Taxi benchmark example into
// the test virtualenv.
task installChicagoTaxiExampleRequirements {
  dependsOn 'initializeForDataflowJob'
  doLast {
    def benchmarkDir = "$rootProject.projectDir/sdks/python/apache_beam/testing/benchmarks/chicago_taxi/"
    exec {
      workingDir benchmarkDir
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && pip install -r requirements.txt"
    }
  }
}
// Runs the Chicago Taxi benchmark on Dataflow. Requires -PgcsRoot; accepts
// optional -PpipelineOptions.
task chicagoTaxiExample {
  dependsOn 'installChicagoTaxiExampleRequirements'
  doLast {
    def gcsRoot = findProperty('gcsRoot')
    // Fail fast with a clear message instead of passing the literal string
    // "null" as the GCS root to run_chicago.sh.
    if (!gcsRoot) {
      throw new GradleException('Property gcsRoot is required, e.g. -PgcsRoot=gs://bucket/path')
    }
    def pipelineOptions = findProperty('pipelineOptions') ?: ""
    pipelineOptions += " --sdk_location=\"${project.ext.sdkLocation}\""
    exec {
      workingDir "$rootProject.projectDir/sdks/python/apache_beam/testing/benchmarks/chicago_taxi/"
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ./run_chicago.sh ${gcsRoot} TestDataflowRunner ${pipelineOptions}"
    }
  }
}
// Validates the SDK container image by running run_validatescontainer.sh
// against the freshly built py-version container.
// Uses the script-level pythonVersionNumber instead of recomputing the
// dotted-version digits locally (consistency with the rest of the file).
task validatesContainer() {
  if (project.hasProperty("testRCDependencies")) {
    // Generate a requirements file with pre-release versions for the docker task
    // if testing with pre-release dependencies.
    dependsOn ":sdks:python:container:py${pythonVersionNumber}:generatePythonRequirements"
    mustRunAfter ":sdks:python:container:py${pythonVersionNumber}:generatePythonRequirements"
  }
  dependsOn 'initializeForDataflowJob'
  dependsOn ":sdks:python:container:py${pythonVersionNumber}:docker"
  def runScriptsPath = "${rootDir}/sdks/python/container/run_validatescontainer.sh"
  doLast {
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && cd ${rootDir} && ${runScriptsPath} " +
          "${project.ext.pythonVersion} " +
          "${project.ext.sdkLocation}"
    }
  }
}
// Validates the distroless variant of the SDK container image by running the
// wordcount IT against it on Dataflow.
task validatesDistrolessContainer() {
  def imageURL = "gcr.io/apache-beam-testing/beam-sdk/beam_python${project.ext.pythonVersion}_sdk_distroless:${project.sdk_version}"
  doLast {
    exec {
      def testTarget = "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it"
      def pipelineOpts = mapToArgString([
          "output"             : "gs://temp-storage-for-end-to-end-tests/py-it-cloud/output",
          "project"            : "apache-beam-testing",
          "region"             : "us-central1",
          "runner"             : "TestDataflowRunner",
          "sdk_container_image": "${imageURL}",
          "sdk_location"       : "container",
          "staging_location"   : "gs://temp-storage-for-end-to-end-tests/staging-it",
          "temp_location"      : "gs://temp-storage-for-end-to-end-tests/temp-it",
      ])
      workingDir = "${rootDir}/sdks/python"
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && pytest ${testTarget} --test-pipeline-options=\"${pipelineOpts}\""
    }
  }
}
// Validates the SDK container image on ARM by passing "ARM" to
// run_validatescontainer.sh.
// Uses the script-level pythonVersionNumber instead of recomputing the
// dotted-version digits locally (consistency with the rest of the file).
task validatesContainerARM() {
  dependsOn 'initializeForDataflowJob'
  dependsOn ":sdks:python:container:py${pythonVersionNumber}:docker"
  def runScriptsPath = "${rootDir}/sdks/python/container/run_validatescontainer.sh"
  doLast {
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && cd ${rootDir} && ${runScriptsPath} " +
          "${project.ext.pythonVersion} " +
          "${project.ext.sdkLocation} " +
          "ARM"
    }
  }
}
// TensorRT object-detection inference test on Dataflow with GPU workers.
// Fix: removed the unused local `testOpts` (basicPytestOpts was assigned but
// never passed anywhere in this task).
def tensorRTTests = tasks.create("tensorRTtests") {
  dependsOn 'installGcpTest'
  dependsOn ':sdks:python:sdist'
  doLast {
    def argMap = [
        "runner": "DataflowRunner",
        "machine_type":"n1-standard-4",
        // TODO(https://github.com/apache/beam/issues/22651): Build docker image for tensor RT tests during Run time.
        // This would also enable to use wheel "--sdk_location" as other tasks, and eliminate distTarBall dependency
        // declaration for this project.
        "sdk_container_image": "us.gcr.io/apache-beam-testing/python-postcommit-it/tensor_rt:latest",
        "sdk_location": files(configurations.distTarBall.files).singleFile,
        "project": "apache-beam-testing",
        "region": "us-central1",
        "input": "gs://apache-beam-ml/testing/inputs/tensorrt_image_file_names.txt",
        "output": "gs://apache-beam-ml/outputs/tensorrt_predictions.txt",
        "engine_path": "gs://apache-beam-ml/models/ssd_mobilenet_v2_320x320_coco17_tpu-8.trt",
        "disk_size_gb": 75
    ]
    def cmdArgs = mapToArgString(argMap)
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && pip install pillow && python -m apache_beam.examples.inference.tensorrt_object_detection $cmdArgs --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver' --experiment=no_use_multiple_sdk_containers"
    }
  }
}
// vLLM text-completion inference tests on Dataflow with GPU workers; runs the
// example once without and once with the chat option.
// Fix: removed the unused local `testOpts` (basicPytestOpts was assigned but
// never passed anywhere in this task).
def vllmTests = tasks.create("vllmTests") {
  dependsOn 'installGcpTest'
  dependsOn ':sdks:python:sdist'
  doLast {
    def argMap = [
        "runner": "DataflowRunner",
        "machine_type":"n1-standard-4",
        // TODO(https://github.com/apache/beam/issues/22651): Build docker image for VLLM tests during Run time.
        // This would also enable to use wheel "--sdk_location" as other tasks, and eliminate distTarBall dependency
        // declaration for this project.
        // Right now, this is built from https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/inference/test_resources/vllm.dockerfile.old
        "sdk_container_image": "us.gcr.io/apache-beam-testing/python-postcommit-it/vllm:latest",
        "sdk_location": files(configurations.distTarBall.files).singleFile,
        "project": "apache-beam-testing",
        "region": "us-central1",
        "model": "facebook/opt-125m",
        "output": "gs://apache-beam-ml/outputs/vllm_predictions.txt",
        "disk_size_gb": 75
    ]
    def cmdArgs = mapToArgString(argMap)
    // Exec one version with and one version without the chat option
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'"
    }
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --chat true --chat_template 'gs://apache-beam-ml/additional_files/sample_chat_template.jinja' --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'"
    }
  }
}
// Vertex AI RunInference IT tests
task vertexAIInferenceTest {
  dependsOn 'initializeForDataflowJob'
  dependsOn ':sdks:python:sdist'
  def requirementsFile = "${rootDir}/sdks/python/apache_beam/ml/inference/vertex_ai_tests_requirements.txt"
  // Install the Vertex AI test dependencies into the venv before running.
  doFirst {
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && pip install -r $requirementsFile"
    }
  }
  doLast {
    def testOpts = basicTestOpts
    def argMap = [
        "test_opts": testOpts,
        // Fix: pythonVersionSuffix already starts with "-py" (e.g. "-py312");
        // the previous "...-df-py${pythonVersionSuffix}" yielded a duplicated
        // "-py-py" in the suite name, unlike every other suite in this file.
        "suite": "VertexAITests-df${pythonVersionSuffix}",
        "collect": "vertex_ai_postcommit",
        "runner": "TestDataflowRunner",
        "requirements_file": "$requirementsFile"
    ]
    def cmdArgs = mapToArgString(argMap)
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
    }
  }
}
// Gemini RunInference IT tests (comment previously copy-pasted from Vertex AI)
task geminiInferenceTest {
  dependsOn 'initializeForDataflowJob'
  dependsOn ':sdks:python:sdist'
  def requirementsFile = "${rootDir}/sdks/python/apache_beam/ml/inference/gemini_tests_requirements.txt"
  // Install the Gemini test dependencies into the venv before running.
  doFirst {
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && pip install -r $requirementsFile"
    }
  }
  doLast {
    def testOpts = basicTestOpts
    def argMap = [
        "test_opts": testOpts,
        // Fix: pythonVersionSuffix already starts with "-py" (e.g. "-py312");
        // the previous "...-df-py${pythonVersionSuffix}" yielded a duplicated
        // "-py-py" in the suite name, unlike every other suite in this file.
        "suite": "GeminiTests-df${pythonVersionSuffix}",
        "collect": "gemini_postcommit",
        "runner": "TestDataflowRunner",
        "requirements_file": "$requirementsFile"
    ]
    def cmdArgs = mapToArgString(argMap)
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
    }
  }
}
// Installs the pip requirements for the TF-Transform (cloudml) benchmarks
// into the test virtualenv.
task installTFTRequirements {
  dependsOn 'initializeForDataflowJob'
  doLast {
    def cloudmlDir = "$rootProject.projectDir/sdks/python/apache_beam/testing/benchmarks/cloudml/"
    exec {
      workingDir cloudmlDir
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && pip install -r requirements.txt"
    }
  }
}
// Tensorflow transform integration and benchmarking tests on Apache Beam.
task tftTests {
  dependsOn "installTFTRequirements"
  doLast {
    // Combine optional -Popts with sdk_location, skipping null: Groovy's
    // null + String would otherwise embed the literal text "null".
    def opts = [project.findProperty('opts'), "--sdk_location=${project.ext.sdkLocation}"]
        .findAll { it != null }
        .join(' ')
    def testOpts = basicPytestOpts + ["--numprocesses=8", "--dist=loadfile"]
    def argMap = [
        "test_opts": testOpts,
        "suite": "TFTransformTests-df${pythonVersionSuffix}",
        "collect": "uses_tft",
        "requirements_file": "apache_beam/testing/benchmarks/cloudml/requirements.txt",
        "pipeline_opts": opts,
    ]
    def cmdArgs = mapToArgString(argMap)
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs "
    }
  }
}
// Tests that depend on the Mock API: https://github.com/apache/beam/tree/master/.test-infra/mock-apis
task mockAPITests {
  dependsOn 'initializeForDataflowJob'
  dependsOn ':sdks:python:sdist'
  doLast {
    def cmdArgs = mapToArgString([
        "test_opts": basicTestOpts,
        "collect": "uses_mock_api",
        "runner": "TestDataflowRunner",
        "project": "apache-beam-testing",
        "region": "us-west1",
    ])
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
    }
  }
}
// Aggregate of all RunInference E2E tests that run on DataflowRunner.
// As of now, this test suite is enabled in the py310 suite because the base
// NVIDIA image used for TensorRT contains Python 3.10.
// TODO: https://github.com/apache/beam/issues/22651
project.tasks.register("inferencePostCommitIT") {
  dependsOn = [
      // TODO(https://github.com/apache/beam/issues/33078): restore the tensorRT tests once the staged
      // model is fixed.
      // 'tensorRTtests',
      'vertexAIInferenceTest',
      'geminiInferenceTest',
      'mockAPITests',
  ]
}
// RunInference E2E tests that require the Python 3.12 suite (vLLM).
project.tasks.register("inferencePostCommitITPy312") {
  dependsOn 'vllmTests'
}
// Create cross-language tasks for running tests against Java expansion service(s)
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcpRegion = project.findProperty('gcpRegion') ?: 'us-central1'
// One task per cross-language suite exported by the xlang test-suite project.
// Each runs on TestDataflowRunner with this version's Python SDK container
// image, overriding the Java harness image for expansion-service transforms.
project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata ->
  createCrossLanguageUsingJavaExpansionTask(
    name: taskMetadata.name,
    expansionProjectPaths: taskMetadata.expansionProjectPaths,
    collectMarker: taskMetadata.collectMarker,
    pythonPipelineOptions: [
      "--runner=TestDataflowRunner",
      "--project=${gcpProject}",
      "--region=${gcpRegion}",
      "--sdk_container_image=gcr.io/apache-beam-testing/beam-sdk/beam_python${project.ext.pythonVersion}_sdk:latest",
      "--sdk_harness_container_image_overrides=.*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java11_sdk:latest"
    ],
    pytestOptions: basicPytestOpts,
    additionalDeps: taskMetadata.additionalDeps,
    additionalEnvs: taskMetadata.additionalEnvs
  )
}