| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * License); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an AS IS BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import org.apache.tools.ant.taskdefs.condition.Os |
| |
| apply plugin: org.apache.beam.gradle.BeamModulePlugin |
| applyPythonNature() |
| |
| def pythonRootDir = "${rootDir}/sdks/python" |
| |
| /*************************************************************************************************/ |
| |
| addPortableWordCountTasks() |
| |
| task preCommitPy2() { |
| dependsOn ':runners:flink:1.9:job-server-container:docker' |
| dependsOn ':sdks:python:container:py2:docker' |
| dependsOn portableWordCountBatch |
| dependsOn portableWordCountStreaming |
| } |
| |
| task postCommitPy2() { |
| dependsOn 'setupVirtualenv' |
| dependsOn ':runners:flink:1.9:job-server:shadowJar' |
| dependsOn portableWordCountFlinkRunnerBatch |
| dependsOn portableWordCountFlinkRunnerStreaming |
| dependsOn ':runners:spark:job-server:shadowJar' |
| dependsOn portableWordCountSparkRunnerBatch |
| } |
| |
| // TODO: Move the rest of this file into ../common.gradle. |
| |
| // Before running this, you need to: |
| // |
| // 1. Build the SDK container: |
| // |
| // ./gradlew -p sdks/python/container buildAll |
| // |
| // 2. Either a) or b) |
| // a) If you want the Job Server to run in a Docker container: |
| // |
| // ./gradlew :runners:flink:1.9:job-server-container:docker |
| // |
| // b) Otherwise, start a local JobService, for example, the Portable Flink runner |
| // (in a separate shell since it continues to run): |
| // |
| // ./gradlew :runners:flink:1.9:job-server:runShadow |
| // |
| // Then you can run this example: |
| // |
| // Docker (2a): |
| // |
| // ./gradlew :sdks:python:test-suites:portable:py2:portableWordCount |
| // |
| // Local JobService (2b): |
| // |
| // ./gradlew :sdks:python:test-suites:portable:py2:portableWordCount -PjobEndpoint=localhost:8099 |
| // |
| task portableWordCount { |
| dependsOn project.hasProperty("streaming") ? portableWordCountStreaming : portableWordCountBatch |
| } |
| |
| /*************************************************************************************************/ |
| |
| task crossLanguagePythonJavaDirect { |
| dependsOn 'setupVirtualenv' |
| dependsOn ':sdks:java:container:docker' |
| dependsOn ':sdks:java:testing:expansion-service:buildTestExpansionServiceJar' |
| |
| doLast { |
| def options = [ |
| "--expansion_service_target=sdks:java:testing:expansion-service:buildTestExpansionServiceJar", |
| "--expansion_service_target_appendix=testExpansionService", |
| ] |
| exec { |
| executable 'sh' |
| args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.transforms.external_test ${options.join(' ')}" |
| } |
| } |
| } |
| |
| task crossLanguagePythonJavaFlink { |
| dependsOn 'setupVirtualenv' |
| dependsOn ':runners:flink:1.9:job-server-container:docker' |
| dependsOn ':sdks:python:container:py2:docker' |
| dependsOn ':sdks:java:container:docker' |
| dependsOn ':sdks:java:testing:expansion-service:buildTestExpansionServiceJar' |
| |
| doLast { |
| def testServiceExpansionJar = project(":sdks:java:testing:expansion-service:").buildTestExpansionServiceJar.archivePath |
| def options = [ |
| "--runner=PortableRunner", |
| "--experiments=worker_threads=100", |
| "--parallelism=2", |
| "--shutdown_sources_on_final_watermark", |
| "--environment_cache_millis=10000", |
| "--expansion_service_port=8096", |
| "--expansion_service_jar=${testServiceExpansionJar}", |
| ] |
| exec { |
| executable 'sh' |
| args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.transforms.external_test ${options.join(' ')}" |
| } |
| } |
| } |
| |
| task crossLanguagePortableWordCount { |
| dependsOn 'setupVirtualenv' |
| dependsOn ':runners:flink:1.9:job-server-container:docker' |
| dependsOn ':sdks:python:container:py2:docker' |
| dependsOn ':sdks:java:container:docker' |
| dependsOn ':sdks:java:testing:expansion-service:buildTestExpansionServiceJar' |
| |
| doLast { |
| def testServiceExpansionJar = project(":sdks:java:testing:expansion-service:").buildTestExpansionServiceJar.archivePath |
| def options = [ |
| "--input=/etc/profile", |
| "--output=/tmp/py-wordcount-portable", |
| "--runner=PortableRunner", |
| "--experiments=worker_threads=100", |
| "--parallelism=2", |
| "--shutdown_sources_on_final_watermark", |
| "--environment_cache_millis=10000", |
| "--expansion_service_jar=${testServiceExpansionJar}", |
| // Writes to local filesystem might fail for multiple SDK workers. |
| "--sdk_worker_parallelism=1" |
| ] |
| exec { |
| executable 'sh' |
| args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.examples.wordcount_xlang ${options.join(' ')}" |
| // TODO: Check that the output file is generated and runs. |
| } |
| } |
| } |
| |
| task crossLanguageTests { |
| dependsOn "crossLanguagePythonJavaFlink" |
| dependsOn "crossLanguagePortableWordCount" |
| } |
| |
| /*************************************************************************************************/ |
| |
| task createProcessWorker { |
| dependsOn ':sdks:python:container:build' |
| dependsOn 'setupVirtualenv' |
| def sdkWorkerFile = file("${buildDir}/sdk_worker.sh") |
| def osType = 'linux' |
| if (Os.isFamily(Os.FAMILY_MAC)) |
| osType = 'darwin' |
| def workerScript = "${project(":sdks:python:container:").buildDir.absolutePath}/target/launcher/${osType}_amd64/boot" |
| def sdkWorkerFileCode = "sh -c \"pip=`which pip` . ${envdir}/bin/activate && ${workerScript} \$* \"" |
| outputs.file sdkWorkerFile |
| doLast { |
| sdkWorkerFile.write sdkWorkerFileCode |
| exec { |
| commandLine('sh', '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test]") |
| } |
| exec { |
| commandLine('chmod', '+x', sdkWorkerFile) |
| } |
| } |
| } |
| |
| task sparkValidatesRunner() { |
| dependsOn 'createProcessWorker' |
| dependsOn 'setupVirtualenv' |
| dependsOn ':runners:spark:job-server:shadowJar' |
| doLast { |
| def environment_config = "'{\"command\": \"${buildDir.absolutePath}/sdk_worker.sh\"}'" |
| def argMap = [ |
| "environment_type" : "PROCESS", |
| "spark_job_server_jar": project(":runners:spark:job-server:").shadowJar.archivePath, |
| "environment_config": environment_config, |
| ] |
| def argString = mapToArgString(argMap) |
| |
| // Optionally specify test function names separated by space e.g.: |
| // ./gradlew :sdks:python:test-suites:portable:py2:sparkValidatesRunner -Ptests="test_external_transforms test_read" |
| // Otherwise run all test functions under SparkRunnerTest |
| def tests = project.hasProperty('tests') ? |
| project.property('tests').split().collect{ "SparkRunnerTest.$it" }.join(' ') : '' |
| |
| exec { |
| executable 'sh' |
| args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.runners.portability.spark_runner_test $tests $argString" |
| } |
| } |
| } |
| |
| apply from: "../common.gradle" |