blob: 3cb436235e246bf0452edcf4363ae5666f015dba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
def pythonRootDir = "${rootDir}/sdks/python"
def pythonContainerSuffix = project.ext.pythonVersion == '2.7' ? '2' : project.ext.pythonVersion.replace('.', '')
def pythonContainerTask = ":sdks:python:container:py${pythonContainerSuffix}:docker"
class CompatibilityMatrixConfig {
// Execute batch or streaming pipelines.
boolean streaming = false
// Execute on Docker or Process based environment.
SDK_WORKER_TYPE workerType = SDK_WORKER_TYPE.DOCKER
enum SDK_WORKER_TYPE {
DOCKER, PROCESS, LOOPBACK
}
// Whether to pre-optimize the pipeline with the Python optimizer.
boolean preOptimize = false
}
def flinkCompatibilityMatrix = {
def config = it ? it as CompatibilityMatrixConfig : new CompatibilityMatrixConfig()
def workerType = config.workerType.name()
def streaming = config.streaming
def environment_config = config.workerType == CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS ? "--environment_config='{\"command\": \"${buildDir.absolutePath}/sdk_worker.sh\"}'" : ""
def name = "flinkCompatibilityMatrix${streaming ? 'Streaming' : 'Batch'}${config.preOptimize ? 'PreOptimize' : ''}${workerType}"
def extra_experiments = []
if (config.preOptimize)
extra_experiments.add('pre_optimize=all')
tasks.create(name: name) {
dependsOn 'setupVirtualenv'
dependsOn ':runners:flink:1.9:job-server:shadowJar'
dependsOn ':sdks:java:container:docker' // required for test_external_transforms
if (workerType.toLowerCase() == 'docker')
dependsOn pythonContainerTask
else if (workerType.toLowerCase() == 'process')
dependsOn 'createProcessWorker'
doLast {
exec {
executable 'sh'
args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.runners.portability.flink_runner_test --flink_job_server_jar=${project(":runners:flink:1.9:job-server:").shadowJar.archivePath} --environment_type=${workerType} ${environment_config} ${streaming ? '--streaming' : ''} ${extra_experiments ? '--extra_experiments=' + extra_experiments.join(',') : ''}"
}
}
}
}
task flinkCompatibilityMatrixDocker() {
dependsOn flinkCompatibilityMatrix(streaming: false)
dependsOn flinkCompatibilityMatrix(streaming: true)
}
task flinkCompatibilityMatrixProcess() {
dependsOn flinkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)
dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)
}
task flinkCompatibilityMatrixLoopback() {
dependsOn flinkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK)
dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK)
dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK, preOptimize: true)
}
task flinkValidatesRunner() {
dependsOn 'flinkCompatibilityMatrixLoopback'
}
// TODO(BEAM-8598): Enable on pre-commit.
task flinkTriggerTranscript() {
dependsOn 'setupVirtualenv'
dependsOn ':runners:flink:1.9:job-server:shadowJar'
doLast {
exec {
executable 'sh'
args '-c', """
. ${envdir}/bin/activate \\
&& cd ${pythonRootDir} \\
&& pip install -e .[test] \\
&& python setup.py nosetests \\
--tests apache_beam.transforms.trigger_test:WeakTestStreamTranscriptTest \\
--test-pipeline-options='--runner=FlinkRunner --environment_type=LOOPBACK --flink_job_server_jar=${project(":runners:flink:1.9:job-server:").shadowJar.archivePath}'
"""
}
}
}