/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.tools.ant.taskdefs.condition.Os
apply plugin: org.apache.beam.gradle.BeamModulePlugin
applyPythonNature()
def pythonRootDir = "${rootDir}/sdks/python"
/*************************************************************************************************/
addPortableWordCountTasks()
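// Python 2 pre-commit suite: builds the Flink job-server and py2 SDK harness containers, then
// runs the portable WordCount in both batch and streaming mode. Following the task paths used
// elsewhere in this file, it can be invoked with:
//
// ./gradlew :sdks:python:test-suites:portable:py2:preCommitPy2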
task preCommitPy2() {
dependsOn ':runners:flink:1.8:job-server-container:docker'
dependsOn ':sdks:python:container:py2:docker'
dependsOn portableWordCountBatch
dependsOn portableWordCountStreaming
}
// TODO: Move the rest of this file into ../common.gradle.
// Before running this, you need to:
//
// 1. Build the SDK container:
//
// ./gradlew -p sdks/python/container buildAll
//
// 2. Either a) or b)
// a) If you want the Job Server to run in a Docker container:
//
// ./gradlew :runners:flink:1.8:job-server-container:docker
//
// b) Otherwise, start a local JobService, for example the portable Flink runner's
// job server (in a separate shell, since it continues to run):
//
// ./gradlew :runners:flink:1.8:job-server:runShadow
//
// Then you can run this example:
//
// Docker (2a):
//
// ./gradlew :sdks:python:test-suites:portable:py2:portableWordCount
//
// Local JobService (2b):
//
// ./gradlew :sdks:python:test-suites:portable:py2:portableWordCount -PjobEndpoint=localhost:8099
//
task portableWordCount {
dependsOn project.hasProperty("streaming") ? portableWordCountStreaming : portableWordCountBatch
}
/*************************************************************************************************/
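// Cross-language (Python-calling-Java) test tasks. The example invocations in the comments below
// are a sketch, using the project path that appears in the commands above.
//
// crossLanguagePythonJavaDirect runs apache_beam.transforms.external_test directly against the
// Java test expansion service, without going through a portable runner job server:
//
// ./gradlew :sdks:python:test-suites:portable:py2:crossLanguagePythonJavaDirect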
task crossLanguagePythonJavaDirect {
dependsOn 'setupVirtualenv'
dependsOn ':sdks:java:container:docker'
dependsOn ':sdks:java:testing:expansion-service:buildTestExpansionServiceJar'
doLast {
def options = [
"--expansion_service_target=sdks:java:testing:expansion-service:buildTestExpansionServiceJar",
"--expansion_service_target_appendix=testExpansionService",
]
exec {
executable 'sh'
args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.transforms.external_test ${options.join(' ')}"
}
}
}
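// Same external_test run, but submitted to the portable Flink runner using the job-server and SDK
// containers built above. Example invocation (a sketch, same pattern as the other tasks here):
//
// ./gradlew :sdks:python:test-suites:portable:py2:crossLanguagePythonJavaFlink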
task crossLanguagePythonJavaFlink {
dependsOn 'setupVirtualenv'
dependsOn ':runners:flink:1.8:job-server-container:docker'
dependsOn ':sdks:python:container:py2:docker'
dependsOn ':sdks:java:container:docker'
dependsOn ':sdks:java:testing:expansion-service:buildTestExpansionServiceJar'
doLast {
def testServiceExpansionJar = project(":sdks:java:testing:expansion-service:").buildTestExpansionServiceJar.archivePath
def options = [
"--runner=PortableRunner",
"--experiments=worker_threads=100",
"--parallelism=2",
"--shutdown_sources_on_final_watermark",
"--environment_cache_millis=10000",
"--expansion_service_port=8096",
"--expansion_service_jar=${testServiceExpansionJar}",
]
exec {
executable 'sh'
args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.transforms.external_test ${options.join(' ')}"
}
}
}
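// Cross-language WordCount (apache_beam.examples.wordcount_xlang) on the portable Flink runner,
// reading /etc/profile and writing to /tmp/py-wordcount-portable. Example invocation (a sketch):
//
// ./gradlew :sdks:python:test-suites:portable:py2:crossLanguagePortableWordCount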
task crossLanguagePortableWordCount {
dependsOn 'setupVirtualenv'
dependsOn ':runners:flink:1.8:job-server-container:docker'
dependsOn ':sdks:python:container:py2:docker'
dependsOn ':sdks:java:container:docker'
dependsOn ':sdks:java:testing:expansion-service:buildTestExpansionServiceJar'
doLast {
def testServiceExpansionJar = project(":sdks:java:testing:expansion-service:").buildTestExpansionServiceJar.archivePath
def options = [
"--input=/etc/profile",
"--output=/tmp/py-wordcount-portable",
"--runner=PortableRunner",
"--experiments=worker_threads=100",
"--parallelism=2",
"--shutdown_sources_on_final_watermark",
"--environment_cache_millis=10000",
"--expansion_service_jar=${testServiceExpansionJar}",
]
exec {
executable 'sh'
args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.examples.wordcount_xlang ${options.join(' ')}"
// TODO: Check that the output file is generated and that the example runs successfully.
}
}
}
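// Aggregate task that runs the Flink cross-language test and the cross-language WordCount above.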
task crossLanguageTests {
dependsOn "crossLanguagePythonJavaFlink"
dependsOn "crossLanguagePortableWordCount"
}
/*************************************************************************************************/
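// Writes ${buildDir}/sdk_worker.sh, a wrapper around the SDK harness boot binary built by
// :sdks:python:container, for use as a PROCESS environment (see sparkValidatesRunner below).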
task createProcessWorker {
dependsOn ':sdks:python:container:build'
dependsOn 'setupVirtualenv'
def sdkWorkerFile = file("${buildDir}/sdk_worker.sh")
def osType = 'linux'
if (Os.isFamily(Os.FAMILY_MAC))
osType = 'darwin'
def workerScript = "${project(":sdks:python:container:").buildDir.absolutePath}/target/launcher/${osType}_amd64/boot"
def sdkWorkerFileCode = "sh -c \"pip=`which pip` . ${envdir}/bin/activate && ${workerScript} \$* \""
outputs.file sdkWorkerFile
doLast {
sdkWorkerFile.write sdkWorkerFileCode
exec {
commandLine('sh', '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test]")
}
exec {
commandLine('chmod', '+x', sdkWorkerFile)
}
}
}
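// Runs apache_beam.runners.portability.spark_runner_test against the Spark job-server shadow jar,
// using the PROCESS environment worker created by createProcessWorker. Example invocation (a
// sketch; see the -Ptests option below for running a subset of tests):
//
// ./gradlew :sdks:python:test-suites:portable:py2:sparkValidatesRunner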
task sparkValidatesRunner() {
dependsOn 'createProcessWorker'
dependsOn 'setupVirtualenv'
dependsOn ':runners:spark:job-server:shadowJar'
doLast {
def environment_config = "'{\"command\": \"${buildDir.absolutePath}/sdk_worker.sh\"}'"
def argMap = [
"environment_type" : "PROCESS",
"spark_job_server_jar": project(":runners:spark:job-server:").shadowJar.archivePath,
"environment_config": environment_config,
]
def argString = mapToArgString(argMap)
// Optionally, specify test function names separated by spaces, e.g.:
// ./gradlew :sdks:python:test-suites:portable:py2:sparkValidatesRunner -Ptests="test_external_transforms test_read"
// Otherwise, all test functions under SparkRunnerTest are run.
def tests = project.hasProperty('tests') ?
project.property('tests').split().collect{ "SparkRunnerTest.$it" }.join(' ') : ''
exec {
executable 'sh'
args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.runners.portability.spark_runner_test $tests $argString"
}
}
}
apply from: "../common.gradle"