|  | import groovy.json.JsonOutput | 
|  |  | 
|  | /* | 
|  | * Licensed to the Apache Software Foundation (ASF) under one | 
|  | * or more contributor license agreements.  See the NOTICE file | 
|  | * distributed with this work for additional information | 
|  | * regarding copyright ownership.  The ASF licenses this file | 
|  | * to you under the Apache License, Version 2.0 (the | 
|  | * "License"); you may not use this file except in compliance | 
|  | * with the License.  You may obtain a copy of the License at | 
|  | * | 
|  | *     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | * | 
|  | * Unless required by applicable law or agreed to in writing, software | 
|  | * distributed under the License is distributed on an "AS IS" BASIS, | 
|  | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | * See the License for the specific language governing permissions and | 
|  | * limitations under the License. | 
|  | */ | 
|  |  | 
|  | // Apply the shared Beam module plugin, then enable the Java and Python natures for this module. | 
|  | // NOTE(review): applyPythonNature presumably provides the setupVirtualenv task and the | 
|  | // ext.envdir property that the virtualenv helpers further down rely on - confirm. | 
|  | plugins { id 'org.apache.beam.module' } | 
|  | applyJavaNature( automaticModuleName: 'org.apache.beam.runners.portability') | 
|  | applyPythonNature() | 
|  |  | 
|  | // Human-readable project metadata. | 
|  | description = "Apache Beam :: Runners :: Portability :: Java" | 
|  | ext.summary = """A Java implementation of the Beam Model which utilizes the portability | 
|  | framework to execute user-defined functions.""" | 
|  |  | 
|  | // Dedicated configuration that serves as the classpath of the ValidatesRunner test tasks below. | 
|  | configurations { | 
|  |   validatesRunner | 
|  | } | 
|  |  | 
|  | // Dependencies of the portable runner, grouped by configuration: | 
|  | //   implementation      - compile/runtime dependencies of the runner itself | 
|  | //   testImplementation  - dependencies of this module's own unit tests | 
|  | //   validatesRunner     - classpath used by the ValidatesRunner Test tasks defined in this file | 
|  | dependencies { | 
|  | implementation project(path: ":model:job-management", configuration: "shadow") | 
|  | implementation project(path: ":model:pipeline", configuration: "shadow") | 
|  | implementation project(":runners:java-fn-execution") | 
|  | implementation project(":runners:java-job-service") | 
|  | implementation project(path: ":sdks:java:core", configuration: "shadow") | 
|  | implementation project(path: ":sdks:java:harness", configuration: "shadow") | 
|  | implementation library.java.hamcrest | 
|  | // NOTE(review): presumably tells the unused-dependency check that hamcrest is intentionally | 
|  | // declared even though it is not referenced directly - confirm against the Beam module plugin. | 
|  | permitUnusedDeclared library.java.hamcrest | 
|  | implementation library.java.joda_time | 
|  | implementation library.java.slf4j_api | 
|  | implementation library.java.vendored_grpc_1_69_0 | 
|  | implementation library.java.vendored_guava_32_1_2_jre | 
|  |  | 
|  | testImplementation library.java.hamcrest | 
|  | testImplementation library.java.junit | 
|  | testImplementation library.java.mockito_core | 
|  | testImplementation library.java.slf4j_jdk14 | 
|  |  | 
|  | // The ValidatesRunner suites execute the test classes of :sdks:java:core, so its test | 
|  | // artifacts plus the test runtime of :runners:core-java and of this project are required. | 
|  | validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest") | 
|  | validatesRunner project(path: ":runners:core-java", configuration: "testRuntimeMigration") | 
|  | validatesRunner project(path: project.path, configuration: "testRuntimeMigration") | 
|  | } | 
|  |  | 
|  | // Name of the project property that points the tests at an already running job service. | 
|  | // Set using -PjobEndpoint=, for example: -PjobEndpoint=localhost:8073 | 
|  | def jobEndpointPropertyName = "jobEndpoint" | 
|  |  | 
|  | // These projects are read below (project directory, test source set output), so they must be | 
|  | // evaluated before this script continues. | 
|  | project.evaluationDependsOn(":sdks:java:core") | 
|  | project.evaluationDependsOn(":sdks:python") | 
|  | project.evaluationDependsOn(":runners:core-java") | 
|  |  | 
|  | // Absolute path of the Python SDK project directory; used as the working directory of the | 
|  | // virtualenv helper methods. | 
|  | ext.pythonSdkDir = project.findProject(":sdks:python").getProjectDir().getAbsoluteFile().toString() | 
|  |  | 
|  | // If this is set via -P then we assume it is already running and will not start it | 
|  | // NOTE(review): this must stay ABOVE the ext.localJobServicePortFile assignment. | 
|  | // Project.hasProperty is documented to include extra properties, so evaluating it after | 
|  | // the assignment would always report the property as present. | 
|  | def needsLocalJobService = !project.hasProperty("localJobServicePortFile") && !project.hasProperty(jobEndpointPropertyName) | 
|  |  | 
|  | // File through which the job service port is shared; defaults to a file in the build directory | 
|  | // unless -PlocalJobServicePortFile was given. | 
|  | ext.localJobServicePortFile = project.hasProperty("localJobServicePortFile") ? project.property("localJobServicePortFile") : "${project.buildDir}/local_job_service_port" | 
|  |  | 
|  | // Files passed to local_job_service_main as --stdout_file and --pid_file. | 
|  | ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout" | 
|  | ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid" | 
|  |  | 
|  | /** | 
|  | * Quotes one argument for POSIX sh by wrapping it in single quotes. | 
|  | * An embedded single quote is written as '\'' (close the quote, emit an escaped quote, reopen), | 
|  | * because a backslash has no special meaning inside single quotes. | 
|  | * String.replace is used on purpose: it is a literal replacement, whereas replaceAll would | 
|  | * interpret the backslash in the replacement string as an escape character. | 
|  | */ | 
|  | String shellQuote(String arg) { | 
|  |   return "'" + arg.replace("'", "'\\''") + "'" | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Builds the sh command line that sources the virtualenv activate script found under | 
|  | * project.ext.envdir and then runs the given command, with every word shell-quoted. | 
|  | */ | 
|  | String virtualenvShellCommand(String... args) { | 
|  |   String activateScript = "${project.ext.envdir}/bin/activate" | 
|  |   return ". " + shellQuote(activateScript) + " && " + args.collect { String arg -> shellQuote(arg) }.join(" ") | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Runs the given command inside the Python virtualenv, using the Python SDK directory as the | 
|  | * working directory. The command is executed through Gradle's exec. | 
|  | */ | 
|  | void execInVirtualenv(String... args) { | 
|  |   String shellCommand = virtualenvShellCommand(args) | 
|  |   exec { | 
|  |     workingDir project.ext.pythonSdkDir | 
|  |     commandLine "sh", "-c", shellCommand | 
|  |   } | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Runs the given command inside the Python virtualenv via ProcessBuilder, echoing its combined | 
|  | * stdout/stderr to the build output. | 
|  | * Does not background the process, but allows the process to daemonize itself. | 
|  | * Throws RuntimeException when the command exits with a non-zero code. | 
|  | */ | 
|  | void execBackgroundInVirtualenv(String... args) { | 
|  |   String shellCommand = virtualenvShellCommand(args) | 
|  |   Process proc = new ProcessBuilder("sh", "-c", shellCommand) | 
|  |       .redirectErrorStream(true) | 
|  |       .directory(new File(project.ext.pythonSdkDir)) | 
|  |       .start() | 
|  |  | 
|  |   // redirectIO does not work for connecting to groovy/gradle stdout, so the output is copied | 
|  |   // line by line; withReader closes the stream once the process output is exhausted. | 
|  |   proc.getInputStream().withReader { reader -> | 
|  |     String line | 
|  |     while ((line = reader.readLine()) != null) { | 
|  |       println line | 
|  |     } | 
|  |   } | 
|  |  | 
|  |   int exitCode = proc.waitFor() | 
|  |   if (exitCode != 0) { | 
|  |     throw new RuntimeException("Local job service startup failed with exit code ${exitCode}") | 
|  |   } | 
|  | } | 
|  |  | 
|  | // Installs the Python SDK into the virtualenv in editable mode (pip install -e .). | 
|  | // execInVirtualenv runs the command from the Python SDK directory, so "." is that directory. | 
|  | def installBeamPythonInVirtualenv = tasks.register("installBeamPythonInVirtualenv") { | 
|  |   dependsOn(setupVirtualenv) | 
|  |   doLast { | 
|  |     def pipInstallEditable = ["pip", "install", "-e", "."] | 
|  |     execInVirtualenv(*pipInstallEditable) | 
|  |   } | 
|  | } | 
|  |  | 
|  | // Launches local_job_service_main from the virtualenv with --background, handing it the | 
|  | // stdout, pid and port files configured above. | 
|  | def startLocalJobService = tasks.register("startLocalJobService") { | 
|  |   dependsOn(installBeamPythonInVirtualenv) | 
|  |  | 
|  |   doLast { | 
|  |     def launchArgs = [ | 
|  |       "python", | 
|  |       "-m", | 
|  |       "apache_beam.runners.portability.local_job_service_main", | 
|  |       "--background", | 
|  |       "--stdout_file=${localJobServiceStdoutFile}", | 
|  |       "--pid_file=${localJobServicePidFile}", | 
|  |       "--port_file=${localJobServicePortFile}" | 
|  |     ] | 
|  |     execBackgroundInVirtualenv(*launchArgs) | 
|  |   } | 
|  | } | 
|  |  | 
|  | // Invokes local_job_service_main with --stop and the pid file written at startup. | 
|  | task stopLocalJobService { | 
|  |   doLast { | 
|  |     def stopArgs = [ | 
|  |       "python", | 
|  |       "-m", | 
|  |       "apache_beam.runners.portability.local_job_service_main", | 
|  |       "--stop", | 
|  |       "--pid_file=${localJobServicePidFile}" | 
|  |     ] | 
|  |     execInVirtualenv(*stopArgs) | 
|  |   } | 
|  | } | 
|  |  | 
|  | // Whenever the service is started by this build, schedule the stop task as its finalizer. | 
|  | startLocalJobService.configure { | 
|  |   finalizedBy(stopLocalJobService) | 
|  | } | 
|  |  | 
|  | // Fully qualified test methods that are excluded from the ULR ValidatesRunner tasks and are | 
|  | // the only tests selected by the validatesRunnerSickbay task. | 
|  | def sickbayTests = [ | 
|  |  | 
|  | // TODO: https://github.com/apache/beam/issues/20631 | 
|  | 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testWindowPreservation', | 
|  | ] | 
|  |  | 
|  | /** | 
|  | * Creates a Test task that runs the Java ValidatesRunner tests against the Universal Local | 
|  | * Runner (ULR) aka local_job_service_main with the specified environment type. | 
|  | * | 
|  | * name            - name of the Test task to create. | 
|  | * environmentType - value passed as --defaultEnvironmentType ("DOCKER" and "LOOPBACK" are used in this file). | 
|  | * dockerImageTask - path of the task that builds the SDK container image; required for "DOCKER". | 
|  | * | 
|  | * Returns the created task. Fails the build with a GradleException when environmentType is | 
|  | * "DOCKER" and no dockerImageTask was supplied. | 
|  | */ | 
|  | def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = "" -> | 
|  |   boolean usesDocker = environmentType == "DOCKER" | 
|  |  | 
|  |   // Validate up front so a misconfigured call never leaves a half-configured task behind. | 
|  |   // This closure runs at configuration time, so a regular build failure is raised here. | 
|  |   if (usesDocker && !dockerImageTask) { | 
|  |     throw new GradleException("dockerImageTask is required for a Docker environment test task") | 
|  |   } | 
|  |  | 
|  |   Task vrTask = tasks.create(name: name, type: Test, group: "Verification") { | 
|  |     // Name the actual environment so the loopback task is not described as a docker suite. | 
|  |     description = "PortableRunner Java ${environmentType.toLowerCase(Locale.ROOT)} ValidatesRunner suite" | 
|  |     classpath = configurations.validatesRunner | 
|  |  | 
|  |     // Point the tests at an explicit endpoint when -PjobEndpoint is given, otherwise at the | 
|  |     // port file written by the local job service. | 
|  |     var jobEndpointOption = "--localJobServicePortFile=$localJobServicePortFile" | 
|  |     if (project.hasProperty(jobEndpointPropertyName)) { | 
|  |       jobEndpointOption = "--$jobEndpointPropertyName=${project.findProperty(jobEndpointPropertyName)}" | 
|  |     } | 
|  |     systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ | 
|  |       "--runner=TestUniversalRunner", | 
|  |       "--experiments=beam_fn_api", | 
|  |       "--defaultEnvironmentType=${environmentType}", | 
|  |       jobEndpointOption | 
|  |     ]) | 
|  |  | 
|  |     // The test classes come from the Java SDK core project, not from this module. | 
|  |     testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) | 
|  |     useJUnit { | 
|  |       includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' | 
|  |       // Should be run only in a properly configured SDK harness environment | 
|  |       excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics' | 
|  |       excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService' | 
|  |       excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment' | 
|  |       excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' | 
|  |       excludeCategories 'org.apache.beam.sdk.testing.UsesStringSetMetrics' | 
|  |       excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' | 
|  |       excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' | 
|  |       excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' | 
|  |       excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' | 
|  |       excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' | 
|  |       excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream' | 
|  |       excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo' | 
|  |       excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs' | 
|  |     } | 
|  |     filter { | 
|  |       // There is not currently a category for excluding these _only_ in committed mode | 
|  |       // https://github.com/apache/beam/issues/20370 | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedCounterMetrics' | 
|  |       // https://github.com/apache/beam/issues/20371 | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedDistributionMetrics' | 
|  |  | 
|  |       // Teardown not called in exceptions | 
|  |       // https://github.com/apache/beam/issues/20372 | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle' | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful' | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement' | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful' | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup' | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful' | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle' | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful' | 
|  |  | 
|  |       // Only known window fns supported, not general window merging | 
|  |       // https://github.com/apache/beam/issues/20437 | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows' | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection' | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWithoutCustomWindowTypes' | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testMergingWindowing' | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing' | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyMergingWindows' | 
|  |  | 
|  |       // Misc failures | 
|  |       // https://github.com/apache/beam/issues/20375 | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testGlobalCombineWithDefaultsAndTriggers' | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombine' | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext' | 
|  |  | 
|  |       // https://github.com/apache/beam/issues/20373 | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testWindowedIsEqualTo' | 
|  |  | 
|  |       // https://github.com/apache/beam/issues/20374 | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode' | 
|  |  | 
|  |       // TODO(https://github.com/apache/beam/issues/29973) | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata' | 
|  |       // TODO(https://github.com/apache/beam/issues/31231) | 
|  |       excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata' | 
|  |  | 
|  |       // Sickbay tests are run separately by the validatesRunnerSickbay task. | 
|  |       for (String test : sickbayTests) { | 
|  |         excludeTestsMatching test | 
|  |       } | 
|  |  | 
|  |     } | 
|  |   } | 
|  |  | 
|  |   // The SDK container image must be built before a Docker-based run. | 
|  |   if (usesDocker) { | 
|  |     vrTask.dependsOn dockerImageTask | 
|  |   } | 
|  |  | 
|  |   // Only manage a local job service when none was supplied via -P; the service is stopped | 
|  |   // only after the tests have run. | 
|  |   if (needsLocalJobService) { | 
|  |     vrTask.dependsOn startLocalJobService | 
|  |     stopLocalJobService.mustRunAfter vrTask | 
|  |   } | 
|  |  | 
|  |   return vrTask | 
|  | } | 
|  |  | 
|  | // Runs only the tests listed in sickbayTests, using the validatesRunner classpath and the | 
|  | // test classes of :sdks:java:core. | 
|  | // NOTE(review): unlike the ULR tasks above, no job endpoint option is passed and no local | 
|  | // job service is started here - confirm that this is intended. | 
|  | tasks.register("validatesRunnerSickbay", Test) { | 
|  |   group = "Verification" | 
|  |   description = "Validates Universal local runner (Sickbay Tests)" | 
|  |  | 
|  |   def pipelineOptions = ["--runner=TestUniversalRunner"] | 
|  |   systemProperty("beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions)) | 
|  |  | 
|  |   testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) | 
|  |   classpath = configurations.validatesRunner | 
|  |  | 
|  |   filter { | 
|  |     sickbayTests.each { String sickbayTest -> | 
|  |       includeTestsMatching(sickbayTest) | 
|  |     } | 
|  |   } | 
|  | } | 
|  |  | 
|  | // Entry point for the Docker-based ULR suite; the SDK container image task for the current | 
|  | // Java version is handed over as the image prerequisite. | 
|  | task ulrDockerValidatesRunner { | 
|  |   def sdkContainerImageTask = ":sdks:java:container:${project.ext.currentJavaVersion}:docker" | 
|  |   dependsOn(createUlrValidatesRunnerTask("ulrDockerValidatesRunnerTests", "DOCKER", sdkContainerImageTask)) | 
|  | } | 
|  |  | 
|  | // Entry point for the loopback ULR suite; no container image is needed. | 
|  | task ulrLoopbackValidatesRunner { | 
|  |   dependsOn(createUlrValidatesRunnerTask("ulrLoopbackValidatesRunnerTests", "LOOPBACK")) | 
|  | } | 
|  |  |