| import org.apache.beam.gradle.BeamModulePlugin |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * License); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an AS IS BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| apply plugin: 'org.apache.beam.module' |
| apply plugin: 'application' |
| // we need to set mainClassName before applying shadow plugin |
| mainClassName = "org.apache.beam.runners.samza.SamzaJobServerDriver" |
| |
| applyJavaNature( |
| automaticModuleName: 'org.apache.beam.runners.samza.jobserver', |
| archivesBaseName: project.hasProperty('archives_base_name') ? archives_base_name : archivesBaseName, |
| validateShadowJar: false, |
| exportJavadoc: false, |
| shadowClosure: { |
| append "reference.conf" |
| }, |
| ) |
| |
| def samzaRunnerProject = project.parent.path |
| |
| description = "Apache Beam :: Runners :: Samza :: Job Server" |
| |
| configurations { |
| validatesPortableRunner |
| } |
| |
| dependencies { |
| implementation project(samzaRunnerProject) |
| permitUnusedDeclared project(samzaRunnerProject) |
| runtimeOnly group: "org.slf4j", name: "jcl-over-slf4j", version: dependencies.create(project.library.java.slf4j_api).getVersion() |
| validatesPortableRunner project(path: samzaRunnerProject, configuration: "testRuntimeMigration") |
| validatesPortableRunner project(path: ":sdks:java:core", configuration: "shadowTest") |
| validatesPortableRunner project(path: ":runners:core-java", configuration: "testRuntimeMigration") |
| validatesPortableRunner project(path: ":runners:portability:java", configuration: "testRuntimeMigration") |
| runtimeOnly library.java.slf4j_simple |
| } |
| |
| runShadow { |
| args = [] |
| } |
| |
| def portableValidatesRunnerTask(String name, boolean docker) { |
| def tempDir = File.createTempDir() |
| def pipelineOptions = [ |
| "--configOverride={\"job.non-logged.store.base.dir\":\"" + tempDir + "\"}" |
| ] |
| createPortableValidatesRunnerTask( |
| name: "validatesPortableRunner${name}", |
| jobServerDriver: "org.apache.beam.runners.samza.SamzaJobServerDriver", |
| jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0", |
| testClasspathConfiguration: configurations.validatesPortableRunner, |
| numParallelTests: 1, |
| pipelineOpts: pipelineOptions, |
| environment: docker ? BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.DOCKER : BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED, |
| testCategories: { |
| if (docker) { |
| includeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment' |
| return |
| } |
| // TODO(https://github.com/apache/beam/issues/22657) |
| // includeCategories 'org.apache.beam.sdk.testing.NeedsRunner' |
| includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' |
| // Should be run only in a properly configured SDK harness environment |
| excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment' |
| // TODO: BEAM-12350 |
| excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics' |
| // TODO: BEAM-12681 |
| excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders' |
| // Larger keys are possible, but they require more memory. |
| excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above10MB' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime' |
| // TODO(https://github.com/apache/beam/issues/21023) |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs' |
| }, |
| testFilter: { |
| // TODO(https://github.com/apache/beam/issues/21042) |
| excludeTestsMatching "org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2" |
| excludeTestsMatching "org.apache.beam.sdk.transforms.FlattenTest.testEmptyFlattenAsSideInput" |
| excludeTestsMatching "org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmptyThenParDo" |
| excludeTestsMatching "org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmpty" |
| // TODO(BEAM-10025) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestampDefaultUnbounded' |
| // TODO(https://github.com/apache/beam/issues/20703) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestamp' |
| // TODO(https://github.com/apache/beam/issues/20847) |
| excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate' |
| // TODO(https://github.com/apache/beam/issues/20846) |
| excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating' |
| // TODO(https://github.com/apache/beam/issues/21142) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testWindowFnPostMerging' |
| // TODO(https://github.com/apache/beam/issues/21143) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testParDoShiftTimestampInvalid' |
| // TODO(https://github.com/apache/beam/issues/21144) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testParDoShiftTimestampInvalidZeroAllowed' |
| // TODO(https://github.com/apache/beam/issues/32520) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionIn*Stateful' |
| // TODO(https://github.com/apache/beam/issues/21145) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.DeduplicateTest.testEventTime' |
| // TODO(https://github.com/apache/beam/issues/21146) |
| excludeTestsMatching 'org.apache.beam.sdk.io.TFRecordIOTest.testReadInvalidRecord' |
| // TODO(https://github.com/apache/beam/issues/21147) |
| excludeTestsMatching 'org.apache.beam.sdk.io.TFRecordIOTest.testReadInvalidDataMask' |
| // TODO(https://github.com/apache/beam/issues/21148) |
| excludeTestsMatching 'org.apache.beam.sdk.io.TFRecordIOTest.testReadInvalidLengthMask' |
| // TODO(https://github.com/apache/beam/issues/21149) |
| excludeTestsMatching 'org.apache.beam.sdk.io.TextIOReadTest$CompressedReadTest.testCompressedReadWithoutExtension' |
| // TODO(https://github.com/apache/beam/issues/21150) |
| excludeTestsMatching 'org.apache.beam.sdk.io.WriteFilesTest.testWithRunnerDeterminedShardingUnbounded' |
| // TODO(https://github.com/apache/beam/issues/211505) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$MultipleInputsAndOutputTests.testParDoWritingToUndeclaredTag' |
| // TODO(https://github.com/apache/beam/issues/21152) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$MultipleInputsAndOutputTests.testParDoReadingFromUnknownSideInput' |
| // TODO(https://github.com/apache/beam/issues/21153) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testMapSideInputWithNullValuesCatchesDuplicates' |
| |
| // TODO(https://github.com/apache/beam/issues/21041) |
| excludeTestsMatching 'org.apache.beam.sdk.coders.PCollectionCustomCoderTest.testEncodingNPException' |
| excludeTestsMatching 'org.apache.beam.sdk.coders.PCollectionCustomCoderTest.testEncodingIOException' |
| excludeTestsMatching 'org.apache.beam.sdk.coders.PCollectionCustomCoderTest.testDecodingNPException' |
| excludeTestsMatching 'org.apache.beam.sdk.coders.PCollectionCustomCoderTest.testDecodingIOException' |
| // TODO(https://github.com/apache/beam/issues/21040) |
| excludeTestsMatching 'org.apache.beam.sdk.PipelineTest.testEmptyPipeline' |
| // TODO(https://github.com/apache/beam/issues/21038) |
| excludeTestsMatching 'org.apache.beam.sdk.io.AvroIOTest*' |
| // TODO(https://github.com/apache/beam/issues/21039) |
| excludeTestsMatching 'org.apache.beam.sdk.io.FileIOTest*' |
| // TODO(https://github.com/apache/beam/issues/21037) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.WithTimestampsTest.withTimestampsBackwardsInTimeShouldThrow' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.WithTimestampsTest.withTimestampsWithNullTimestampShouldThrow' |
| // TODO(https://github.com/apache/beam/issues/21035) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptySingletonSideInput' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testNonSingletonSideInput' |
| // TODO(https://github.com/apache/beam/issues/21036) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.MapElementsTest.testMapSimpleFunction' |
| // TODO(https://github.com/apache/beam/issues/21033) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInGlobalWindowBatchSizeByteSizeFn' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithUnevenBatches' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInGlobalWindowBatchSizeByteSize' |
| // TODO(BEAM-10025) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestampDefaultUnbounded' |
| // TODO(https://github.com/apache/beam/issues/20703) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestamp' |
| // TODO(https://github.com/apache/beam/issues/20703) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testRelativeTimerWithOutputTimestamp' |
| // TODO(BEAM-13498) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew' |
| // TODO(https://github.com/apache/beam/issues/22650) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState' |
| // TODO(https://github.com/apache/beam/issues/29973) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata' |
| // TODO(https://github.com/apache/beam/issues/31231) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata' |
| } |
| ) |
| } |
| |
| project.ext.validatesPortableRunnerDocker = portableValidatesRunnerTask("Docker", true) |
| project.ext.validatesPortableRunnerEmbedded = portableValidatesRunnerTask("Embedded", false) |
| |
| tasks.register("validatesPortableRunner") { |
| dependsOn validatesPortableRunnerDocker |
| dependsOn validatesPortableRunnerEmbedded |
| } |
| |
| def testJavaVersion = project.findProperty('testJavaVersion') |
| String testJavaHome = null |
| if (testJavaVersion) { |
| testJavaHome = project.findProperty("java${testJavaVersion}Home") |
| } |
| |
| def jobPort = BeamModulePlugin.getRandomPort() |
| def artifactPort = BeamModulePlugin.getRandomPort() |
| |
| def setupTask = project.tasks.register("samzaJobServerSetup", Exec) { |
| dependsOn shadowJar |
| def pythonDir = project.project(":sdks:python").projectDir |
| def samzaJobServerJar = shadowJar.archivePath |
| if (testJavaHome) { |
| environment "JAVA_HOME", testJavaHome |
| } |
| executable 'sh' |
| args '-c', "$pythonDir/scripts/run_job_server.sh stop --group_id ${project.name} && $pythonDir/scripts/run_job_server.sh start --group_id ${project.name} --job_port ${jobPort} --artifact_port ${artifactPort} --job_server_jar ${samzaJobServerJar}" |
| } |
| |
| def cleanupTask = project.tasks.register("samzaJobServerCleanup", Exec) { |
| def pythonDir = project.project(":sdks:python").projectDir |
| if (testJavaHome) { |
| environment "JAVA_HOME", testJavaHome |
| } |
| executable 'sh' |
| args '-c', "$pythonDir/scripts/run_job_server.sh stop --group_id ${project.name}" |
| } |
| |
| createCrossLanguageValidatesRunnerTask( |
| startJobServer: setupTask, |
| cleanupJobServer: cleanupTask, |
| classpath: configurations.validatesPortableRunner, |
| numParallelTests: 1, |
| pythonPipelineOptions: [ |
| "--runner=PortableRunner", |
| "--job_endpoint=localhost:${jobPort}", |
| "--environment_cache_millis=10000", |
| "--experiments=beam_fn_api", |
| ], |
| javaPipelineOptions: [ |
| "--runner=PortableRunner", |
| "--jobEndpoint=localhost:${jobPort}", |
| "--environmentCacheMillis=10000", |
| "--experiments=beam_fn_api", |
| "--customBeamRequirement=${project.project(":sdks:python").projectDir}/build/apache-beam.tar.gz", |
| ], |
| goScriptOptions: [ |
| "--runner samza", |
| "--tests \"./test/integration/xlang ./test/integration/io/xlang/...\"", |
| "--endpoint localhost:${jobPort}", |
| ], |
| ) |