/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.tools.ant.taskdefs.condition.Os
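// Derive a compact Python version suffix ('2' for Python 2.7, otherwise the
// version with the dot dropped, e.g. '35' for 3.5) and the matching SDK
// container task for this test suite.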
def pythonRootDir = "${rootDir}/sdks/python"
def pythonVersionSuffix = project.ext.pythonVersion == '2.7' ? '2' : project.ext.pythonVersion.replace('.', '')
def pythonContainerTask = ":sdks:python:container:py${pythonVersionSuffix}:docker"
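// Parameters for one entry in the portable runner compatibility matrix.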
class CompatibilityMatrixConfig {
  // Execute batch or streaming pipelines.
  boolean streaming = false
  // Execute on a Docker-, process-, or loopback-based SDK worker environment.
  SDK_WORKER_TYPE workerType = SDK_WORKER_TYPE.DOCKER
  enum SDK_WORKER_TYPE {
    DOCKER, PROCESS, LOOPBACK
  }
  // Whether to pre-optimize the pipeline with the Python optimizer.
  boolean preOptimize = false
}
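// Creates a flinkCompatibilityMatrix<Batch|Streaming>[PreOptimize]<WORKER_TYPE>
// task for the given configuration, which runs flink_runner_test against the
// portable Flink job server.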
def flinkCompatibilityMatrix = {
  def config = it ? it as CompatibilityMatrixConfig : new CompatibilityMatrixConfig()
  def workerType = config.workerType.name()
  def streaming = config.streaming
  def environment_config = config.workerType == CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS ? "--environment_config='{\"command\": \"${buildDir.absolutePath}/sdk_worker.sh\"}'" : ""
  def name = "flinkCompatibilityMatrix${streaming ? 'Streaming' : 'Batch'}${config.preOptimize ? 'PreOptimize' : ''}${workerType}"
  def extra_experiments = []
  if (config.preOptimize)
    extra_experiments.add('pre_optimize=all')
  tasks.create(name: name) {
    dependsOn 'setupVirtualenv'
    dependsOn ':runners:flink:1.10:job-server:shadowJar'
    dependsOn ':sdks:java:container:docker' // required for test_external_transforms
    if (workerType.toLowerCase() == 'docker')
      dependsOn pythonContainerTask
    else if (workerType.toLowerCase() == 'process')
      dependsOn 'createProcessWorker'
    doLast {
      exec {
        executable 'sh'
        args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.runners.portability.flink_runner_test --flink_job_server_jar=${project(":runners:flink:1.10:job-server:").shadowJar.archivePath} --environment_type=${workerType} ${environment_config} ${streaming ? '--streaming' : ''} ${extra_experiments ? '--extra_experiments=' + extra_experiments.join(',') : ''}"
      }
    }
  }
}
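// Aggregate tasks that run the Flink matrix for each worker type, in batch and
// streaming modes.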
task flinkCompatibilityMatrixDocker() {
  dependsOn flinkCompatibilityMatrix(streaming: false)
  dependsOn flinkCompatibilityMatrix(streaming: true)
}
task flinkCompatibilityMatrixProcess() {
  dependsOn flinkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)
  dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)
}
task flinkCompatibilityMatrixLoopback() {
  dependsOn flinkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK)
  dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK)
  dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK, preOptimize: true)
}
task flinkValidatesRunner() {
  dependsOn 'flinkCompatibilityMatrixLoopback'
}
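// Example invocation (py2 suite shown; other Python versions use their own
// suffix, e.g. py35):
//   ./gradlew :sdks:python:test-suites:portable:py2:flinkValidatesRunner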
// TODO(BEAM-8598): Enable on pre-commit.
task flinkTriggerTranscript() {
  dependsOn 'setupVirtualenv'
  dependsOn ':runners:flink:1.10:job-server:shadowJar'
  doLast {
    exec {
      executable 'sh'
      args '-c', """
          . ${envdir}/bin/activate \\
          && cd ${pythonRootDir} \\
          && pip install -e .[test] \\
          && python setup.py nosetests \\
              --tests apache_beam.transforms.trigger_test:WeakTestStreamTranscriptTest \\
              --test-pipeline-options='--runner=FlinkRunner --environment_type=LOOPBACK --flink_job_server_jar=${project(":runners:flink:1.10:job-server:").shadowJar.archivePath}'
          """
    }
  }
}
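// Generates the sdk_worker.sh script referenced by the PROCESS worker's
// --environment_config above: it activates the virtualenv and launches the Go
// boot binary from the Python SDK container build.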
task createProcessWorker {
  dependsOn ':sdks:python:container:build'
  dependsOn 'setupVirtualenv'
  def sdkWorkerFile = file("${buildDir}/sdk_worker.sh")
  def osType = 'linux'
  if (Os.isFamily(Os.FAMILY_MAC))
    osType = 'darwin'
  def workerScript = "${project(":sdks:python:container:").buildDir.absolutePath}/target/launcher/${osType}_amd64/boot"
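  // Wrapper script: activate the suite's virtualenv, then hand off to the boot
  // binary, forwarding any worker arguments ($*).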
  def sdkWorkerFileCode = "sh -c \"pip=`which pip` . ${envdir}/bin/activate && ${workerScript} \$* \""
  outputs.file sdkWorkerFile
  doLast {
    sdkWorkerFile.write sdkWorkerFileCode
    exec {
      commandLine('sh', '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test]")
    }
    exec {
      commandLine('chmod', '+x', sdkWorkerFile)
    }
  }
}
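// Creates a sparkCompatibilityMatrix<Batch|Streaming><WORKER_TYPE> task for the
// given configuration, which runs spark_runner_test against the portable Spark
// job server.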
def sparkCompatibilityMatrix = {
  def config = it ? it as CompatibilityMatrixConfig : new CompatibilityMatrixConfig()
  def workerType = config.workerType.name()
  def streaming = config.streaming
  def environment_config = config.workerType == CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS ? "--environment_config='{\"command\": \"${buildDir.absolutePath}/sdk_worker.sh\"}'" : ""
  def name = "sparkCompatibilityMatrix${streaming ? 'Streaming' : 'Batch'}${config.preOptimize ? 'PreOptimize' : ''}${workerType}"
  tasks.create(name: name) {
    dependsOn 'createProcessWorker'
    dependsOn 'setupVirtualenv'
    dependsOn ':runners:spark:job-server:shadowJar'
    doLast {
      def argMap = [
          "environment_type": workerType,
          "spark_job_server_jar": project(":runners:spark:job-server:").shadowJar.archivePath,
          "environment_cache_millis": 10000,
      ]
      def argString = mapToArgString(argMap)
      // Optionally specify test function names separated by spaces, e.g.:
      //   ./gradlew :sdks:python:test-suites:portable:py2:sparkValidatesRunner -Ptests="test_external_transforms test_read"
      // Otherwise, all test functions under SparkRunnerTest are run.
      def tests = project.hasProperty('tests') ?
          project.property('tests').split().collect { "SparkRunnerTest.$it" }.join(' ') : ''
      exec {
        executable 'sh'
        args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.runners.portability.spark_runner_test $tests $argString ${environment_config}"
      }
    }
  }
}
task sparkCompatibilityMatrixDocker() {
  dependsOn sparkCompatibilityMatrix(streaming: false)
}
task sparkCompatibilityMatrixProcess() {
  dependsOn sparkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)
}
task sparkCompatibilityMatrixLoopback() {
  dependsOn sparkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK)
}
task sparkValidatesRunner() {
  dependsOn 'sparkCompatibilityMatrixLoopback'
}
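// Pre-commit suite: build the SDK container and the Flink job server, then run
// portable word count on Flink in batch and streaming modes.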
project.task("preCommitPy${pythonVersionSuffix}") {
dependsOn = [":sdks:python:container:py${pythonVersionSuffix}:docker",
':runners:flink:1.10:job-server:shadowJar',
'portableWordCountFlinkRunnerBatch',
'portableWordCountFlinkRunnerStreaming']
}
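// Post-commit suite: the Flink integration tests below plus portable word count
// on Spark in batch mode.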
project.task("postCommitPy${pythonVersionSuffix}") {
dependsOn = ['setupVirtualenv',
"postCommitPy${pythonVersionSuffix}IT",
':runners:spark:job-server:shadowJar',
'portableWordCountSparkRunnerBatch']
}
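// Cross-language integration tests: BigQuery reads plus the JDBC and Kafka
// external transforms, run on Flink in LOOPBACK mode with a local Kafka test
// service supplied via LOCAL_KAFKA_JAR.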
project.task("postCommitPy${pythonVersionSuffix}IT") {
dependsOn = [
'setupVirtualenv',
'installGcpTest',
':runners:flink:1.10:job-server:shadowJar',
':sdks:java:container:docker',
':sdks:java:io:expansion-service:shadowJar',
':sdks:java:testing:kafka-service:buildTestKafkaServiceJar'
]
doLast {
def tests = [
"apache_beam.io.gcp.bigquery_read_it_test",
"apache_beam.io.external.xlang_jdbcio_it_test",
"apache_beam.io.external.xlang_kafkaio_it_test",
]
def testOpts = ["--tests=${tests.join(',')}"]
def cmdArgs = mapToArgString([
"test_opts": testOpts,
"suite": "postCommitIT-flink-py${pythonVersionSuffix}",
"pipeline_opts": "--runner=FlinkRunner --project=apache-beam-testing --environment_type=LOOPBACK --temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
])
def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath
exec {
environment "LOCAL_KAFKA_JAR", kafkaJar
executable 'sh'
args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh $cmdArgs"
}
}
}