plugins { id 'org.apache.beam.module' }
// Basic build and Python environment setup/cleanup
// Runs the Python build command inside the project's virtualenv.
// Depends on 'setupVirtualenv', which is declared outside this view.
task buildPython(dependsOn: 'setupVirtualenv') {
doLast {
println 'Building Python Dependencies'
exec {
executable 'sh'
// Source the virtualenv's activate script, then run the build with its build
// base pointed at Gradle's build directory.
// NOTE(review): 'python build' appears to be missing a script name — confirm.
args '-c', ". ${project.ext.envdir}/bin/activate && python build --build-base ${project.buildDir}"
// Hook buildPython into the 'build' task.
build.dependsOn buildPython
// Unit tests for Python 2
// See Python 3 tests in test-suites/tox
// Aggregate 'lint' task: it has no action of its own; the lint tox tasks below
// are attached to it as dependencies.
task lint {}
// Linting runs as part of 'check'.
check.dependsOn lint
// NOTE(review): toxTask is defined outside this view; each call appears to
// register a task named by the first argument for the tox environment named by
// the second argument — confirm.
toxTask "lintPy27", "py27-lint"
lint.dependsOn lintPy27
toxTask "lintPy27_3", "py27-lint3"
lint.dependsOn lintPy27_3
// The three Python 2 test variants below are all attached to 'test'.
toxTask "testPy2Gcp", "py27-gcp"
test.dependsOn testPy2Gcp
toxTask "testPython2", "py27"
test.dependsOn testPython2
toxTask "testPy2Cython", "py27-cython"
test.dependsOn testPy2Cython
// Order testPy2Cython after the other test tasks so that it runs exclusively
// of them. This line is not actually required, since gradle doesn't do
// parallel execution within a project.
testPy2Cython.mustRunAfter testPython2, testPy2Gcp
// Documentation is generated as part of 'assemble'.
toxTask "docs", "docs"
assemble.dependsOn docs
// 'cover' is registered but not attached to any other task in this view.
toxTask "cover", "cover"
// Aggregate pre-commit task for Python 2: depends on the docs build, the three
// Python 2 test tasks and lint. It declares no action of its own in this view.
task preCommitPy2() {
dependsOn "docs"
dependsOn "testPy2Cython"
dependsOn "testPython2"
dependsOn "testPy2Gcp"
dependsOn "lint"
// Aggregate pre-commit task for the portable runner: depends on the Flink job
// server and Python SDK container docker tasks, then on both portable
// word-count tasks.
task portablePreCommitPy2() {
dependsOn ':runners:flink:1.5:job-server-container:docker'
dependsOn ':sdks:python:container:docker'
// These two are referenced as task objects rather than by name, so they must
// already be defined at this point; their definitions are not in this view.
dependsOn portableWordCountBatch
dependsOn portableWordCountStreaming
// E2E integration testing and validates runner testing
// Basic test options for ITs running on Jenkins.
// Shared list of test-runner options. The integration-test tasks in this file
// extend it with '+' to add their own selection options.
def basicTestOpts = [
"--nocapture", // print stdout instantly
"--processes=8", // run tests in parallel
"--process-timeout=4500", // timeout of whole command execution
// Runs integration tests against TestDirectRunner in two doLast actions: one
// for batch and one for streaming.
// Depends on 'installGcpTest', which is declared outside this view.
task directRunnerIT(dependsOn: 'installGcpTest') {
// Run IT tests with TestDirectRunner in batch.
doLast {
// NOTE(review): the entries of this test list are not visible in this view.
def tests = [
// Restrict the run to the listed tests via a comma-joined --tests option.
def batchTestOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
def argMap = ["runner": "TestDirectRunner",
"test_opts": batchTestOpts]
// mapToArgString is defined outside this view; presumably it renders the map
// as command-line arguments — confirm.
def batchCmdArgs = project.mapToArgString(argMap)
exec {
executable 'sh'
// NOTE(review): the script name after './scripts/' appears to be missing — confirm.
args '-c', ". ${project.ext.envdir}/bin/activate && ./scripts/ $batchCmdArgs"
// Run IT tests with TestDirectRunner in streaming.
doLast {
// NOTE(review): the entries of this test list are not visible in this view.
def tests = [
def streamingTestOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
// Same arguments as the batch run, plus "streaming": "true".
def argMap = ["runner": "TestDirectRunner",
"streaming": "true",
"test_opts": streamingTestOpts]
def streamingCmdArgs = project.mapToArgString(argMap)
exec {
executable 'sh'
// NOTE(review): the script name after './scripts/' appears to be missing — confirm.
args '-c', ". ${project.ext.envdir}/bin/activate && ./scripts/ $streamingCmdArgs"
// Before running this, you need to:
// 1. Build the SDK container:
// ./gradlew -p sdks/python/container docker
// 2. Either a) or b)
// a) If you want the Job Server to run in a Docker container:
// ./gradlew :runners:flink:1.5:job-server-container:docker
// b) Otherwise, start a local JobService, for example, the Portable Flink runner
// (in a separate shell since it continues to run):
// ./gradlew :runners:flink:1.5:job-server:runShadow
// Then you can run this example:
// Docker (2a):
// ./gradlew :sdks:python:portableWordCount
// Local JobService (2b):
// ./gradlew :sdks:python:portableWordCount -PjobEndpoint=localhost:8099
// Convenience task: depends on the streaming word-count task when the
// 'streaming' project property is present (e.g. -Pstreaming), otherwise on the
// batch one. Both referenced tasks are defined outside this view.
task portableWordCount {
dependsOn project.hasProperty("streaming") ? portableWordCountStreaming : portableWordCountBatch
// Run PostCommit integration tests on default runner (TestDataflowRunner)
// Runs the tests selected by --attr=IT, passing the Dataflow worker jar along.
// Depends on 'installGcpTest' and 'sdist' (both declared outside this view)
// and on the Dataflow worker shadowJar task.
task postCommitIT(dependsOn: ['installGcpTest', 'sdist']) {
dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
// Path of the worker jar produced by the shadowJar task depended on above.
def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
doLast {
def testOpts = basicTestOpts + ["--attr=IT"]
// mapToArgString is defined outside this view; presumably it renders the map
// as command-line arguments — confirm.
def cmdArgs = project.mapToArgString(["test_opts": testOpts,
"worker_jar": dataflowWorkerJar])
exec {
executable 'sh'
// NOTE(review): the script name after './scripts/' appears to be missing — confirm.
args '-c', ". ${project.ext.envdir}/bin/activate && ./scripts/ $cmdArgs"
// Runs the tests selected by --attr=ValidatesRunner (no "streaming" argument is
// passed), handing the Dataflow worker jar to the test script.
// Depends on 'installGcpTest' and 'sdist' (both declared outside this view)
// and on the Dataflow worker shadowJar task.
task validatesRunnerBatchTests(dependsOn: ['installGcpTest', 'sdist']) {
dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
// Path of the worker jar produced by the shadowJar task depended on above.
def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
doLast {
def testOpts = basicTestOpts + ["--attr=ValidatesRunner"]
def cmdArgs = project.mapToArgString(["test_opts": testOpts,
"worker_jar": dataflowWorkerJar])
exec {
executable 'sh'
// NOTE(review): the script name after './scripts/' appears to be missing — confirm.
args '-c', ". ${project.ext.envdir}/bin/activate && ./scripts/ $cmdArgs"
// Streaming counterpart of validatesRunnerBatchTests: same dependencies and
// worker jar, but passes "streaming": "true" and excludes tests tagged
// 'sickbay-streaming'.
task validatesRunnerStreamingTests(dependsOn: ['installGcpTest', 'sdist']) {
dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
// Path of the worker jar produced by the shadowJar task depended on above.
def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
doLast {
// TODO(BEAM-3544,BEAM-5025): Disable tests with 'sickbay-streaming' tag.
def testOpts = basicTestOpts + ["--attr=ValidatesRunner,!sickbay-streaming"]
def argMap = ["test_opts": testOpts,
"streaming": "true",
"worker_jar": dataflowWorkerJar]
def cmdArgs = project.mapToArgString(argMap)
exec {
executable 'sh'
// NOTE(review): the script name after './scripts/' appears to be missing — confirm.
args '-c', ". ${project.ext.envdir}/bin/activate && ./scripts/ $cmdArgs"
// Runs the HDFS integration test from inside the virtualenv.
// Depends on 'installGcpTest', which is declared outside this view.
task hdfsIntegrationTest(dependsOn: 'installGcpTest') {
doLast {
exec {
executable 'sh'
// NOTE(review): the command ends at a directory path; the script name under
// apache_beam/io/hdfs_integration_test/ appears to be missing — confirm.
args '-c', ". ${project.ext.envdir}/bin/activate && ./apache_beam/io/hdfs_integration_test/"
// Runs apache_beam.runners.portability.spark_runner_test in the virtualenv,
// using a PROCESS environment and the Spark job server shadow jar.
task sparkValidatesRunner() {
dependsOn 'createProcessWorker'
dependsOn 'setupVirtualenv'
dependsOn ':runners:spark:job-server:shadowJar'
doLast {
// Single-quoted JSON object with a "command" key pointing into the
// :sdks:python build directory.
// NOTE(review): the file name after the build directory appears to be missing — confirm.
def environment_config = "'{\"command\": \"${project(":sdks:python:").buildDir.absolutePath}/\"}'"
def argMap = [
"environment_type" : "PROCESS",
"spark_job_server_jar": project(":runners:spark:job-server:").shadowJar.archivePath,
"environment_config": environment_config,
// mapToArgString is defined outside this view; presumably it renders the map
// as command-line arguments — confirm.
def argString = project.mapToArgString(argMap)
// Optionally specify test function names separated by space e.g.:
// ./gradlew :sdks:python:sparkValidatesRunner -Ptests="test_external_transforms test_read"
// Otherwise run all test functions under SparkRunnerTest
// NOTE(review): the true-branch below looks truncated (the receiver before
// 'tests').split() is missing; presumably project.property) — confirm.
def tests = project.hasProperty('tests') ?'tests').split().collect{ "SparkRunnerTest.$it" }.join(' ') : ''
exec {
executable 'sh'
// Activate the virtualenv, install the package in editable mode with its
// [test] extra, then run the Spark runner test module.
args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.runners.portability.spark_runner_test $tests $argString"
// Options for one flinkCompatibilityMatrix task variant.
// NOTE(review): other code in this file references a workerType field and an
// SDK_WORKER_TYPE enum on this class; their declarations are not visible here.
class CompatibilityMatrixConfig {
// Execute batch or streaming pipelines.
boolean streaming = false
// Execute on Docker or Process based environment.
// Whether to pre-optimize the pipeline with the Python optimizer.
boolean preOptimize = false
// Closure that creates one Flink compatibility-matrix task for the given
// options. Callers in this file pass named arguments (streaming, workerType,
// preOptimize) and hand the closure's result to dependsOn.
def flinkCompatibilityMatrix = {
// Coerce the argument to a CompatibilityMatrixConfig, or use defaults when
// no argument is given.
def config = it ? it as CompatibilityMatrixConfig : new CompatibilityMatrixConfig()
// NOTE(review): the right-hand side of this assignment is not visible in this view.
def workerType =
def streaming = config.streaming
// Only the PROCESS worker type gets an --environment_config option; otherwise
// the option is an empty string.
// NOTE(review): the file name after the build directory appears to be missing — confirm.
def environment_config = config.workerType == CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS ? "--environment_config='{\"command\": \"${project(":sdks:python").buildDir.absolutePath}/\"}'" : ""
// Task name encodes mode, the optional pre-optimize flag and the worker type.
def name = "flinkCompatibilityMatrix${streaming ? 'Streaming' : 'Batch'}${config.preOptimize ? 'PreOptimize' : ''}${workerType}"
def extra_experiments = []
// NOTE(review): the statement guarded by this 'if' is not visible; as the text
// stands, the tasks.create call below would become its body — confirm.
if (config.preOptimize)
tasks.create(name: name) {
dependsOn 'setupVirtualenv'
dependsOn ':runners:flink:1.5:job-server:shadowJar'
dependsOn ':sdks:java:container:docker'
// Worker-type specific prerequisite: the Python SDK container image for
// docker, the generated process worker for process.
if (workerType.toLowerCase() == 'docker')
dependsOn ':sdks:python:container:docker'
else if (workerType.toLowerCase() == 'process')
dependsOn 'createProcessWorker'
doLast {
exec {
executable 'sh'
// Activate the virtualenv, install the package in editable mode with its
// [test] extra, then run the Flink runner test module. --streaming and
// --extra_experiments are only emitted when applicable.
args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.runners.portability.flink_runner_test --flink_job_server_jar=${project(":runners:flink:1.5:job-server:").shadowJar.archivePath} --environment_type=${workerType} ${environment_config} ${streaming ? '--streaming' : ''} ${extra_experiments ? '--extra_experiments=' + extra_experiments.join(',') : ''}"
// Aggregate task: batch and streaming matrix variants created without an
// explicit workerType.
// NOTE(review): the default worker type is not visible in this view; the task
// name suggests Docker — confirm.
task flinkCompatibilityMatrixDocker() {
dependsOn flinkCompatibilityMatrix(streaming: false)
dependsOn flinkCompatibilityMatrix(streaming: true)
// Aggregate task: batch and streaming matrix variants using the PROCESS worker type.
task flinkCompatibilityMatrixProcess() {
dependsOn flinkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)
dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)
// Aggregate task: batch and streaming matrix variants using the LOOPBACK
// worker type, plus a streaming variant with preOptimize enabled.
task flinkCompatibilityMatrixLoopback() {
dependsOn flinkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK)
dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK)
dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK, preOptimize: true)
// Entry point for Flink validates-runner testing; it depends only on the
// LOOPBACK compatibility matrix.
task flinkValidatesRunner() {
dependsOn 'flinkCompatibilityMatrixLoopback'
// Aggregate post-commit task: cross-language tests, direct runner ITs, the
// HDFS integration test and the default-runner ITs.
task postCommit() {
dependsOn "crossLanguageTests"
dependsOn "directRunnerIT"
dependsOn "hdfsIntegrationTest"
dependsOn "postCommitIT"
// Other build and analysis tasks
// Snapshot of the full dependency requirements (with version info) of the Python virtualenv.
// Results will be stored in files under the Gradle build directory.
// Writes the virtualenv's installed packages and versions (pip freeze) to
// requirements.txt in the Gradle build directory.
// Depends on 'installGcpTest', which is declared outside this view.
task depSnapshot(dependsOn: 'installGcpTest') {
doLast {
println 'Snapshoting full dependencies requirements with versions info to requirements.txt.'
exec {
// Remove useless item "pkg-resources" from file which is introduced by a bug in Ubuntu.
executable 'sh'
// grep -v drops the "pkg-resources" line before the output is redirected
// to ${project.buildDir}/requirements.txt.
args '-c', ". ${project.ext.envdir}/bin/activate && pip freeze --local --all | grep -v \"pkg-resources\" > ${project.buildDir}/requirements.txt"
// Runs a script from ./scripts after the root project's ':dependencyUpdates' task.
// Unlike the other exec calls in this file, the virtualenv is not activated first.
task dependencyUpdates(dependsOn: ':dependencyUpdates') {
doLast {
exec {
executable 'sh'
// NOTE(review): the script name after './scripts/' appears to be missing — confirm.
args '-c', "./scripts/"
// Aggregate task: builds the source distribution ('sdist', declared outside
// this view) and takes the dependency snapshot.
task buildSnapshot() {
dependsOn 'sdist'
dependsOn 'depSnapshot'
// Generates an executable launcher script for the process-based SDK worker
// and runs a Python install step in the virtualenv.
project.task('createProcessWorker') {
dependsOn ':sdks:python:container:build'
dependsOn 'setupVirtualenv'
// NOTE(review): the file name after the build directory appears to be missing — confirm.
def sdkWorkerFile = file("${project.buildDir}/")
// Pick the launcher for the host OS: 'darwin' on macOS, 'linux' otherwise.
// Os is not imported in this view; presumably the Ant Os condition class — confirm.
def osType = 'linux'
if (Os.isFamily(Os.FAMILY_MAC))
osType = 'darwin'
// Boot launcher built by the :sdks:python:container project for <osType>_amd64.
def workerScript = "${project(":sdks:python:container:").buildDir.absolutePath}/target/launcher/${osType}_amd64/boot"
// Script content: sets 'pip' to the output of `which pip` while sourcing the
// virtualenv's activate script, then runs the boot launcher with all
// arguments forwarded ($*).
def sdkWorkerFileCode = "sh -c \"pip=`which pip` . ${project.ext.envdir}/bin/activate && ${workerScript} \$* \""
// Declare the generated script as a task output.
outputs.file sdkWorkerFile
doLast {
sdkWorkerFile.write sdkWorkerFileCode
exec {
// NOTE(review): 'python install' appears to be missing a script name — confirm.
commandLine('sh', '-c', ". ${project.ext.envdir}/bin/activate && cd ${project.projectDir} && python install ")
exec {
// Make the generated script executable.
commandLine('chmod', '+x', sdkWorkerFile)
// Runs apache_beam.transforms.external_test in the virtualenv. Depends on the
// Flink job server, Python and Java SDK container docker tasks and on the test
// expansion service jar.
project.task('crossLanguagePythonJavaFlink') {
dependsOn 'setupVirtualenv'
dependsOn ':runners:flink:1.5:job-server-container:docker'
dependsOn ':sdks:python:container:docker'
dependsOn ':sdks:java:container:docker'
dependsOn ':runners:core-construction-java:buildTestExpansionServiceJar'
doLast {
// Path of the jar produced by buildTestExpansionServiceJar.
def testServiceExpansionJar = project(":runners:core-construction-java:").buildTestExpansionServiceJar.archivePath
// NOTE(review): the entries of this options list are not visible in this view.
def options = [
exec {
executable 'sh'
// Activate the virtualenv, install the package in editable mode with its
// [test] extra, then run the test module with the space-joined options.
args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.transforms.external_test ${options.join(' ')}"
// Runs the apache_beam.examples.wordcount_xlang example in the virtualenv.
// Has the same dependencies as crossLanguagePythonJavaFlink.
project.task('crossLanguagePortableWordCount') {
dependsOn 'setupVirtualenv'
dependsOn ':runners:flink:1.5:job-server-container:docker'
dependsOn ':sdks:python:container:docker'
dependsOn ':sdks:java:container:docker'
dependsOn ':runners:core-construction-java:buildTestExpansionServiceJar'
doLast {
// Path of the jar produced by buildTestExpansionServiceJar.
def testServiceExpansionJar = project(":runners:core-construction-java:").buildTestExpansionServiceJar.archivePath
// NOTE(review): the entries of this options list are not visible in this view.
def options = [
exec {
executable 'sh'
// Activate the virtualenv, install the package in editable mode with its
// [test] extra, then run the example with the space-joined options.
args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.examples.wordcount_xlang ${options.join(' ')}"
// TODO: Check that the output file is generated and runs.
// Aggregate task that runs both cross-language tasks defined in this file.
project.task('crossLanguageTests') {
dependsOn "crossLanguagePythonJavaFlink"
dependsOn "crossLanguagePortableWordCount"