| import groovy.json.JsonOutput |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| // Apply the Beam module plugin and configure this project with the Java and Python natures. |
| plugins { id 'org.apache.beam.module' } |
| applyJavaNature(automaticModuleName: 'org.apache.beam.runners.portability') |
| applyPythonNature() |
| |
| description = "Apache Beam :: Runners :: Portability :: Java" |
| ext.summary = """A Java implementation of the Beam Model which utilizes the portability |
| framework to execute user-defined functions.""" |
| |
| // Extra configuration used as the classpath of the ValidatesRunner test tasks in this script. |
| configurations { |
| validatesRunner |
| } |
| |
| dependencies { |
| // Main (compile) classpath. |
| compile(library.java.vendored_guava_26_0_jre) |
| compile(library.java.hamcrest_library) |
| compile(project(":runners:java-fn-execution")) |
| compile(project(":runners:java-job-service")) |
| compile(project(path: ":sdks:java:harness", configuration: "shadow")) |
| compile(library.java.vendored_grpc_1_26_0) |
| compile(library.java.slf4j_api) |
| |
| // Unit-test classpath. |
| testCompile(project(path: ":runners:core-construction-java", configuration: "testRuntime")) |
| testCompile(library.java.hamcrest_core) |
| testCompile(library.java.junit) |
| testCompile(library.java.mockito_core) |
| testCompile(library.java.slf4j_jdk14) |
| |
| // Classpath consumed by the ValidatesRunner test tasks: the SDK core test artifacts, |
| // the core-java runner test runtime, and this project's own test runtime. |
| validatesRunner(project(path: ":sdks:java:core", configuration: "shadowTest")) |
| validatesRunner(project(path: ":runners:core-java", configuration: "testRuntime")) |
| validatesRunner(project(path: project.path, configuration: "testRuntime")) |
| } |
| |
| // Force these projects to be evaluated before the remainder of this script runs. |
| [":sdks:java:core", ":sdks:python", ":runners:core-java"].each { dependencyPath -> |
| project.evaluationDependsOn(dependencyPath) |
| } |
| |
| // Absolute path of the Python SDK project directory; used as the working directory for Python commands. |
| ext.pythonSdkDir = project.findProject(":sdks:python").projectDir.absoluteFile.toString() |
| |
| // If this is set via -P then we assume it is already running and will not start it. |
| // This must be captured before ext.localJobServicePortFile is assigned: once that extra |
| // property exists, hasProperty() reports true regardless of what was passed on the command line. |
| def portFileSupplied = project.hasProperty("localJobServicePortFile") |
| def needsLocalJobService = !portFileSupplied |
| |
| // Use the caller-supplied port file when given, otherwise a file under this project's build directory. |
| ext.localJobServicePortFile = needsLocalJobService ? "${project.buildDir}/local_job_service_port" : project.property("localJobServicePortFile") |
| |
| // Files passed to local_job_service_main as --stdout_file and --pid_file. |
| ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout" |
| ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid" |
| |
| /** |
| * Runs a command through "sh -c" after activating the Python virtualenv, using the Python SDK |
| * directory as the working directory. |
| * |
| * Each argument is wrapped in single quotes for the shell. An embedded single quote is written as |
| * '\'' (close the quoted string, emit an escaped quote, reopen the string). The literal |
| * String.replace is used on purpose: with replaceAll, a backslash in the replacement string only |
| * escapes the character after it, so replacing "'" with "\\'" leaves the argument unchanged. |
| * |
| * @param args the command followed by its arguments |
| */ |
| void execInVirtualenv(String... args) { |
| String quotedCommand = args.collect { arg -> "'" + arg.replace("'", "'\\''") + "'" }.join(" ") |
| String shellCommand = ". ${project.ext.envdir}/bin/activate && " + quotedCommand |
| exec { |
| workingDir project.ext.pythonSdkDir |
| commandLine "sh", "-c", shellCommand |
| } |
| } |
| |
| /** |
| * Runs a command through "sh -c" after activating the Python virtualenv, echoing its combined |
| * stdout/stderr to the build output and waiting for it to exit. |
| * |
| * Does not background the process, but allows the process to daemonize itself. |
| * |
| * Each argument is wrapped in single quotes for the shell. An embedded single quote is written as |
| * '\'' (close the quoted string, emit an escaped quote, reopen the string). The literal |
| * String.replace is used on purpose: with replaceAll, a backslash in the replacement string only |
| * escapes the character after it, so replacing "'" with "\\'" leaves the argument unchanged. |
| * |
| * @param args the command followed by its arguments |
| * @throws RuntimeException if the command exits with a non-zero exit code |
| */ |
| void execBackgroundInVirtualenv(String... args) { |
| String quotedCommand = args.collect { arg -> "'" + arg.replace("'", "'\\''") + "'" }.join(" ") |
| String shellCommand = ". ${project.ext.envdir}/bin/activate && " + quotedCommand |
| ProcessBuilder pb = new ProcessBuilder().redirectErrorStream(true).directory(new File(project.ext.pythonSdkDir)).command(["sh", "-c", shellCommand]) |
| Process proc = pb.start() |
| |
| // redirectIO does not work for connecting to groovy/gradle stdout |
| BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream())) |
| try { |
| String line |
| while ((line = reader.readLine()) != null) { |
| println line |
| } |
| } finally { |
| // Release the pipe to the child process even if echoing its output fails. |
| reader.close() |
| } |
| int exitCode = proc.waitFor() |
| if (exitCode != 0) { |
| throw new RuntimeException("Local job service startup failed with exit code ${exitCode}") |
| } |
| } |
| |
| // Installs the Python SDK in editable mode ("pip install -e .") into the virtualenv. |
| task installBeamPythonInVirtualenv(dependsOn: setupVirtualenv) { |
| doLast { |
| execInVirtualenv("pip", "install", "-e", ".") |
| } |
| } |
| |
| // Launches local_job_service_main with --background, pointing it at the stdout, pid and port files. |
| task startLocalJobService { |
| dependsOn installBeamPythonInVirtualenv |
| |
| doLast { |
| String[] startCommand = [ |
| "python", |
| "-m", "apache_beam.runners.portability.local_job_service_main", |
| "--background", |
| "--stdout_file=${localJobServiceStdoutFile}", |
| "--pid_file=${localJobServicePidFile}", |
| "--port_file=${localJobServicePortFile}" |
| ] as String[] |
| execBackgroundInVirtualenv(startCommand) |
| } |
| } |
| |
| // Invokes local_job_service_main with --stop for the process recorded in the pid file. |
| task stopLocalJobService { |
| doLast { |
| String[] stopCommand = [ |
| "python", |
| "-m", "apache_beam.runners.portability.local_job_service_main", |
| "--stop", |
| "--pid_file=${localJobServicePidFile}" |
| ] as String[] |
| execInVirtualenv(stopCommand) |
| } |
| } |
| |
| // Whenever the job service is started by this build, schedule the stop task as its finalizer. |
| startLocalJobService.finalizedBy(stopLocalJobService) |
| |
| /** |
| * Creates a Test task that runs Java ValidatesRunner tests against the Universal Local Runner (ULR) |
| * aka local_job_service_main with the specified environment type. |
| * |
| * @param name name of the Test task to create |
| * @param environmentType value passed to the pipeline as --defaultEnvironmentType, e.g. "DOCKER" or "LOOPBACK" |
| * @param dockerImageTask task the created task depends on when environmentType is "DOCKER"; required in that case |
| * @return the created Test task |
| * @throws GradleException if environmentType is "DOCKER" and no dockerImageTask was given |
| */ |
| def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = "" -> |
| Task vrTask = tasks.create(name: name, type: Test, group: "Verification") { |
| // Name the actual environment type rather than always claiming "docker". |
| description "PortableRunner Java " + environmentType + " ValidatesRunner suite" |
| classpath = configurations.validatesRunner |
| systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ |
| "--runner=TestUniversalRunner", |
| "--experiments=beam_fn_api", |
| "--defaultEnvironmentType=${environmentType}", |
| "--localJobServicePortFile=${localJobServicePortFile}" |
| ]) |
| // The tests themselves are the compiled test classes of the Java SDK core project. |
| testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) |
| useJUnit { |
| includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo' |
| } |
| filter { |
| // There is not currently a category for excluding these _only_ in committed mode |
| // https://issues.apache.org/jira/browse/BEAM-10445 |
| excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedCounterMetrics' |
| // https://issues.apache.org/jira/browse/BEAM-10446 |
| excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedDistributionMetrics' |
| |
| // Teardown not called in exceptions |
| // https://issues.apache.org/jira/browse/BEAM-10447 |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful' |
| |
| // Only known window fns supported, not general window merging |
| // https://issues.apache.org/jira/browse/BEAM-10448 |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testMergingWindowing' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyMergingWindows' |
| |
| // Misc failures |
| // https://issues.apache.org/jira/browse/BEAM-10451 |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testGlobalCombineWithDefaultsAndTriggers' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombine' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext' |
| |
| // https://issues.apache.org/jira/browse/BEAM-10454 |
| excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testWindowedIsEqualTo' |
| |
| // https://issues.apache.org/jira/browse/BEAM-10452 |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode' |
| |
| // https://issues.apache.org/jira/browse/BEAM-10995 |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testWindowPreservation' |
| } |
| } |
| |
| if (environmentType == "DOCKER") { |
| // Groovy truth covers both null and the empty-string default. |
| if (!dockerImageTask) { |
| // This is a build-script configuration error, so fail the build explicitly. |
| // StopExecutionException is intended for task actions that want to stop quietly. |
| throw new GradleException("dockerImageTask is required for a Docker environment test task") |
| } |
| vrTask.dependsOn dockerImageTask |
| } |
| |
| // Only manage the job service lifecycle when no externally provided port file was given. |
| if (needsLocalJobService) { |
| vrTask.dependsOn startLocalJobService |
| // Keep the job service alive until the tests have finished. |
| stopLocalJobService.mustRunAfter vrTask |
| } |
| |
| return vrTask |
| } |
| |
| // Entry-point task: depends on the ULR ValidatesRunner Test task configured for the DOCKER environment. |
| tasks.create("ulrDockerValidatesRunner") { |
| dependsOn(createUlrValidatesRunnerTask("ulrDockerValidatesRunnerTests", "DOCKER", ":sdks:java:container:java8:docker")) |
| } |
| |
| // Entry-point task: depends on the ULR ValidatesRunner Test task configured for the LOOPBACK environment. |
| tasks.create("ulrLoopbackValidatesRunner") { |
| dependsOn(createUlrValidatesRunnerTask("ulrLoopbackValidatesRunnerTests", "LOOPBACK")) |
| } |
| |