| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
* "License"); you may not use this file except in compliance
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
* distributed under the License is distributed on an "AS IS" BASIS,
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import org.apache.tools.ant.taskdefs.condition.Os |
| |
// Apply Beam's shared module plugin, then configure this project as a Python
// SDK module (virtualenv setup, tox helpers) with performance-test support.
plugins { id 'org.apache.beam.module' }
applyPythonNature()
enablePythonPerformanceTest()
| |
| |
| /*************************************************************************************************/ |
| // Basic build and Python environment setup/cleanup |
| |
// Builds the Python SDK (`setup.py build`) inside the project virtualenv,
// writing build output under the Gradle build directory.
task buildPython(dependsOn: 'setupVirtualenv') {
  doLast {
    println 'Building Python Dependencies'
    exec {
      executable 'sh'
      // Activate the virtualenv first so the build runs with its interpreter.
      args '-c', ". ${project.ext.envdir}/bin/activate && python setup.py build --build-base ${project.buildDir}"
    }
  }
}
// Hook the Python build into the standard `build` lifecycle task.
build.dependsOn buildPython
| |
| |
| /*************************************************************************************************/ |
| // Unit tests for Python 2 |
| // See Python 3 tests in test-suites/tox |
| |
// Aggregator task for all Python 2 lint checks registered below.
task lint {}
check.dependsOn lint

// `toxTask taskName, toxEnv` creates a Gradle task that runs the given tox
// environment (helper presumably provided by applyPythonNature() — confirm).
toxTask "lintPy27", "py27-lint"
lint.dependsOn lintPy27

toxTask "lintPy27_3", "py27-lint3"
lint.dependsOn lintPy27_3

toxTask "testPy2Gcp", "py27-gcp"
test.dependsOn testPy2Gcp

toxTask "testPython2", "py27"
test.dependsOn testPython2

toxTask "testPy2Cython", "py27-cython"
test.dependsOn testPy2Cython
// Ensure that testPy2Cython runs exclusively to other tests. This line is not
// actually required, since gradle doesn't do parallel execution within a
// project.
testPy2Cython.mustRunAfter testPython2, testPy2Gcp

// Documentation build runs through tox as well and feeds `assemble`.
toxTask "docs", "docs"
assemble.dependsOn docs

// Coverage run; not wired into any lifecycle task.
toxTask "cover", "cover"
| |
// Python 2 pre-commit suite: docs, Cython + plain + GCP unit tests, and lint.
task preCommitPy2() {
  dependsOn 'docs', 'testPy2Cython', 'testPython2', 'testPy2Gcp', 'lint'
}
| |
// Portability pre-commit suite: builds the Flink job-server and Python SDK
// containers, then runs portable wordcount in both batch and streaming mode.
// Note: portableWordCountTask(...) both creates the task and returns it.
task portablePreCommit() {
  dependsOn ':runners:flink:1.5:job-server-container:docker'
  dependsOn ':sdks:python:container:docker'
  dependsOn portableWordCountTask('portableWordCountBatch', false)
  dependsOn portableWordCountTask('portableWordCountStreaming', true)
}
| |
| |
| /*************************************************************************************************/ |
| // E2E integration testing and validates runner testing |
| |
// Basic test options for ITs running on Jenkins. Shared (as nose options) by
// the integration-test tasks defined below.
def basicTestOpts = [
    "--nocapture", // print stdout instantly
    "--processes=8", // run tests in parallel
    "--process-timeout=4500", // timeout of whole command execution
]
| |
// Runs a curated set of integration tests on TestDirectRunner: first a batch
// pass, then a (smaller) streaming pass. Both invoke run_integration_test.sh
// inside the GCP-enabled virtualenv.
task directRunnerIT(dependsOn: 'installGcpTest') {
  // Run IT tests with TestDirectRunner in batch.
  doLast {
    def tests = [
        "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
        "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
        "apache_beam.io.gcp.big_query_query_to_table_it_test:BigQueryQueryToTableIT",
        "apache_beam.io.gcp.bigquery_io_read_it_test",
        "apache_beam.io.gcp.datastore.v1new.datastore_write_it_test",
    ]
    // Restrict the run to the modules/classes listed above.
    def batchTestOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
    def argMap = ["runner": "TestDirectRunner",
                  "test_opts": batchTestOpts]
    def batchCmdArgs = project.mapToArgString(argMap)
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && ./scripts/run_integration_test.sh $batchCmdArgs"
    }
  }

  // Run IT tests with TestDirectRunner in streaming.
  doLast {
    def tests = [
        "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
        "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
    ]
    def streamingTestOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
    def argMap = ["runner": "TestDirectRunner",
                  "streaming": "true",
                  "test_opts": streamingTestOpts]
    def streamingCmdArgs = project.mapToArgString(argMap)
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && ./scripts/run_integration_test.sh $streamingCmdArgs"
    }
  }
}
| |
| // Before running this, you need to: |
| // |
| // 1. Build the SDK container: |
| // |
| // ./gradlew -p sdks/python/container docker |
| // |
| // 2. Either a) or b) |
| // a) If you want the Job Server to run in a Docker container: |
| // |
| // ./gradlew :runners:flink:1.5:job-server-container:docker |
| // |
| // b) Otherwise, start a local JobService, for example, the Portable Flink runner |
| // (in a separate shell since it continues to run): |
| // |
| // ./gradlew :runners:flink:1.5:job-server:runShadow |
| // |
| // Then you can run this example: |
| // |
| // Docker (2a): |
| // |
| // ./gradlew :sdks:python:portableWordCount |
| // |
| // Local JobService (2b): |
| // |
| // ./gradlew :sdks:python:portableWordCount -PjobEndpoint=localhost:8099 |
| // |
// Convenience entry point (see usage notes above); pass -Pstreaming to run
// the example in streaming mode.
task portableWordCount {
  dependsOn portableWordCountTask('portableWordCountExample', project.hasProperty("streaming"))
}
| |
// Creates (and returns) a task named `name` that runs the Python wordcount
// example on the PortableRunner; `streaming` selects streaming vs batch mode.
def portableWordCountTask(name, streaming) {
  tasks.create(name) {
    dependsOn = ['installGcpTest']
    mustRunAfter = [':runners:flink:1.5:job-server-container:docker', ':sdks:python:container:docker']
    doLast {
      // TODO: Figure out GCS credentials and use real GCS input and output.
      def pipelineOpts = [
          "--input=/etc/profile",
          "--output=/tmp/py-wordcount-direct",
          "--runner=PortableRunner",
          "--experiments=worker_threads=100",
          "--parallelism=2",
          "--shutdown_sources_on_final_watermark",
      ]
      if (streaming) {
        pipelineOpts += ["--streaming"]
      } else {
        // workaround for local file output in docker container
        pipelineOpts += ["--environment_cache_millis=10000"]
      }
      // Forward optional -P properties as pipeline flags, in this order.
      [jobEndpoint: 'job_endpoint',
       environmentType: 'environment_type',
       environmentConfig: 'environment_config'].each { prop, flag ->
        if (project.hasProperty(prop)) {
          pipelineOpts += ["--${flag}=${project.property(prop)}"]
        }
      }
      exec {
        executable 'sh'
        args '-c', ". ${project.ext.envdir}/bin/activate && python -m apache_beam.examples.wordcount ${pipelineOpts.join(' ')}"
        // TODO: Check that the output file is generated and runs.
      }
    }
  }
}
| |
| // Run PostCommit integration tests on default runner (TestDataflowRunner) |
task postCommitIT(dependsOn: ['installGcpTest', 'sdist']) {
  dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"

  // Resolved at configuration time: path to the Dataflow worker fat jar.
  def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath

  doLast {
    // Select tests tagged with the "IT" attribute.
    def testOpts = basicTestOpts + ["--attr=IT"]
    def cmdArgs = project.mapToArgString(["test_opts": testOpts,
                                          "worker_jar": dataflowWorkerJar])
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && ./scripts/run_integration_test.sh $cmdArgs"
    }
  }
}
| |
// Runs Python ValidatesRunner tests in batch mode on the default runner
// (TestDataflowRunner), using the locally built Dataflow worker jar.
task validatesRunnerBatchTests(dependsOn: ['installGcpTest', 'sdist']) {
  dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"

  // Resolved at configuration time: path to the Dataflow worker fat jar.
  def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath

  doLast {
    // Select tests tagged with the "ValidatesRunner" attribute.
    def testOpts = basicTestOpts + ["--attr=ValidatesRunner"]
    def cmdArgs = project.mapToArgString(["test_opts": testOpts,
                                          "worker_jar": dataflowWorkerJar])
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && ./scripts/run_integration_test.sh $cmdArgs"
    }
  }
}
| |
// Streaming counterpart of validatesRunnerBatchTests; excludes tests tagged
// "sickbay-streaming".
task validatesRunnerStreamingTests(dependsOn: ['installGcpTest', 'sdist']) {
  dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"

  // Resolved at configuration time: path to the Dataflow worker fat jar.
  def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath

  doLast {
    // TODO(BEAM-3544,BEAM-5025): Disable tests with 'sickbay-streaming' tag.
    def testOpts = basicTestOpts + ["--attr=ValidatesRunner,!sickbay-streaming"]
    def argMap = ["test_opts": testOpts,
                  "streaming": "true",
                  "worker_jar": dataflowWorkerJar]
    def cmdArgs = project.mapToArgString(argMap)
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && ./scripts/run_integration_test.sh $cmdArgs"
    }
  }
}
| |
// Runs the HDFS integration test script inside the GCP test virtualenv.
task hdfsIntegrationTest(dependsOn: 'installGcpTest') {
  doLast {
    def activate = ". ${project.ext.envdir}/bin/activate"
    exec {
      commandLine 'sh', '-c', "${activate} && ./apache_beam/io/hdfs_integration_test/hdfs_integration_test.sh"
    }
  }
}
| |
// Runs the Python portability Spark runner test suite against the Spark job
// server, using a process-based SDK worker (build/sdk_worker.sh).
// NOTE(review): this task uses old-style project paths
// (':beam-runners-spark-job-server:', ':beam-sdks-python:') while the rest of
// this file uses ':runners:...' / ':sdks:python...' — confirm against
// settings.gradle that these paths still resolve.
task sparkValidatesRunner() {
  dependsOn 'createProcessWorker'
  dependsOn 'setupVirtualenv'
  dependsOn ':beam-runners-spark-job-server:shadowJar'
  doLast {
    // JSON config telling the runner how to launch the process-based worker.
    def environment_config = "'{\"command\": \"${project(":beam-sdks-python:").buildDir.absolutePath}/sdk_worker.sh\"}'"
    def argMap = [
        "environment_type": "PROCESS",
        "spark_job_server_jar": project(":beam-runners-spark-job-server:").shadowJar.archivePath,
        "environment_config": environment_config,
    ]
    def argString = project.mapToArgString(argMap)

    // Optionally specify test function names separated by space e.g.:
    // ./gradlew :beam-sdks-python:sparkValidatesRunner -Ptests="test_external_transforms test_read"
    // Otherwise run all test functions under SparkRunnerTest
    def tests = project.hasProperty('tests') ?
        project.property('tests').split().collect{ "SparkRunnerTest.$it" }.join(' ') : ''

    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.runners.portability.spark_runner_test $tests $argString"
    }
  }
}
| |
// Configuration holder for the Flink compatibility-matrix test tasks.
class CompatibilityMatrixConfig {
  // SDK harness environment used to execute the pipeline.
  enum SDK_WORKER_TYPE {
    DOCKER, PROCESS, LOOPBACK
  }

  // Run pipelines in streaming mode when true, batch mode otherwise.
  boolean streaming = false
  // Which SDK worker environment to run against (Docker by default).
  SDK_WORKER_TYPE workerType = SDK_WORKER_TYPE.DOCKER
  // Pre-optimize the pipeline with the Python optimizer when true.
  boolean preOptimize = false
}
| |
// Factory closure: given a CompatibilityMatrixConfig (or a map coercible to
// one, or nothing for defaults), creates and returns a task that runs the
// Flink portable runner test suite with that configuration. The task name
// encodes the configuration, e.g. flinkCompatibilityMatrixStreamingDOCKER.
def flinkCompatibilityMatrix = {
  def config = it ? it as CompatibilityMatrixConfig : new CompatibilityMatrixConfig()
  def workerType = config.workerType.name()
  def streaming = config.streaming
  // Only PROCESS workers need an environment_config pointing at the generated
  // sdk_worker.sh launcher; other worker types pass no extra config.
  def environment_config = config.workerType == CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS ? "--environment_config='{\"command\": \"${project(":sdks:python").buildDir.absolutePath}/sdk_worker.sh\"}'" : ""
  def name = "flinkCompatibilityMatrix${streaming ? 'Streaming' : 'Batch'}${config.preOptimize ? 'PreOptimize' : ''}${workerType}"
  def extra_experiments = []
  if (config.preOptimize)
    extra_experiments.add('pre_optimize=all')
  tasks.create(name: name) {
    dependsOn 'setupVirtualenv'
    dependsOn ':runners:flink:1.5:job-server:shadowJar'
    // Worker-type-specific build prerequisites.
    if (workerType.toLowerCase() == 'docker')
      dependsOn ':sdks:python:container:docker'
    else if (workerType.toLowerCase() == 'process')
      dependsOn 'createProcessWorker'
    doLast {
      exec {
        executable 'sh'
        args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.runners.portability.flink_runner_test --flink_job_server_jar=${project(":runners:flink:1.5:job-server:").shadowJar.archivePath} --environment_type=${workerType} ${environment_config} ${streaming ? '--streaming' : ''} ${extra_experiments ? '--extra_experiments=' + extra_experiments.join(',') : ''}"
      }
    }
  }
}
| |
// Docker-based compatibility matrix: batch then streaming.
task flinkCompatibilityMatrixDocker() {
  [false, true].each { isStreaming ->
    dependsOn flinkCompatibilityMatrix(streaming: isStreaming)
  }
}
| |
// Process-worker-based compatibility matrix: batch and streaming.
task flinkCompatibilityMatrixProcess() {
  dependsOn flinkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)
  dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)
}
| |
// Loopback-worker compatibility matrix: batch, streaming, and streaming with
// the pre-optimizer enabled.
task flinkCompatibilityMatrixLoopback() {
  dependsOn flinkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK)
  dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK)
  dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK, preOptimize: true)
}
| |
// Alias: the Flink ValidatesRunner suite is the loopback compatibility matrix.
task flinkValidatesRunner() {
  dependsOn 'flinkCompatibilityMatrixLoopback'
}
| |
| // Run Python ValidatesRunner tests using the Java ReferenceRunner as a job server and Docker as |
| // the SDK environment. |
task javaReferenceRunnerValidatesRunner() {
  dependsOn 'setupVirtualenv'
  dependsOn ':runners:reference:job-server:shadowJar'
  dependsOn ':sdks:python:container:docker'
  doLast {
    exec {
      executable 'sh'
      // Install the SDK into the venv, then run the Python test module against
      // the ReferenceRunner job-server jar with the Docker SDK environment.
      args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.runners.portability.java_reference_runner_test --job_server_jar=${project(":runners:reference:job-server:").shadowJar.archivePath} --environment_type=DOCKER"
    }
  }
}
| |
// Full Python post-commit suite: cross-language, direct-runner ITs, the HDFS
// integration test, and the Dataflow post-commit ITs.
task postCommit() {
  dependsOn 'crossLanguageTests', 'directRunnerIT', 'hdfsIntegrationTest', 'postCommitIT'
}
| |
| |
| /*************************************************************************************************/ |
| // Other build and analysis tasks |
| |
| // Snapshot of dependency requirements defined in setup.py. |
| // Results will be stored in files under Gradle build directory. |
// Writes a snapshot of the resolved Python dependency versions (pip freeze)
// to ${project.buildDir}/requirements.txt, using the GCP test virtualenv.
task depSnapshot(dependsOn: 'installGcpTest') {
  doLast {
    // Fix: message typo ("Snapshoting" -> "Snapshotting").
    println 'Snapshotting full dependencies requirements with versions info to requirements.txt.'
    exec {
      // Remove useless item "pkg-resources" from file which is introduced by a bug in Ubuntu.
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && pip freeze --local --all | grep -v \"pkg-resources\" > ${project.buildDir}/requirements.txt"
    }
  }
}
| |
// Runs the Python dependency-freshness report script after the root project's
// dependencyUpdates task.
task dependencyUpdates(dependsOn: ':dependencyUpdates') {
  doLast {
    exec {
      commandLine 'sh', '-c', "./scripts/run_dependency_check.sh"
    }
  }
}
| |
// Builds the source distribution together with the dependency snapshot.
task buildSnapshot() {
  dependsOn 'sdist', 'depSnapshot'
}
| |
// Generates build/sdk_worker.sh — a wrapper that activates the virtualenv and
// launches the Go `boot` binary for process-based SDK workers — and installs
// the SDK into the virtualenv so the worker can import it.
project.task('createProcessWorker') {
  dependsOn ':sdks:python:container:build'
  dependsOn 'setupVirtualenv'
  def sdkWorkerFile = file("${project.buildDir}/sdk_worker.sh")
  // The container build produces a per-OS launcher; pick linux or darwin.
  def osType = 'linux'
  if (Os.isFamily(Os.FAMILY_MAC))
    osType = 'darwin'
  def workerScript = "${project(":sdks:python:container:").buildDir.absolutePath}/target/launcher/${osType}_amd64/boot"
  // NOTE(review): the `pip=\`which pip\`` prefix looks like it exports the
  // venv's pip into the worker's environment before activation — confirm.
  def sdkWorkerFileCode = "sh -c \"pip=`which pip` . ${project.ext.envdir}/bin/activate && ${workerScript} \$* \""
  outputs.file sdkWorkerFile
  doLast {
    sdkWorkerFile.write sdkWorkerFileCode
    exec {
      commandLine('sh', '-c', ". ${project.ext.envdir}/bin/activate && cd ${project.projectDir} && python setup.py install ")
    }
    // The generated script must be executable for the runner to launch it.
    exec {
      commandLine('chmod', '+x', sdkWorkerFile)
    }
  }
}
| |
// Cross-language (Python + Java) transform test on the Flink portable runner,
// expanding Java transforms through the test expansion service jar.
project.task('crossLanguagePythonJavaFlink') {
  dependsOn 'setupVirtualenv'
  dependsOn ':runners:flink:1.5:job-server-container:docker'
  dependsOn ':sdks:python:container:docker'
  dependsOn ':sdks:java:container:docker'
  dependsOn ':runners:core-construction-java:buildTestExpansionServiceJar'

  doLast {
    // Jar that serves external-transform expansion requests during the test.
    def testServiceExpansionJar = project(":runners:core-construction-java:").buildTestExpansionServiceJar.archivePath
    def options = [
        "--runner=PortableRunner",
        "--experiments=worker_threads=100",
        "--parallelism=2",
        "--shutdown_sources_on_final_watermark",
        "--environment_cache_millis=10000",
        "--expansion_service_port=8096",
        "--expansion_service_jar=${testServiceExpansionJar}",
    ]
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.transforms.external_test ${options.join(' ')}"
    }
  }
}
| |
// Runs the cross-language wordcount example (wordcount_xlang) on the portable
// runner, expanding Java transforms through the test expansion service jar.
project.task('crossLanguagePortableWordCount') {
  dependsOn 'setupVirtualenv'
  dependsOn ':runners:flink:1.5:job-server-container:docker'
  dependsOn ':sdks:python:container:docker'
  dependsOn ':sdks:java:container:docker'
  dependsOn ':runners:core-construction-java:buildTestExpansionServiceJar'

  doLast {
    // Jar that serves external-transform expansion requests during the run.
    def testServiceExpansionJar = project(":runners:core-construction-java:").buildTestExpansionServiceJar.archivePath
    def options = [
        "--input=/etc/profile",
        "--output=/tmp/py-wordcount-portable",
        "--runner=PortableRunner",
        "--experiments=worker_threads=100",
        "--parallelism=2",
        "--shutdown_sources_on_final_watermark",
        "--environment_cache_millis=10000",
        "--expansion_service_jar=${testServiceExpansionJar}",
    ]
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.examples.wordcount_xlang ${options.join(' ')}"
      // TODO: Check that the output file is generated and runs.
    }
  }
}
| |
// Aggregates all cross-language test scenarios.
project.task('crossLanguageTests') {
  dependsOn 'crossLanguagePythonJavaFlink', 'crossLanguagePortableWordCount'
}