| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
// Location of the Python SDK sources inside the root project.
def pythonRootDir = "${rootDir}/sdks/python"

// Suffix used in the container project path: the version with its dots removed
// (e.g. '3.7' -> '37'), except that '2.7' maps to plain '2'.
def pythonContainerSuffix = project.ext.pythonVersion.replace('.', '')
if (project.ext.pythonVersion == '2.7') {
  pythonContainerSuffix = '2'
}

// Path of the docker task of the container project matching the configured Python version.
def pythonContainerTask = ":sdks:python:container:py${pythonContainerSuffix}:docker"
| |
// Options selecting which variant of the Flink compatibility matrix a task runs.
// Instances are built from named arguments, e.g. (streaming: true, preOptimize: true).
class CompatibilityMatrixConfig {
  // SDK worker environments the test pipelines can be executed on.
  enum SDK_WORKER_TYPE {
    DOCKER,
    PROCESS,
    LOOPBACK
  }

  // true executes streaming pipelines; false (the default) executes batch pipelines.
  boolean streaming = false

  // Environment the SDK worker runs in; defaults to DOCKER.
  SDK_WORKER_TYPE workerType = SDK_WORKER_TYPE.DOCKER

  // Whether to pre-optimize the pipeline with the Python optimizer.
  boolean preOptimize = false
}
| |
// Creates one Flink portability test task for the given configuration and returns it.
//
// The argument is either absent or a map of CompatibilityMatrixConfig properties
// (streaming, workerType, preOptimize). The created task is named
// flinkCompatibilityMatrix<Batch|Streaming>[PreOptimize]<WORKER_TYPE> and runs
// apache_beam.runners.portability.flink_runner_test inside the virtualenv.
def flinkCompatibilityMatrix = {
  def config = it ? it as CompatibilityMatrixConfig : new CompatibilityMatrixConfig()
  def workerType = config.workerType.name()
  def streaming = config.streaming
  // Only the PROCESS environment gets an environment config: the command that
  // launches the SDK worker script from the build directory.
  def environment_config = config.workerType == CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS ? "--environment_config='{\"command\": \"${buildDir.absolutePath}/sdk_worker.sh\"}'" : ""
  def name = "flinkCompatibilityMatrix${streaming ? 'Streaming' : 'Batch'}${config.preOptimize ? 'PreOptimize' : ''}${workerType}"
  def extra_experiments = []
  if (config.preOptimize) {
    extra_experiments.add('pre_optimize=all')
  }
  // tasks.create is the last expression, so the new task is the closure's return
  // value; callers pass it straight to dependsOn.
  tasks.create(name: name) {
    dependsOn 'setupVirtualenv'
    dependsOn ':runners:flink:1.8:job-server:shadowJar'
    // Compare against the enum directly rather than its lower-cased name.
    if (config.workerType == CompatibilityMatrixConfig.SDK_WORKER_TYPE.DOCKER) {
      dependsOn pythonContainerTask
    } else if (config.workerType == CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS) {
      dependsOn 'createProcessWorker'
    }
    doLast {
      // Looked up when the task runs, not when it is created.
      def jobServerJar = project(":runners:flink:1.8:job-server:").shadowJar.archivePath
      // Optional flags evaluate to '' when unused and are dropped before joining.
      def testArgs = [
        "--flink_job_server_jar='${jobServerJar}'",
        "--environment_type=${workerType}",
        environment_config,
        streaming ? '--streaming' : '',
        extra_experiments ? '--extra_experiments=' + extra_experiments.join(',') : ''
      ].findAll { it }.join(' ')
      exec {
        executable 'sh'
        // Paths are single-quoted so directories containing whitespace are not
        // word-split by the shell; '.[test]' is quoted so it is never treated as
        // a glob pattern.
        args '-c', ". '${envdir}/bin/activate' && cd '${pythonRootDir}' && pip install -e '.[test]' && python -m apache_beam.runners.portability.flink_runner_test ${testArgs}"
      }
    }
  }
}
| |
// Aggregate task: depends on the batch and the streaming matrix tasks created
// with the default worker type.
task flinkCompatibilityMatrixDocker {
  [false, true].each { isStreaming ->
    dependsOn flinkCompatibilityMatrix(streaming: isStreaming)
  }
}
| |
// Aggregate task: depends on the batch and the streaming matrix tasks that use
// the PROCESS worker type.
task flinkCompatibilityMatrixProcess {
  def processWorker = CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS
  [false, true].each { isStreaming ->
    dependsOn flinkCompatibilityMatrix(streaming: isStreaming, workerType: processWorker)
  }
}
| |
// Aggregate task for the LOOPBACK worker type: batch, streaming, and streaming
// with pipeline pre-optimization enabled.
task flinkCompatibilityMatrixLoopback {
  def loopbackWorker = CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK
  dependsOn(
    flinkCompatibilityMatrix(streaming: false, workerType: loopbackWorker),
    flinkCompatibilityMatrix(streaming: true, workerType: loopbackWorker),
    flinkCompatibilityMatrix(streaming: true, workerType: loopbackWorker, preOptimize: true)
  )
}
| |
// Entry-point task; it only depends on the loopback compatibility matrix.
task flinkValidatesRunner(dependsOn: 'flinkCompatibilityMatrixLoopback')