| import org.apache.tools.ant.taskdefs.condition.Os |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
// Location of the Python SDK sources, resolved against the root project directory.
def pythonRootDir = "${rootDir}/sdks/python"

// Suffix used in task and project names: '2' for Python 2.7, otherwise the
// version with its dots removed (for example '3.7' becomes '37').
def configuredPythonVersion = project.ext.pythonVersion
def pythonVersionSuffix
if (configuredPythonVersion == '2.7') {
  pythonVersionSuffix = '2'
} else {
  pythonVersionSuffix = configuredPythonVersion.replace('.', '')
}

// Path of the task that builds the Docker image of the Python SDK container
// for the selected Python version.
def pythonContainerTask = ":sdks:python:container:py${pythonVersionSuffix}:docker"
| |
/**
 * Options that select one combination of the runner compatibility matrix.
 * The matrix closures in this script coerce their named arguments into this
 * class, so every property has a default.
 */
class CompatibilityMatrixConfig {
  /** SDK worker environments a test run can target. */
  enum SDK_WORKER_TYPE {
    DOCKER,
    PROCESS,
    LOOPBACK
  }

  /** True to execute streaming pipelines; false (the default) to execute batch pipelines. */
  boolean streaming = false

  /** Environment the SDK worker executes in: Docker, Process or Loopback. Defaults to DOCKER. */
  SDK_WORKER_TYPE workerType = SDK_WORKER_TYPE.DOCKER

  /** Whether to pre-optimize the pipeline with the Python optimizer. */
  boolean preOptimize = false
}
| |
// Creates and returns a task that runs apache_beam.runners.portability.flink_runner_test
// for one combination of the compatibility matrix.
//
// The optional argument is a map of CompatibilityMatrixConfig properties
// (streaming, workerType, preOptimize); omitted entries keep their defaults.
// The task name is derived from the combination, e.g.
// flinkCompatibilityMatrixBatchDOCKER or flinkCompatibilityMatrixStreamingPreOptimizeLOOPBACK.
def flinkCompatibilityMatrix = {
  def matrixConfig = it ? (it as CompatibilityMatrixConfig) : new CompatibilityMatrixConfig()
  def workerType = matrixConfig.workerType.name()
  def isStreaming = matrixConfig.streaming

  // Only PROCESS workers need an environment config: it points at the launcher
  // script that the createProcessWorker task writes into the build directory.
  def environmentConfigArg = ""
  if (matrixConfig.workerType == CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS) {
    environmentConfigArg = "--environment_config='{\"command\": \"${buildDir.absolutePath}/sdk_worker.sh\"}'"
  }

  def modeLabel = isStreaming ? 'Streaming' : 'Batch'
  def preOptimizeLabel = matrixConfig.preOptimize ? 'PreOptimize' : ''
  def taskName = "flinkCompatibilityMatrix${modeLabel}${preOptimizeLabel}${workerType}"

  def experiments = matrixConfig.preOptimize ? ['pre_optimize=all'] : []

  tasks.create(name: taskName) {
    dependsOn 'setupVirtualenv', ':runners:flink:1.10:job-server:shadowJar'
    dependsOn ':sdks:java:container:docker' // required for test_external_transforms

    // DOCKER and PROCESS workers each need an extra artifact; LOOPBACK needs neither.
    switch (workerType.toLowerCase()) {
      case 'docker':
        dependsOn pythonContainerTask
        break
      case 'process':
        dependsOn 'createProcessWorker'
        break
    }

    doLast {
      def streamingFlag = isStreaming ? '--streaming' : ''
      def experimentsFlag = experiments ? '--extra_experiments=' + experiments.join(',') : ''
      exec {
        executable 'sh'
        args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.runners.portability.flink_runner_test --flink_job_server_jar=${project(":runners:flink:1.10:job-server:").shadowJar.archivePath} --environment_type=${workerType} ${environmentConfigArg} ${streamingFlag} ${experimentsFlag}"
      }
    }
  }
}
| |
// Aggregate task: Flink compatibility matrix on the default (DOCKER) worker
// type, in both batch and streaming mode.
task flinkCompatibilityMatrixDocker() {
  [false, true].each { streamingMode ->
    dependsOn flinkCompatibilityMatrix(streaming: streamingMode)
  }
}
| |
// Aggregate task: Flink compatibility matrix on PROCESS workers, in both
// batch and streaming mode.
task flinkCompatibilityMatrixProcess() {
  def processWorker = CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS
  dependsOn(
      flinkCompatibilityMatrix(streaming: false, workerType: processWorker),
      flinkCompatibilityMatrix(streaming: true, workerType: processWorker))
}
| |
// Aggregate task: Flink compatibility matrix on LOOPBACK workers. Covers
// batch, streaming, and streaming with the pre-optimize experiment enabled.
task flinkCompatibilityMatrixLoopback() {
  def loopbackWorker = CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK
  def combinations = [
      [streaming: false, workerType: loopbackWorker],
      [streaming: true, workerType: loopbackWorker],
      [streaming: true, workerType: loopbackWorker, preOptimize: true],
  ]
  combinations.each { combination ->
    dependsOn flinkCompatibilityMatrix(combination)
  }
}
| |
// Entry point for validating the Flink runner; delegates entirely to the
// LOOPBACK compatibility matrix.
task flinkValidatesRunner() {
  dependsOn('flinkCompatibilityMatrixLoopback')
}
| |
// TODO(BEAM-8598): Enable on pre-commit.
// Runs WeakTestStreamTranscriptTest from apache_beam.transforms.trigger_test
// through nosetests, using the FlinkRunner with LOOPBACK workers and the
// locally built Flink 1.10 job server jar.
task flinkTriggerTranscript() {
  dependsOn 'setupVirtualenv', ':runners:flink:1.10:job-server:shadowJar'
  doLast {
    exec {
      // The trailing backslashes join the lines below into one shell command.
      commandLine 'sh', '-c', """
        . ${envdir}/bin/activate \\
        && cd ${pythonRootDir} \\
        && pip install -e .[test] \\
        && python setup.py nosetests \\
        --tests apache_beam.transforms.trigger_test:WeakTestStreamTranscriptTest \\
        --test-pipeline-options='--runner=FlinkRunner --environment_type=LOOPBACK --flink_job_server_jar=${project(":runners:flink:1.10:job-server:").shadowJar.archivePath}'
        """
    }
  }
}
| |
// Writes ${buildDir}/sdk_worker.sh, the launcher script that the PROCESS
// variants of the compatibility matrices pass via --environment_config.
// The script activates the virtualenv and then runs the boot binary built by
// :sdks:python:container:build, forwarding its own arguments.
task createProcessWorker {
  dependsOn ':sdks:python:container:build', 'setupVirtualenv'

  // The boot binary path is keyed by OS: 'darwin' on macOS, 'linux' otherwise.
  // The architecture component is always amd64.
  def launcherOs = Os.isFamily(Os.FAMILY_MAC) ? 'darwin' : 'linux'
  def bootBinary = "${project(":sdks:python:container:").buildDir.absolutePath}/target/launcher/${launcherOs}_amd64/boot"
  def launcherScript = "sh -c \"pip=`which pip` . ${envdir}/bin/activate && ${bootBinary} \$* \""

  def sdkWorkerFile = file("${buildDir}/sdk_worker.sh")
  outputs.file sdkWorkerFile

  doLast {
    sdkWorkerFile.write launcherScript
    // Install the SDK (with test extras) into the virtualenv the launcher activates.
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test]"
    }
    // The script is invoked directly as a command, so it must be executable.
    exec {
      commandLine 'chmod', '+x', sdkWorkerFile
    }
  }
}
| |
// Creates and returns a task that runs apache_beam.runners.portability.spark_runner_test
// for one combination of the compatibility matrix.
//
// The optional argument is a map of CompatibilityMatrixConfig properties
// (streaming, workerType, preOptimize); omitted entries keep their defaults.
// The task name is derived from the combination, e.g. sparkCompatibilityMatrixBatchLOOPBACK.
// Note that `streaming` and `preOptimize` only influence the task name here:
// no corresponding flag is passed to the test module.
def sparkCompatibilityMatrix = {
  def config = it ? it as CompatibilityMatrixConfig : new CompatibilityMatrixConfig()
  def workerType = config.workerType.name()
  def streaming = config.streaming
  // Only PROCESS workers need an environment config: it points at the launcher
  // script that the createProcessWorker task writes into the build directory.
  def environment_config = config.workerType == CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS ? "--environment_config='{\"command\": \"${buildDir.absolutePath}/sdk_worker.sh\"}'" : ""
  def name = "sparkCompatibilityMatrix${streaming ? 'Streaming' : 'Batch'}${config.preOptimize ? 'PreOptimize' : ''}${workerType}"
  tasks.create(name: name) {
    // Kept unconditional (as before) so existing invocations build the same artifacts.
    dependsOn 'createProcessWorker'
    dependsOn 'setupVirtualenv'
    dependsOn ':runners:spark:job-server:shadowJar'
    // DOCKER workers run inside the Python SDK container image, so it has to be
    // built before the tests start; the Flink matrix declares the same dependency.
    if (config.workerType == CompatibilityMatrixConfig.SDK_WORKER_TYPE.DOCKER)
      dependsOn pythonContainerTask
    doLast {
      def argMap = [
          "environment_type"    : workerType,
          "spark_job_server_jar": project(":runners:spark:job-server:").shadowJar.archivePath,
          "environment_cache_millis": 10000,
      ]
      def argString = mapToArgString(argMap)

      // Optionally specify test function names separated by space e.g.:
      // ./gradlew :sdks:python:test-suites:portable:py2:sparkValidatesRunner -Ptests="test_external_transforms test_read"
      // Otherwise run all test functions under SparkRunnerTest
      def tests = project.hasProperty('tests') ?
          project.property('tests').split().collect{ "SparkRunnerTest.$it" }.join(' ') : ''

      exec {
        executable 'sh'
        args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.runners.portability.spark_runner_test $tests $argString ${environment_config}"
      }
    }
  }
}
| |
// Aggregate task: Spark compatibility matrix in batch mode on the default
// (DOCKER) worker type.
task sparkCompatibilityMatrixDocker() {
  def batchOnDocker = sparkCompatibilityMatrix(streaming: false)
  dependsOn(batchOnDocker)
}
| |
// Aggregate task: Spark compatibility matrix in batch mode on PROCESS workers.
task sparkCompatibilityMatrixProcess() {
  def processWorker = CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS
  dependsOn(sparkCompatibilityMatrix(streaming: false, workerType: processWorker))
}
| |
// Aggregate task: Spark compatibility matrix in batch mode on LOOPBACK workers.
task sparkCompatibilityMatrixLoopback() {
  def loopbackWorker = CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK
  dependsOn(sparkCompatibilityMatrix(streaming: false, workerType: loopbackWorker))
}
| |
// Entry point for validating the Spark runner; delegates entirely to the
// LOOPBACK compatibility matrix.
task sparkValidatesRunner() {
  dependsOn('sparkCompatibilityMatrixLoopback')
}
| |
// Pre-commit suite for the selected Python version: builds the Python SDK
// container image and the Flink job server jar, then runs the portable
// word count tasks on Flink in batch and streaming mode.
// The dependency set is assigned (replaced), not appended to.
project.task("preCommitPy${pythonVersionSuffix}") {
  setDependsOn([
      pythonContainerTask,
      ':runners:flink:1.10:job-server:shadowJar',
      'portableWordCountFlinkRunnerBatch',
      'portableWordCountFlinkRunnerStreaming',
  ])
}
| |
// Post-commit suite for the selected Python version: the integration-test
// task for the same version plus the portable word count batch task on Spark.
// The dependency set is assigned (replaced), not appended to.
project.task("postCommitPy${pythonVersionSuffix}") {
  setDependsOn([
      'setupVirtualenv',
      "postCommitPy${pythonVersionSuffix}IT",
      ':runners:spark:job-server:shadowJar',
      'portableWordCountSparkRunnerBatch',
  ])
}
| |
// Post-commit integration tests for the selected Python version. Runs the
// listed test modules through scripts/run_integration_test.sh with the
// FlinkRunner and LOOPBACK workers.
// The dependency set is assigned (replaced), not appended to.
project.task("postCommitPy${pythonVersionSuffix}IT") {
  setDependsOn([
      'setupVirtualenv',
      'installGcpTest',
      ':runners:flink:1.10:job-server:shadowJar',
      ':sdks:java:container:docker',
      ':sdks:java:io:expansion-service:shadowJar',
      ':sdks:java:testing:kafka-service:buildTestKafkaServiceJar',
  ])

  doLast {
    def integrationTests = [
        "apache_beam.io.gcp.bigquery_read_it_test",
        "apache_beam.io.external.xlang_jdbcio_it_test",
        "apache_beam.io.external.xlang_kafkaio_it_test",
    ]
    def scriptArgs = mapToArgString([
        "test_opts": ["--tests=${integrationTests.join(',')}"],
        "suite": "postCommitIT-flink-py${pythonVersionSuffix}",
        "pipeline_opts": "--runner=FlinkRunner --project=apache-beam-testing --environment_type=LOOPBACK --temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
    ])
    // The path of the test Kafka service jar is handed to the test process
    // through the LOCAL_KAFKA_JAR environment variable.
    def kafkaServiceJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath
    exec {
      environment "LOCAL_KAFKA_JAR", kafkaServiceJar
      commandLine 'sh', '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh $scriptArgs"
    }
  }
}