blob: bce540489463bed474b4b6ae8126aee9a02f6d8f [file] [log] [blame]
import groovy.json.JsonOutput
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Apply the shared Beam module plugin, then enable the Java and Python "natures" it provides.
// NOTE(review): setupVirtualenv and ext.envdir are used further down but are not defined in
// this file; presumably applyPythonNature() supplies them - confirm against the plugin.
plugins { id 'org.apache.beam.module' }
applyJavaNature(automaticModuleName: 'org.apache.beam.runners.portability')
applyPythonNature()

// Human-readable project metadata.
description = "Apache Beam :: Runners :: Portability :: Java"
ext.summary = """A Java implementation of the Beam Model which utilizes the portability
framework to execute user-defined functions."""

// Dedicated configuration that is used as the classpath of the ValidatesRunner test tasks
// created later in this script.
configurations {
  validatesRunner
}
// Dependency declarations for the portable runner module.
// Entries of the form library.java.* are resolved through a map defined outside this file.
dependencies {
// --- Main (compile) classpath ---
compile library.java.vendored_guava_26_0_jre
compile project(":runners:java-fn-execution")
compile project(":runners:java-job-service")
compile project(path: ":sdks:java:harness", configuration: "shadow")
compile library.java.vendored_grpc_1_26_0
compile library.java.slf4j_api
compile library.java.joda_time
// NOTE(review): hard-coded coordinate, unlike the library.java.* entries used elsewhere in
// this block (tests use library.java.hamcrest_core) - confirm this is intentional.
compile "org.hamcrest:hamcrest:2.1"
compile project(path: ":model:fn-execution", configuration: "shadow")
// NOTE(review): this project path carries a trailing ':' unlike every other path here;
// looks like a typo that happens to resolve - confirm before normalizing.
compile project(path: ":model:job-management:", configuration: "shadow")
compile project(path: ":model:pipeline", configuration: "shadow")
compile project(":runners:core-construction-java")
compile project(path: ":sdks:java:core", configuration: "shadow")
compile project(":sdks:java:fn-execution")
// --- Unit-test classpath ---
testCompile project(path: ":runners:core-construction-java", configuration: "testRuntime")
testCompile library.java.hamcrest_core
testCompile library.java.junit
testCompile library.java.mockito_core
testCompile library.java.slf4j_jdk14
// --- ValidatesRunner classpath: SDK core test artifacts, runners-core test runtime, and this
// project's own test runtime ---
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
validatesRunner project(path: project.path, configuration: "testRuntime")
}
// These projects must be evaluated before this one: their project objects are read directly
// by this script (e.g. the Python SDK project directory and the SDK core test source set).
[":sdks:java:core", ":sdks:python", ":runners:core-java"].each { requiredProjectPath ->
  project.evaluationDependsOn(requiredProjectPath)
}

// Absolute path of the Python SDK checkout; virtualenv commands run from this directory.
ext.pythonSdkDir = project.findProject(":sdks:python").projectDir.absoluteFile.toString()

// If the port file is supplied via -P we assume a job service is already running and do not
// start one. This flag has to be computed before the ext property of the same name is assigned.
def needsLocalJobService = !project.hasProperty("localJobServicePortFile")
if (needsLocalJobService) {
  ext.localJobServicePortFile = "${project.buildDir}/local_job_service_port"
} else {
  ext.localJobServicePortFile = project.property("localJobServicePortFile")
}

// Files under the build directory handed to the local job service for its stdout and pid.
ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout"
ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid"
/**
 * Runs a command through {@code sh -c} after activating the virtualenv at
 * {@code project.ext.envdir}, using the Python SDK directory as the working directory.
 *
 * @param args the command followed by its arguments; each one is single-quoted for the shell
 */
void execInVirtualenv(String... args) {
  // Inside POSIX single quotes nothing can be escaped, so an embedded single quote has to close
  // the quoted run, emit an escaped quote, and reopen it: ' becomes '\''.
  // (The previous replaceAll("'", "\\'") was a no-op: in a regex replacement string the
  // backslash merely escapes the following character.)
  String quotedArgs = args.collect { arg -> "'" + arg.replace("'", "'\\''") + "'" }.join(" ")
  String shellCommand = ". ${project.ext.envdir}/bin/activate && " + quotedArgs
  exec {
    workingDir project.ext.pythonSdkDir
    commandLine "sh", "-c", shellCommand
  }
}
/**
 * Runs a command through {@code sh -c} after activating the virtualenv at
 * {@code project.ext.envdir}, echoing its combined stdout/stderr to the build output.
 *
 * Does not background the process, but allows the process to daemonize itself.
 *
 * @param args the command followed by its arguments; each one is single-quoted for the shell
 * @throws RuntimeException if the command exits with a non-zero status
 */
void execBackgroundInVirtualenv(String... args) {
  // Inside POSIX single quotes nothing can be escaped, so an embedded single quote has to close
  // the quoted run, emit an escaped quote, and reopen it: ' becomes '\''.
  // (The previous replaceAll("'", "\\'") was a no-op: in a regex replacement string the
  // backslash merely escapes the following character.)
  String quotedArgs = args.collect { arg -> "'" + arg.replace("'", "'\\''") + "'" }.join(" ")
  String shellCommand = ". ${project.ext.envdir}/bin/activate && " + quotedArgs
  ProcessBuilder pb = new ProcessBuilder()
      .redirectErrorStream(true)
      .directory(new File(project.ext.pythonSdkDir))
      .command(["sh", "-c", shellCommand])
  Process proc = pb.start()
  // redirectIO does not work for connecting to groovy/gradle stdout
  BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()))
  try {
    String line
    while ((line = reader.readLine()) != null) {
      println line
    }
  } finally {
    // Release the pipe to the child process even if reading or printing fails.
    reader.close()
  }
  int exitCode = proc.waitFor()
  if (exitCode != 0) {
    throw new RuntimeException("Local job service startup failed with exit code ${exitCode}")
  }
}
// Installs the Python SDK in editable mode ("pip install -e .") into the virtualenv once
// setupVirtualenv has run.
task installBeamPythonInVirtualenv(dependsOn: setupVirtualenv) {
  doLast {
    execInVirtualenv("pip", "install", "-e", ".")
  }
}
// Launches local_job_service_main with --background, passing it the stdout, pid and port
// file locations configured on this project.
task startLocalJobService(dependsOn: installBeamPythonInVirtualenv) {
  doLast {
    String[] launchCommand = [
      "python",
      "-m", "apache_beam.runners.portability.local_job_service_main",
      "--background",
      "--stdout_file=${localJobServiceStdoutFile}",
      "--pid_file=${localJobServicePidFile}",
      "--port_file=${localJobServicePortFile}"
    ] as String[]
    execBackgroundInVirtualenv(launchCommand)
  }
}
// Invokes local_job_service_main with --stop, pointing it at the pid file written at startup.
task stopLocalJobService {
  doLast {
    String[] stopCommand = [
      "python",
      "-m", "apache_beam.runners.portability.local_job_service_main",
      "--stop",
      "--pid_file=${localJobServicePidFile}"
    ] as String[]
    execInVirtualenv(stopCommand)
  }
}

// Whenever the service is started by this build, make sure the stop task runs afterwards.
startLocalJobService.finalizedBy(stopLocalJobService)
/**
 * Creates a Test task that runs the Java ValidatesRunner tests against the Universal Local
 * Runner (ULR) aka local_job_service_main with the specified environment type.
 *
 * @param name name of the Test task to create
 * @param environmentType value passed as --defaultEnvironmentType (e.g. "DOCKER", "LOOPBACK")
 * @param dockerImageTask task the suite depends on for its SDK container image; required when
 *        environmentType is "DOCKER", ignored otherwise
 * @return the created Test task
 * @throws GradleException if environmentType is "DOCKER" and no dockerImageTask was given
 */
def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = "" ->
  Task vrTask = tasks.create(name: name, type: Test, group: "Verification") {
    // Name the actual environment type rather than always claiming "docker".
    description "PortableRunner Java ${environmentType} ValidatesRunner suite"
    classpath = configurations.validatesRunner
    systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
      "--runner=TestUniversalRunner",
      "--experiments=beam_fn_api",
      "--defaultEnvironmentType=${environmentType}",
      "--localJobServicePortFile=${localJobServicePortFile}"
    ])
    // The tests themselves are the compiled test classes of the Java SDK core project.
    testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
    useJUnit {
      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
      excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
      excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
      excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
      excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
      excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
      excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
      excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
    }
    filter {
      // There is not currently a category for excluding these _only_ in committed mode
      // https://issues.apache.org/jira/browse/BEAM-10445
      excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedCounterMetrics'
      // https://issues.apache.org/jira/browse/BEAM-10446
      excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedDistributionMetrics'
      // Teardown not called in exceptions
      // https://issues.apache.org/jira/browse/BEAM-10447
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful'
      // Only known window fns supported, not general window merging
      // https://issues.apache.org/jira/browse/BEAM-10448
      excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWithoutCustomWindowTypes'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testMergingWindowing'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyMergingWindows'
      // Misc failures
      // https://issues.apache.org/jira/browse/BEAM-10451
      excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testGlobalCombineWithDefaultsAndTriggers'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombine'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext'
      // https://issues.apache.org/jira/browse/BEAM-10454
      excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testWindowedIsEqualTo'
      // https://issues.apache.org/jira/browse/BEAM-10452
      excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode'
      // https://issues.apache.org/jira/browse/BEAM-10995
      excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testWindowPreservation'
    }
  }
  if (environmentType == "DOCKER") {
    // Groovy truth rejects both null and the empty-string default.
    if (!dockerImageTask) {
      // This runs while the build is being configured, not inside a task action, so fail the
      // build with a GradleException instead of the task-action-oriented StopExecutionException.
      throw new GradleException("dockerImageTask is required for a Docker environment test task")
    }
    vrTask.dependsOn dockerImageTask
  }
  if (needsLocalJobService) {
    // Start the service before the suite, and keep the stop task from running until the suite
    // has finished.
    vrTask.dependsOn startLocalJobService
    stopLocalJobService.mustRunAfter vrTask
  }
  return vrTask
}
// Entry-point task for the ULR ValidatesRunner suite using the DOCKER environment; the suite
// depends on the Java 8 SDK container image task.
task ulrDockerValidatesRunner {
  def dockerSuite = createUlrValidatesRunnerTask(
      "ulrDockerValidatesRunnerTests", "DOCKER", ":sdks:java:container:java8:docker")
  dependsOn(dockerSuite)
}

// Entry-point task for the ULR ValidatesRunner suite using the LOOPBACK environment.
task ulrLoopbackValidatesRunner {
  def loopbackSuite = createUlrValidatesRunnerTask("ulrLoopbackValidatesRunnerTests", "LOOPBACK")
  dependsOn(loopbackSuite)
}