| import groovy.json.JsonOutput |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| // Apply the Beam module plugin, then configure this project with the Java nature |
| // (publishing the automatic module name below) and the Python nature. |
| // NOTE(review): the Python nature presumably provides the virtualenv (setupVirtualenv, envdir) |
| // that the tasks further down rely on - confirm against the plugin. |
| plugins { id 'org.apache.beam.module' } |
| applyJavaNature( automaticModuleName: 'org.apache.beam.runners.portability') |
| applyPythonNature() |
| |
| // Human-readable project description and summary text. |
| description = "Apache Beam :: Runners :: Portability :: Java" |
| ext.summary = """A Java implementation of the Beam Model which utilizes the portability |
| framework to execute user-defined functions.""" |
| |
| // Dedicated configuration holding the classpath used by the ValidatesRunner test tasks below. |
| configurations { |
| validatesRunner |
| } |
| |
| // Compile, test, and ValidatesRunner dependencies of the portable runner module. |
| dependencies { |
| // Beam model, runner, and SDK projects this module builds against. |
| implementation project(path: ":model:job-management", configuration: "shadow") |
| implementation project(path: ":model:pipeline", configuration: "shadow") |
| implementation project(":runners:java-fn-execution") |
| implementation project(":runners:java-job-service") |
| implementation project(path: ":sdks:java:core", configuration: "shadow") |
| implementation project(path: ":sdks:java:harness", configuration: "shadow") |
| // hamcrest is declared for main code but explicitly allowed to be reported as unused. |
| implementation library.java.hamcrest |
| permitUnusedDeclared library.java.hamcrest |
| implementation library.java.joda_time |
| implementation library.java.slf4j_api |
| implementation library.java.vendored_grpc_1_69_0 |
| implementation library.java.vendored_guava_32_1_2_jre |
| |
| // Unit-test-only libraries. |
| testImplementation library.java.hamcrest |
| testImplementation library.java.junit |
| testImplementation library.java.mockito_core |
| testImplementation library.java.slf4j_jdk14 |
| |
| // Classpath for the ValidatesRunner suites: the SDK core test artifacts plus the test runtime |
| // of runners:core-java and of this project. |
| validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest") |
| validatesRunner project(path: ":runners:core-java", configuration: "testRuntimeMigration") |
| validatesRunner project(path: project.path, configuration: "testRuntimeMigration") |
| } |
| |
| // Name of the project property that points the tests at an existing job endpoint. |
| // Set using -PjobEndpoint=, for example: -PjobEndpoint=localhost:8073 |
| def jobEndpointPropertyName = "jobEndpoint" |
| |
| // These projects must be evaluated first because their directories and source sets are read below. |
| project.evaluationDependsOn(":sdks:java:core") |
| project.evaluationDependsOn(":sdks:python") |
| project.evaluationDependsOn(":runners:core-java") |
| |
| // Absolute path of the Python SDK project; used as the working directory for virtualenv commands. |
| ext.pythonSdkDir = project.findProject(":sdks:python").getProjectDir().getAbsoluteFile().toString() |
| |
| // If either localJobServicePortFile or jobEndpoint is set via -P, a job service is assumed to be |
| // already running and this build will not start one. |
| def needsLocalJobService = !project.hasProperty("localJobServicePortFile") && !project.hasProperty(jobEndpointPropertyName) |
| |
| // Port file of the local job service: the -P value when given, otherwise a file under the build directory. |
| ext.localJobServicePortFile = project.hasProperty("localJobServicePortFile") ? project.property("localJobServicePortFile") : "${project.buildDir}/local_job_service_port" |
| |
| // Files under the build directory passed to the local job service as its stdout and pid files. |
| ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout" |
| ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid" |
| |
| /** |
|  * Runs a command through "sh -c" after activating the Python virtualenv located at |
|  * project.ext.envdir, using the Python SDK directory as the working directory. |
|  * |
|  * @param args the command and its arguments; each one is passed to the shell as a single, |
|  *        literally quoted word |
|  */ |
| void execInVirtualenv(String... args) { |
|   // Wrap every argument in single quotes. Inside single quotes sh has no escape character, so an |
|   // embedded single quote must be written as '\'' (close quote, escaped quote, reopen quote). |
|   // String.replace is used because replaceAll treats the backslash in its replacement as an |
|   // escape, which silently turned the previous "escaping" into a no-op. |
|   String quotedArgs = args.collect { arg -> "'" + arg.replace("'", "'\\''") + "'" }.join(" ") |
|   String shellCommand = ". ${project.ext.envdir}/bin/activate && " + quotedArgs |
|   exec { |
|     workingDir project.ext.pythonSdkDir |
|     commandLine "sh", "-c", shellCommand |
|   } |
| } |
| |
| /** |
|  * Runs a command through "sh -c" after activating the Python virtualenv, echoing the combined |
|  * stdout/stderr of the process to the build output and waiting for it to exit. |
|  * |
|  * Does not background the process, but allows the process to daemonize itself. |
|  * |
|  * @param args the command and its arguments; each one is passed to the shell as a single, |
|  *        literally quoted word |
|  * @throws RuntimeException if the process exits with a non-zero exit code |
|  */ |
| void execBackgroundInVirtualenv(String... args) { |
|   // Wrap every argument in single quotes. Inside single quotes sh has no escape character, so an |
|   // embedded single quote must be written as '\'' (close quote, escaped quote, reopen quote). |
|   // String.replace is used because replaceAll treats the backslash in its replacement as an |
|   // escape, which silently turned the previous "escaping" into a no-op. |
|   String quotedArgs = args.collect { arg -> "'" + arg.replace("'", "'\\''") + "'" }.join(" ") |
|   String shellCommand = ". ${project.ext.envdir}/bin/activate && " + quotedArgs |
|   ProcessBuilder pb = new ProcessBuilder().redirectErrorStream(true).directory(new File(project.ext.pythonSdkDir)).command(["sh", "-c", shellCommand]) |
|   Process proc = pb.start() |
| |
|   // redirectIO does not work for connecting to groovy/gradle stdout |
|   BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream())) |
|   try { |
|     String line |
|     while ((line = reader.readLine()) != null) { |
|       println line |
|     } |
|   } finally { |
|     // Release the pipe to the child process even if reading fails. |
|     reader.close() |
|   } |
|   int exitCode = proc.waitFor() |
|   if (exitCode != 0) { |
|     throw new RuntimeException("Local job service startup failed with exit code ${exitCode}") |
|   } |
| } |
| |
| // Installs the Python SDK in editable mode into the virtualenv (pip runs from the Python SDK directory). |
| def installBeamPythonInVirtualenv = tasks.register("installBeamPythonInVirtualenv") { |
|   dependsOn(setupVirtualenv) |
|   doLast { |
|     execInVirtualenv("pip", "install", "-e", ".") |
|   } |
| } |
| |
| // Launches the Python local job service in background mode, handing it the stdout, pid, and port |
| // file locations configured above. |
| def startLocalJobService = tasks.register("startLocalJobService") { |
|   dependsOn(installBeamPythonInVirtualenv) |
| |
|   doLast { |
|     execBackgroundInVirtualenv( |
|         "python", |
|         "-m", "apache_beam.runners.portability.local_job_service_main", |
|         "--background", |
|         "--stdout_file=${localJobServiceStdoutFile}", |
|         "--pid_file=${localJobServicePidFile}", |
|         "--port_file=${localJobServicePortFile}") |
|   } |
| } |
| |
| // Asks the Python local job service identified by the pid file to stop. |
| task stopLocalJobService { |
|   doLast { |
|     execInVirtualenv( |
|         "python", |
|         "-m", "apache_beam.runners.portability.local_job_service_main", |
|         "--stop", |
|         "--pid_file=${localJobServicePidFile}") |
|   } |
| } |
| |
| // Whenever the job service is started, make sure the stop task runs as its finalizer. |
| startLocalJobService.configure { |
|   finalizedBy(stopLocalJobService) |
| } |
| |
| // Fully qualified test methods that are excluded from the regular ValidatesRunner suites and |
| // run only by the validatesRunnerSickbay task. |
| def sickbayTests = [ |
| |
| //TODO: https://github.com/apache/beam/issues/20631 |
| 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testWindowPreservation', |
| ] |
| |
| /** |
|  * Runs Java ValidatesRunner tests against the Universal Local Runner (ULR) aka local_job_service_main |
|  * with the specified environment type. |
|  * |
|  * Parameters: |
|  *   name             - name of the Test task to create |
|  *   environmentType  - value passed as --defaultEnvironmentType (e.g. "DOCKER" or "LOOPBACK") |
|  *   dockerImageTask  - task the tests depend on when environmentType is "DOCKER"; required in that case |
|  * |
|  * Returns the created Test task. Fails with a GradleException at configuration time when a |
|  * "DOCKER" task is requested without a dockerImageTask. |
|  */ |
| def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = "" -> |
|   Task vrTask = tasks.create(name: name, type: Test, group: "Verification") { |
|     // Name the actual environment type so the LOOPBACK suite is not described as a docker suite. |
|     description("PortableRunner Java " + String.valueOf(environmentType).toLowerCase(Locale.ROOT) + " ValidatesRunner suite") |
|     classpath = configurations.validatesRunner |
|     // Point the tests at the local job service port file unless an explicit endpoint was given via -P. |
|     var jobEndpointOption = "--localJobServicePortFile=$localJobServicePortFile" |
|     if (project.hasProperty(jobEndpointPropertyName)) { |
|       jobEndpointOption = "--$jobEndpointPropertyName=${project.findProperty(jobEndpointPropertyName)}" |
|     } |
|     systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ |
|       "--runner=TestUniversalRunner", |
|       "--experiments=beam_fn_api", |
|       "--defaultEnvironmentType=${environmentType}", |
|       jobEndpointOption |
|     ]) |
|     // The tests themselves are the compiled test classes of the Java SDK core project. |
|     testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) |
|     useJUnit { |
|       includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' |
|       // Should be run only in a properly configured SDK harness environment |
|       excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics' |
|       excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService' |
|       excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment' |
|       excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' |
|       excludeCategories 'org.apache.beam.sdk.testing.UsesStringSetMetrics' |
|       excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' |
|       excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' |
|       excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' |
|       excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' |
|       excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' |
|       excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream' |
|       excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo' |
|       excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs' |
|     } |
|     filter { |
|       // There is not currently a category for excluding these _only_ in committed mode |
|       // https://github.com/apache/beam/issues/20370 |
|       excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedCounterMetrics' |
|       // https://github.com/apache/beam/issues/20371 |
|       excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedDistributionMetrics' |
| |
|       // Teardown not called in exceptions |
|       // https://github.com/apache/beam/issues/20372 |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle' |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful' |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement' |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful' |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup' |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful' |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle' |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful' |
| |
|       // Only known window fns supported, not general window merging |
|       // https://github.com/apache/beam/issues/20437 |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows' |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection' |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWithoutCustomWindowTypes' |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testMergingWindowing' |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing' |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyMergingWindows' |
| |
|       // Misc failures |
|       // https://github.com/apache/beam/issues/20375 |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testGlobalCombineWithDefaultsAndTriggers' |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombine' |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext' |
| |
|       // https://github.com/apache/beam/issues/20373 |
|       excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testWindowedIsEqualTo' |
| |
|       // https://github.com/apache/beam/issues/20374 |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode' |
| |
|       // TODO(https://github.com/apache/beam/issues/29973) |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata' |
|       // TODO(https://github.com/apache/beam/issues/31231) |
|       excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata' |
| |
|       // Sickbay tests are run separately by the validatesRunnerSickbay task. |
|       for (String test : sickbayTests) { |
|         excludeTestsMatching test |
|       } |
| |
|     } |
|   } |
| |
|   if (environmentType == "DOCKER") { |
|     if (dockerImageTask.isEmpty()) { |
|       // This check runs while the build is being configured, not inside a task action, so report |
|       // it as a build failure (GradleException) instead of StopExecutionException. |
|       throw new GradleException("dockerImageTask is required for a Docker environment test task") |
|     } |
|     vrTask.dependsOn dockerImageTask |
|   } |
| |
|   // Only manage the local job service when no running service was supplied via -P. |
|   if (needsLocalJobService) { |
|     vrTask.dependsOn startLocalJobService |
|     stopLocalJobService.mustRunAfter vrTask |
|   } |
| |
|   return vrTask |
| } |
| |
| // Test task that runs only the sickbay tests, using TestUniversalRunner and the SDK core test classes. |
| tasks.register("validatesRunnerSickbay", Test) { |
|   group = "Verification" |
|   description "Validates Universal local runner (Sickbay Tests)" |
| |
|   classpath = configurations.validatesRunner |
|   testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) |
| |
|   systemProperty("beamTestPipelineOptions", JsonOutput.toJson(["--runner=TestUniversalRunner"])) |
| |
|   filter { |
|     sickbayTests.each { sickbayTest -> |
|       includeTestsMatching(sickbayTest) |
|     } |
|   } |
| } |
| |
| // Entry-point task for the DOCKER environment suite; the tests depend on the Java SDK container |
| // docker task for the project's current Java version. |
| task ulrDockerValidatesRunner { |
| dependsOn createUlrValidatesRunnerTask("ulrDockerValidatesRunnerTests", "DOCKER", ":sdks:java:container:${project.ext.currentJavaVersion}:docker") |
| } |
| |
| // Entry-point task for the LOOPBACK environment suite; no docker image task is needed. |
| task ulrLoopbackValidatesRunner { |
| dependsOn createUlrValidatesRunnerTask("ulrLoopbackValidatesRunnerTests", "LOOPBACK") |
| } |
| |