/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import org.apache.tools.ant.taskdefs.condition.Os
plugins { id 'org.apache.beam.module' }
applyPythonNature()
enablePythonPerformanceTest()
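// Note: several tasks and helpers referenced below (setupVirtualenv, sdist,
// installGcpTest, toxTask, mapToArgString) are not defined in this file; they
// appear to be provided by the Beam module plugin and applyPythonNature() above.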
/*************************************************************************************************/
// Basic build and Python environment setup/cleanup
task buildPython(dependsOn: 'setupVirtualenv') {
  doLast {
    println 'Building Python Dependencies'
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && python setup.py build --build-base ${project.buildDir}"
    }
  }
}
build.dependsOn buildPython
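// Example invocation (project path taken from the usage examples further down
// in this file); the build output is staged under ${project.buildDir}:
//
//   ./gradlew :sdks:python:build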
/*************************************************************************************************/
// Unit tests for Python 2
// See Python 3 tests in test-suites/tox
task lint {}
check.dependsOn lint
toxTask "lintPy27", "py27-lint"
lint.dependsOn lintPy27
toxTask "lintPy27_3", "py27-lint3"
lint.dependsOn lintPy27_3
toxTask "testPy2Gcp", "py27-gcp"
test.dependsOn testPy2Gcp
toxTask "testPython2", "py27"
test.dependsOn testPython2
toxTask "testPy2Cython", "py27-cython"
test.dependsOn testPy2Cython
// Ensure that testPy2Cython runs exclusively of the other test tasks. This
// constraint is not strictly required, since Gradle does not execute tasks
// within a project in parallel.
testPy2Cython.mustRunAfter testPython2, testPy2Gcp
toxTask "docs", "docs"
assemble.dependsOn docs
toxTask "cover", "cover"
task preCommitPy2() {
  dependsOn "docs"
  dependsOn "testPy2Cython"
  dependsOn "testPython2"
  dependsOn "testPy2Gcp"
  dependsOn "lint"
}
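// Runs docs, lint, and the Python 2 unit test suites (plain, GCP, and Cython)
// in one go, e.g.:
//
//   ./gradlew :sdks:python:preCommitPy2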
task portablePreCommit() {
  dependsOn ':runners:flink:1.5:job-server-container:docker'
  dependsOn ':sdks:python:container:docker'
  dependsOn portableWordCountTask('portableWordCountBatch', false)
  dependsOn portableWordCountTask('portableWordCountStreaming', true)
}
/*************************************************************************************************/
// End-to-end integration tests and ValidatesRunner tests
// Basic test options for ITs running on Jenkins.
def basicTestOpts = [
    "--nocapture",            // print stdout instantly
    "--processes=8",          // run tests in parallel
    "--process-timeout=4500", // timeout of whole command execution
]
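// mapToArgString (a helper assumed to come from the Beam module plugin)
// serializes such a map into a shell argument string, so the resulting call
// below is roughly of the form (exact quoting depends on the helper):
//
//   ./scripts/run_integration_test.sh --runner TestDirectRunner --test_opts "--nocapture ..."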
task directRunnerIT(dependsOn: 'installGcpTest') {
  // Run IT tests with TestDirectRunner in batch.
  doLast {
    def tests = [
        "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
        "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
        "apache_beam.io.gcp.big_query_query_to_table_it_test:BigQueryQueryToTableIT",
        "apache_beam.io.gcp.bigquery_io_read_it_test",
        "apache_beam.io.gcp.datastore.v1new.datastore_write_it_test",
    ]
    def batchTestOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
    def argMap = ["runner": "TestDirectRunner",
                  "test_opts": batchTestOpts]
    def batchCmdArgs = project.mapToArgString(argMap)
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && ./scripts/run_integration_test.sh $batchCmdArgs"
    }
  }
  // Run IT tests with TestDirectRunner in streaming.
  doLast {
    def tests = [
        "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
        "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
    ]
    def streamingTestOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
    def argMap = ["runner": "TestDirectRunner",
                  "streaming": "true",
                  "test_opts": streamingTestOpts]
    def streamingCmdArgs = project.mapToArgString(argMap)
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && ./scripts/run_integration_test.sh $streamingCmdArgs"
    }
  }
}
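// Example invocation; the Pub/Sub, BigQuery, and Datastore ITs above talk to
// real GCP services, so GCP credentials and a test project are presumably
// required:
//
//   ./gradlew :sdks:python:directRunnerIT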
// Before running this, you need to:
//
// 1. Build the SDK container:
//
//    ./gradlew -p sdks/python/container docker
//
// 2. Either a) or b)
//  a) If you want the Job Server to run in a Docker container:
//
//    ./gradlew :runners:flink:1.5:job-server-container:docker
//
//  b) Otherwise, start a local JobService, for example, the Portable Flink runner
//    (in a separate shell since it continues to run):
//
//    ./gradlew :runners:flink:1.5:job-server:runShadow
//
// Then you can run this example:
//
//  Docker (2a):
//
//    ./gradlew :sdks:python:portableWordCount
//
//  Local JobService (2b):
//
//    ./gradlew :sdks:python:portableWordCount -PjobEndpoint=localhost:8099
//
task portableWordCount {
  dependsOn portableWordCountTask('portableWordCountExample', project.hasProperty("streaming"))
}
def portableWordCountTask(name, streaming) {
  tasks.create(name) {
    dependsOn = ['installGcpTest']
    mustRunAfter = [':runners:flink:1.5:job-server-container:docker', ':sdks:python:container:docker']
    doLast {
      // TODO: Figure out GCS credentials and use real GCS input and output.
      def options = [
          "--input=/etc/profile",
          "--output=/tmp/py-wordcount-direct",
          "--runner=PortableRunner",
          "--experiments=worker_threads=100",
          "--parallelism=2",
          "--shutdown_sources_on_final_watermark",
      ]
      if (streaming)
        options += ["--streaming"]
      else
        // workaround for local file output in docker container
        options += ["--environment_cache_millis=10000"]
      if (project.hasProperty("jobEndpoint"))
        options += ["--job_endpoint=${project.property('jobEndpoint')}"]
      if (project.hasProperty("environmentType")) {
        options += ["--environment_type=${project.property('environmentType')}"]
      }
      if (project.hasProperty("environmentConfig")) {
        options += ["--environment_config=${project.property('environmentConfig')}"]
      }
      exec {
        executable 'sh'
        args '-c', ". ${project.ext.envdir}/bin/activate && python -m apache_beam.examples.wordcount ${options.join(' ')}"
      }
      // TODO: Verify that the expected output file is generated.
    }
  }
}
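// The environment can be overridden through the properties handled above,
// e.g. (hypothetical invocation; LOOPBACK is one of the worker types listed
// in CompatibilityMatrixConfig below):
//
//   ./gradlew :sdks:python:portableWordCount -PenvironmentType=LOOPBACK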
// Run PostCommit integration tests on the default runner (TestDataflowRunner).
task postCommitIT(dependsOn: ['installGcpTest', 'sdist']) {
  dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
  def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
  doLast {
    def testOpts = basicTestOpts + ["--attr=IT"]
    def cmdArgs = project.mapToArgString(["test_opts": testOpts,
                                          "worker_jar": dataflowWorkerJar])
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && ./scripts/run_integration_test.sh $cmdArgs"
    }
  }
}
task validatesRunnerBatchTests(dependsOn: ['installGcpTest', 'sdist']) {
  dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
  def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
  doLast {
    def testOpts = basicTestOpts + ["--attr=ValidatesRunner"]
    def cmdArgs = project.mapToArgString(["test_opts": testOpts,
                                          "worker_jar": dataflowWorkerJar])
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && ./scripts/run_integration_test.sh $cmdArgs"
    }
  }
}
task validatesRunnerStreamingTests(dependsOn: ['installGcpTest', 'sdist']) {
  dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
  def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
  doLast {
    // TODO(BEAM-3544,BEAM-5025): Disable tests with 'sickbay-streaming' tag.
    def testOpts = basicTestOpts + ["--attr=ValidatesRunner,!sickbay-streaming"]
    def argMap = ["test_opts": testOpts,
                  "streaming": "true",
                  "worker_jar": dataflowWorkerJar]
    def cmdArgs = project.mapToArgString(argMap)
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && ./scripts/run_integration_test.sh $cmdArgs"
    }
  }
}
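// Example invocations for the Dataflow-based suites above
// (run_integration_test.sh presumably requires GCP credentials and a test project):
//
//   ./gradlew :sdks:python:postCommitIT
//   ./gradlew :sdks:python:validatesRunnerBatchTests
//   ./gradlew :sdks:python:validatesRunnerStreamingTests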
task hdfsIntegrationTest(dependsOn: 'installGcpTest') {
  doLast {
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && ./apache_beam/io/hdfs_integration_test/hdfs_integration_test.sh"
    }
  }
}
task sparkValidatesRunner() {
  dependsOn 'createProcessWorker'
  dependsOn 'setupVirtualenv'
  dependsOn ':beam-runners-spark-job-server:shadowJar'
  doLast {
    def environment_config = "'{\"command\": \"${project(":beam-sdks-python:").buildDir.absolutePath}/sdk_worker.sh\"}'"
    def argMap = [
        "environment_type": "PROCESS",
        "spark_job_server_jar": project(":beam-runners-spark-job-server:").shadowJar.archivePath,
        "environment_config": environment_config,
    ]
    def argString = project.mapToArgString(argMap)
    // Optionally specify test function names separated by spaces, e.g.:
    //   ./gradlew :beam-sdks-python:sparkValidatesRunner -Ptests="test_external_transforms test_read"
    // Otherwise, run all test functions under SparkRunnerTest.
    def tests = project.hasProperty('tests') ?
        project.property('tests').split().collect { "SparkRunnerTest.$it" }.join(' ') : ''
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.runners.portability.spark_runner_test $tests $argString"
    }
  }
}
class CompatibilityMatrixConfig {
  // Execute batch or streaming pipelines.
  boolean streaming = false
  // Run the SDK workers in a Docker, process, or loopback environment.
  SDK_WORKER_TYPE workerType = SDK_WORKER_TYPE.DOCKER
  enum SDK_WORKER_TYPE {
    DOCKER, PROCESS, LOOPBACK
  }
  // Whether to pre-optimize the pipeline with the Python optimizer.
  boolean preOptimize = false
}
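// Groovy coerces a map literal to this class via `as`, so callers can
// configure the matrix inline, for example:
//
//   flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)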
def flinkCompatibilityMatrix = {
  def config = it ? it as CompatibilityMatrixConfig : new CompatibilityMatrixConfig()
  def workerType = config.workerType.name()
  def streaming = config.streaming
  def environment_config = config.workerType == CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS ? "--environment_config='{\"command\": \"${project(":sdks:python").buildDir.absolutePath}/sdk_worker.sh\"}'" : ""
  def name = "flinkCompatibilityMatrix${streaming ? 'Streaming' : 'Batch'}${config.preOptimize ? 'PreOptimize' : ''}${workerType}"
  def extra_experiments = []
  if (config.preOptimize)
    extra_experiments.add('pre_optimize=all')
  tasks.create(name: name) {
    dependsOn 'setupVirtualenv'
    dependsOn ':runners:flink:1.5:job-server:shadowJar'
    if (workerType.toLowerCase() == 'docker')
      dependsOn ':sdks:python:container:docker'
    else if (workerType.toLowerCase() == 'process')
      dependsOn 'createProcessWorker'
    doLast {
      exec {
        executable 'sh'
        args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.runners.portability.flink_runner_test --flink_job_server_jar=${project(":runners:flink:1.5:job-server:").shadowJar.archivePath} --environment_type=${workerType} ${environment_config} ${streaming ? '--streaming' : ''} ${extra_experiments ? '--extra_experiments=' + extra_experiments.join(',') : ''}"
      }
    }
  }
}
task flinkCompatibilityMatrixDocker() {
  dependsOn flinkCompatibilityMatrix(streaming: false)
  dependsOn flinkCompatibilityMatrix(streaming: true)
}
task flinkCompatibilityMatrixProcess() {
  dependsOn flinkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)
  dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)
}
task flinkCompatibilityMatrixLoopback() {
  dependsOn flinkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK)
  dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK)
  dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK, preOptimize: true)
}
task flinkValidatesRunner() {
  dependsOn 'flinkCompatibilityMatrixLoopback'
}
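// flinkValidatesRunner is an alias for the loopback matrix above, e.g.:
//
//   ./gradlew :sdks:python:flinkValidatesRunner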
// Run Python ValidatesRunner tests using the Java ReferenceRunner as a job server and Docker as
// the SDK environment.
task javaReferenceRunnerValidatesRunner() {
  dependsOn 'setupVirtualenv'
  dependsOn ':runners:reference:job-server:shadowJar'
  dependsOn ':sdks:python:container:docker'
  doLast {
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.runners.portability.java_reference_runner_test --job_server_jar=${project(":runners:reference:job-server:").shadowJar.archivePath} --environment_type=DOCKER"
    }
  }
}
task postCommit() {
  dependsOn "crossLanguageTests"
  dependsOn "directRunnerIT"
  dependsOn "hdfsIntegrationTest"
  dependsOn "postCommitIT"
}
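// Aggregate Python post-commit entry point, e.g.:
//
//   ./gradlew :sdks:python:postCommit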
/*************************************************************************************************/
// Other build and analysis tasks
// Snapshot the dependency requirements defined in setup.py.
// Results are stored in files under the Gradle build directory.
task depSnapshot(dependsOn: 'installGcpTest') {
  doLast {
    println 'Snapshotting full dependency requirements with version info to requirements.txt.'
    exec {
      // Remove the spurious "pkg-resources" entry, which is introduced by a bug in Ubuntu.
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && pip freeze --local --all | grep -v \"pkg-resources\" > ${project.buildDir}/requirements.txt"
    }
  }
}
task dependencyUpdates(dependsOn: ':dependencyUpdates') {
  doLast {
    exec {
      executable 'sh'
      args '-c', "./scripts/run_dependency_check.sh"
    }
  }
}
task buildSnapshot() {
  dependsOn 'sdist'
  dependsOn 'depSnapshot'
}
project.task('createProcessWorker') {
  dependsOn ':sdks:python:container:build'
  dependsOn 'setupVirtualenv'
  def sdkWorkerFile = file("${project.buildDir}/sdk_worker.sh")
  def osType = 'linux'
  if (Os.isFamily(Os.FAMILY_MAC))
    osType = 'darwin'
  def workerScript = "${project(":sdks:python:container:").buildDir.absolutePath}/target/launcher/${osType}_amd64/boot"
  def sdkWorkerFileCode = "sh -c \"pip=`which pip` . ${project.ext.envdir}/bin/activate && ${workerScript} \$* \""
  outputs.file sdkWorkerFile
  doLast {
    sdkWorkerFile.write sdkWorkerFileCode
    exec {
      commandLine('sh', '-c', ". ${project.ext.envdir}/bin/activate && cd ${project.projectDir} && python setup.py install")
    }
    exec {
      commandLine('chmod', '+x', sdkWorkerFile)
    }
  }
}
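// The generated ${buildDir}/sdk_worker.sh is a thin wrapper that activates
// the virtualenv and then runs the container boot binary built by
// :sdks:python:container:build, forwarding any arguments to it.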
project.task('crossLanguagePythonJavaFlink') {
  dependsOn 'setupVirtualenv'
  dependsOn ':runners:flink:1.5:job-server-container:docker'
  dependsOn ':sdks:python:container:docker'
  dependsOn ':sdks:java:container:docker'
  dependsOn ':runners:core-construction-java:buildTestExpansionServiceJar'
  doLast {
    def testServiceExpansionJar = project(":runners:core-construction-java:").buildTestExpansionServiceJar.archivePath
    def options = [
        "--runner=PortableRunner",
        "--experiments=worker_threads=100",
        "--parallelism=2",
        "--shutdown_sources_on_final_watermark",
        "--environment_cache_millis=10000",
        "--expansion_service_port=8096",
        "--expansion_service_jar=${testServiceExpansionJar}",
    ]
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.transforms.external_test ${options.join(' ')}"
    }
  }
}
project.task('crossLanguagePortableWordCount') {
  dependsOn 'setupVirtualenv'
  dependsOn ':runners:flink:1.5:job-server-container:docker'
  dependsOn ':sdks:python:container:docker'
  dependsOn ':sdks:java:container:docker'
  dependsOn ':runners:core-construction-java:buildTestExpansionServiceJar'
  doLast {
    def testServiceExpansionJar = project(":runners:core-construction-java:").buildTestExpansionServiceJar.archivePath
    def options = [
        "--input=/etc/profile",
        "--output=/tmp/py-wordcount-portable",
        "--runner=PortableRunner",
        "--experiments=worker_threads=100",
        "--parallelism=2",
        "--shutdown_sources_on_final_watermark",
        "--environment_cache_millis=10000",
        "--expansion_service_jar=${testServiceExpansionJar}",
    ]
    exec {
      executable 'sh'
      args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.examples.wordcount_xlang ${options.join(' ')}"
    }
    // TODO: Verify that the expected output file is generated.
  }
}
project.task('crossLanguageTests') {
  dependsOn "crossLanguagePythonJavaFlink"
  dependsOn "crossLanguagePortableWordCount"
}
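// Aggregate of the two cross-language suites above, e.g.:
//
//   ./gradlew :sdks:python:crossLanguageTests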