| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * License); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an AS IS BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import groovy.json.JsonOutput |
| |
| apply plugin: 'org.apache.beam.module' |
| |
| // Numeric, component-wise comparison of `spark_version` against `minVersion`. |
| // (A lexicographic string compare is fragile, e.g. "3.10.0" < "3.5.0".) |
| // Only integer tokens are compared, so qualifiers such as "-SNAPSHOT" are ignored, and |
| // missing trailing components count as 0, so "3.5" is treated the same as "3.5.0". |
| // Returns true when spark_version is equal to or newer than minVersion. |
| def isSparkAtLeast = { String minVersion -> |
|   // Split on '.' and '-' and keep only the tokens that parse as integers. |
|   def numericParts = { version -> |
|     version.toString().tokenize('.-').findAll { it.isInteger() }*.toInteger() |
|   } |
|   def parts = numericParts(spark_version) |
|   def minParts = numericParts(minVersion) |
|   for (int i = 0; i < Math.max(parts.size(), minParts.size()); i++) { |
|     // Pad the shorter version with zeros instead of stopping at its last component. |
|     int actual = i < parts.size() ? parts[i] : 0 |
|     int required = i < minParts.size() ? minParts[i] : 0 |
|     if (actual != required) return actual > required |
|   } |
|   // Every component matched, so the versions are equal. |
|   return true |
| } |
| |
| // Configure this project through the Beam module plugin's applyJavaNature. |
| applyJavaNature( |
| enableStrictDependencies: true, |
| // Java 17 is requested only when building against Spark 4.0.0 or newer; otherwise null is passed. |
| requireJavaVersion: (isSparkAtLeast("4.0.0") ? org.gradle.api.JavaVersion.VERSION_17 : null), |
| automaticModuleName: 'org.apache.beam.runners.spark', |
| // The archives_base_name and exportJavadoc project properties, when set, override the defaults. |
| archivesBaseName: (project.hasProperty('archives_base_name') ? archives_base_name : archivesBaseName), |
| exportJavadoc: (project.hasProperty('exportJavadoc') ? exportJavadoc : true), |
| // Class name -> checker-framework issue that the class triggers. |
| // NOTE(review): presumably these classes are skipped by the checker; confirm in the Beam module plugin. |
| classesTriggerCheckerBugs: [ |
| 'Aggregators': 'https://github.com/typetools/checker-framework/issues/6388#issuecomment-1885532351', |
| 'SparkAssignWindowFn': 'https://github.com/typetools/checker-framework/issues/3793', |
| 'SparkCombineFn' : 'https://github.com/typetools/checker-framework/issues/3793', |
| 'WindowingHelpers' : 'https://github.com/typetools/checker-framework/issues/3793', |
| 'WindowAssignTranslatorBatch': 'https://github.com/typetools/checker-framework/issues/6388#issuecomment-1885532351', |
| ], |
| ) |
| |
| // Project description; includes the Spark version this build targets. |
| description = "Apache Beam :: Runners :: Spark $spark_version" |
| |
| /* |
| * We need to rely on manually specifying these evaluationDependsOn to ensure that |
| * the following projects are evaluated before we evaluate this project. This is because |
| * we are attempting to reference the "sourceSets.test.output" directly. |
| */ |
| evaluationDependsOn(":sdks:java:core") |
| evaluationDependsOn(":sdks:java:io:hadoop-format") |
| evaluationDependsOn(":runners:core-java") |
| evaluationDependsOn(":examples:java") |
| |
| // Extra configurations. validatesRunner is the classpath of the ValidatesRunner and |
| // examples integration test tasks registered later in this script. |
| // NOTE(review): examplesJavaIntegrationTest is only declared here; its consumer is not visible in this script. |
| configurations { |
| validatesRunner |
| examplesJavaIntegrationTest |
| } |
| |
| // Builds the system properties shared by the Spark test tasks. |
| // `overrides` holds pipeline options merged over the default "--runner" option; the merged |
| // options are serialized into "beamTestPipelineOptions" as a JSON list of "key=value" strings. |
| def sparkTestProperties(overrides = [:]) { |
|   // Entries in `overrides` replace the default runner when the key matches. |
|   def pipelineOptions = ["--runner": "TestSparkRunner"] + overrides |
|   def optionArgs = pipelineOptions.collect { name, value -> "$name=$value" } |
|   return [ |
|     "log4j.configuration" : "log4j-test.properties", |
|     "spark.sql.shuffle.partitions": "4", |
|     "spark.ui.enabled" : "false", |
|     "spark.ui.showConsoleProgress": "false", |
|     "spark.kryo.registrationRequired": "true", // be strict in tests |
|     "beamTestPipelineOptions" : JsonOutput.toJson(optionArgs) |
|   ] |
| } |
| |
| |
| // Returns the extra JVM arguments for test tasks: a list of --add-opens flags when the |
| // testJavaVersion project property is '17' or '21', and an empty list otherwise. |
| def sparkTestJvmArgs() { |
|   // run tests with Java 17 using -PtestJavaVersion=17 -Pjava17Home=??? |
|   def needsAddOpens = project.hasProperty('testJavaVersion') && |
|       ['17', '21'].contains(project.getProperty('testJavaVersion')) |
|   if (!needsAddOpens) { |
|     return [] |
|   } |
|   return [ |
|     "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", |
|     // add-opens below required for Kryo FieldSerializer / SparkRunnerKryoRegistratorTest |
|     "--add-opens=java.base/java.nio=ALL-UNNAMED", |
|     "--add-opens=java.base/java.util=ALL-UNNAMED", |
|     "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED" |
|   ] |
| } |
| |
| // Hadoop versions the Spark tests are run against: configuration/task key -> Hadoop version. |
| def hadoopVersions = [ |
|   "285" : "2.8.5", |
|   "292" : "2.9.2", |
|   "2102": "2.10.2", |
|   "324" : "3.2.4", |
| ] |
| |
| // Create one configuration per key, named "hadoopVersion<key>". |
| hadoopVersions.keySet().each { versionKey -> |
|   configurations.create("hadoopVersion${versionKey}") |
| } |
| |
| /* |
| * Per-version source overrides (mirrors runners/flink/flink_runner.gradle). |
| * |
| * Layout: |
| * runners/spark/src/ -- shared base (lowest supported version uses these directly) |
| * runners/spark/<major>/src/ -- version-specific overrides (later overrides take precedence) |
| * |
| * The lowest supported `spark_major` builds straight from the shared base. |
| * Higher versions copy <shared> + <previous majors> + <current> into a single |
| * source-overrides directory using DuplicatesStrategy.INCLUDE so the current |
| * version's files override earlier ones. |
| */ |
| def base_path = ".." |
| |
| // Lists the directories to merge for one source set (`type`, e.g. "main" or "test") and one |
| // `group` ("java" or "resources"): the shared base, then one directory per entry of |
| // `versions`, then this project's own directory. |
| def overrides = { versions, type, group = 'java' -> |
|   // order matters: later entries override earlier ones during the Copy |
|   def layers = [] |
|   layers << "${base_path}/src/${type}/${group}" |
|   versions.each { version -> layers << "${base_path}/${version}/src/${type}/${group}" } |
|   layers << "./src/${type}/${group}" |
|   return layers |
| } |
| |
| def all_versions = spark_versions.split(",")*.trim() |
| // Version order comes from the position in spark_versions, not from comparing strings, so a |
| // two-digit major (e.g. "10") still sorts after the single-digit ones. |
| def spark_major_index = all_versions.indexOf(spark_major) |
| if (spark_major_index < 0) { |
|   throw new GradleException( |
|       "spark_major='${spark_major}' is not listed in spark_versions='${spark_versions}' " + |
|       "(see root gradle.properties).") |
| } |
| // Everything listed before the current major; empty for the first entry. |
| def previous_versions = all_versions.take(spark_major_index) |
| |
| def main_source_overrides = overrides(previous_versions, "main", "java") |
| def test_source_overrides = overrides(previous_versions, "test", "java") |
| def main_resources_overrides = overrides(previous_versions, "main", "resources") |
| def test_resources_overrides = overrides(previous_versions, "test", "resources") |
| |
| def sourceOverridesBase = project.layout.buildDirectory.dir('source-overrides/src').get() |
| |
| // Registers a Copy task that merges `sources` (in list order) into <sourceOverridesBase>/<target>. |
| // DuplicatesStrategy.INCLUDE lets a file from a later source directory replace the same file |
| // from an earlier one. When `excludesKey` is non-null and project.ext.excluded_files has that |
| // key, every file name listed under it is excluded from the copy. |
| def registerOverridesCopy = { String taskName, sources, String target, String excludesKey -> |
|   tasks.register(taskName, Copy) { mergeTask -> |
|     mergeTask.from sources |
|     mergeTask.into "${sourceOverridesBase}/${target}" |
|     mergeTask.duplicatesStrategy DuplicatesStrategy.INCLUDE |
|     if (excludesKey != null && project.ext.has('excluded_files') && |
|         project.ext.excluded_files.containsKey(excludesKey)) { |
|       project.ext.excluded_files[excludesKey].each { f -> mergeTask.exclude "**/${f}" } |
|     } |
|   } |
| } |
| |
| // Java sources honor the per-source-set exclusion lists; resources are copied without exclusions. |
| def copySourceOverrides = |
|     registerOverridesCopy('copySourceOverrides', main_source_overrides, 'main/java', 'main') |
| |
| def copyResourcesOverrides = |
|     registerOverridesCopy('copyResourcesOverrides', main_resources_overrides, 'main/resources', null) |
| |
| def copyTestSourceOverrides = |
|     registerOverridesCopy('copyTestSourceOverrides', test_source_overrides, 'test/java', 'test') |
| |
| def copyTestResourcesOverrides = |
|     registerOverridesCopy('copyTestResourcesOverrides', test_resources_overrides, 'test/resources', null) |
| |
| def use_override = (spark_major_index > 0) |
| def sourceBase = "${project.projectDir}/../src" |
| |
| // Pick the source directories once, then configure the source sets in a single place. |
| // - Higher versions: use the Copy task providers, so each version sees only its merged |
| //   overrides tree. Passing the TaskProviders lets Gradle auto-wire task dependencies for |
| //   every consumer (compile, javadoc, sources jar, etc.) without manual dependsOn. |
| // - Lowest supported Spark version: build straight from the shared base, no copy step. |
| def mainJavaDirs = use_override ? [copySourceOverrides] : ["${sourceBase}/main/java"] |
| def mainResourceDirs = use_override ? [copyResourcesOverrides] : ["${sourceBase}/main/resources"] |
| def testJavaDirs = use_override ? [copyTestSourceOverrides] : ["${sourceBase}/test/java"] |
| def testResourceDirs = use_override ? [copyTestResourcesOverrides] : ["${sourceBase}/test/resources"] |
| |
| sourceSets { |
|   main { |
|     java { srcDirs = mainJavaDirs } |
|     resources { srcDirs = mainResourceDirs } |
|   } |
|   test { |
|     java { srcDirs = testJavaDirs } |
|     resources { srcDirs = testResourceDirs } |
|   } |
| } |
| |
| // Default unit test task: Spark test system properties plus the JVM flags the tests need. |
| test { |
|   systemProperties(sparkTestProperties()) |
|   // Change log level to debug: |
|   // systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug" |
|   // Change log level to debug only for the package and nested packages: |
|   // systemProperty "org.slf4j.simpleLogger.log.org.apache.beam.runners.spark.stateful", "debug" |
|   jvmArgs("-XX:-UseGCOverheadLimit") |
|   jvmArgs += sparkTestJvmArgs() |
|   // Optional extra JVM argument taken from the beamSurefireArgline system property. |
|   def surefireArgline = System.getProperty("beamSurefireArgline") |
|   if (surefireArgline) { |
|     jvmArgs(surefireArgline) |
|   } |
| |
|   maxParallelForks = 4 |
| |
|   // easily re-run all tests (to deal with flaky tests / SparkContext leaks) |
|   if (project.hasProperty("rerun-tests")) { |
|     outputs.upToDateWhen { false } |
|   } |
| } |
| |
| // Extension type holding the list of Spark artifact coordinates (group:name, no version). |
| class SparkComponents { |
| List<String> components |
| } |
| |
| // Registered as the `spark` extension; the dependencies block below appends the Spark version |
| // to each component. |
| extensions.create('spark', SparkComponents) |
| spark.components = [ |
| "org.apache.spark:spark-core_$spark_scala_version", |
| "org.apache.spark:spark-network-common_$spark_scala_version", |
| "org.apache.spark:spark-sql_$spark_scala_version", |
| "org.apache.spark:spark-streaming_$spark_scala_version", |
| "org.apache.spark:spark-catalyst_$spark_scala_version" |
| ] |
| |
| // Dependencies of the Spark runner. Spark artifacts are versioned with spark_version and |
| // suffixed with spark_scala_version; several entries are added only for newer Spark versions. |
| dependencies { |
| implementation project(path: ":model:pipeline", configuration: "shadow") |
| implementation project(path: ":sdks:java:core", configuration: "shadow") |
| implementation project(":runners:core-java") |
| implementation project(":runners:java-fn-execution") |
| implementation project(":runners:java-job-service") |
| implementation project(":sdks:java:extensions:google-cloud-platform-core") |
| implementation project(":sdks:java:extensions:avro") |
| implementation library.java.jackson_annotations |
| implementation library.java.slf4j_api |
| implementation library.java.joda_time |
| implementation library.java.opentelemetry_context |
| implementation library.java.commons_lang3 |
| implementation library.java.args4j |
| implementation project(path: ":model:fn-execution", configuration: "shadow") |
| implementation project(path: ":model:job-management", configuration: "shadow") |
| implementation library.java.vendored_grpc_1_69_0 |
| implementation library.java.vendored_guava_32_1_2_jre |
| // Every Spark component is added to the `provided` configuration at the selected Spark version. |
| spark.components.each { component -> |
| provided "$component:$spark_version" |
| } |
| // These two artifacts are added only for Spark 3.5.0 and newer. |
| if (isSparkAtLeast("3.5.0")) { |
| implementation "org.apache.spark:spark-common-utils_$spark_scala_version:$spark_version" |
| implementation "org.apache.spark:spark-sql-api_$spark_scala_version:$spark_version" |
| } |
| if (isSparkAtLeast("4.0.0")) { |
| // Spark 4 splits the Connect shims out of spark-sql; classes are referenced directly |
| // (e.g. via SparkSession builder paths) so strict dependency analysis requires it as |
| // a declared provided dep. The artifact does not exist for Spark 3. |
| provided "org.apache.spark:spark-connect-shims_$spark_scala_version:$spark_version" |
| } |
| permitUnusedDeclared "org.apache.spark:spark-network-common_$spark_scala_version:$spark_version" |
| implementation "io.dropwizard.metrics:metrics-core:4.1.1" // version used by Spark 3.1 |
| // Scala library and Jackson Scala module are chosen to match spark_scala_version. |
| if (spark_scala_version == '2.13') { |
| compileOnly "org.scala-lang:scala-library:2.13.15" |
| runtimeOnly library.java.jackson_module_scala_2_13 |
| } else { |
| compileOnly "org.scala-lang:scala-library:2.12.15" |
| runtimeOnly library.java.jackson_module_scala_2_12 |
| // Force paranamer 2.8 to avoid issues when using Scala 2.12 |
| runtimeOnly "com.thoughtworks.paranamer:paranamer:2.8" |
| } |
| provided "org.apache.hadoop:hadoop-client-api:3.3.1" |
| provided library.java.commons_io |
| provided library.java.hamcrest |
| provided "com.esotericsoftware:kryo-shaded:4.0.2" |
| // The Spark components are also added to the test classpath. |
| spark.components.each { component -> |
| testImplementation "$component:$spark_version" |
| } |
| testImplementation project(":sdks:java:io:kafka") |
| testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") |
| // SparkStateInternalsTest extends abstract StateInternalsTest |
| testImplementation project(path: ":runners:core-java", configuration: "testRuntimeMigration") |
| testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration") |
| testImplementation project(":sdks:java:harness") |
| testImplementation library.java.avro |
| // kafka_2.13 artifacts were first published in 2.5.0; use a later version for Scala 2.13 |
| def kafka_version = (spark_scala_version == '2.13') ? '2.8.0' : '2.4.1' |
| testImplementation "org.apache.kafka:kafka_$spark_scala_version:$kafka_version" |
| testImplementation library.java.kafka_clients |
| testImplementation library.java.junit |
| testImplementation library.java.mockito_core |
| testImplementation "org.assertj:assertj-core:3.11.1" |
| testImplementation "org.apache.zookeeper:zookeeper:3.4.11" |
| // NOTE(review): these repeat the implementation entries added above for Spark 3.5.0+; |
| // presumably kept so the test usage is declared explicitly - confirm before removing. |
| if (isSparkAtLeast("3.5.0")) { |
| testImplementation "org.apache.spark:spark-common-utils_$spark_scala_version:$spark_version" |
| testImplementation "org.apache.spark:spark-sql-api_$spark_scala_version:$spark_version" |
| } |
| // Classpath entries for the ValidatesRunner and examples integration test tasks. |
| validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest") |
| validatesRunner project(path: ":runners:core-java", configuration: "testRuntimeMigration") |
| validatesRunner project(":sdks:java:io:hadoop-format") |
| validatesRunner project(":sdks:java:io:hadoop-format").sourceSets.test.output |
| validatesRunner project(path: ":examples:java", configuration: "testRuntimeMigration") |
| validatesRunner project(path: project.path, configuration: "testRuntimeMigration") |
| // Each "hadoopVersion<key>" configuration gets hadoop-common at the mapped version. |
| hadoopVersions.each { kv -> |
| "hadoopVersion$kv.key" "org.apache.hadoop:hadoop-common:$kv.value" |
| // Force paranamer 2.8 to avoid issues when using Scala 2.12 |
| "hadoopVersion$kv.key" "com.thoughtworks.paranamer:paranamer:2.8" |
| if (isSparkAtLeast("3.5.0")) { |
| // Add log4j 2.x dependencies as Spark 3.5+ uses slf4j with log4j 2.x backend |
| "hadoopVersion$kv.key" library.java.log4j2_api |
| "hadoopVersion$kv.key" library.java.log4j2_core |
| "hadoopVersion$kv.key" library.java.log4j2_slf4j2_impl |
| "hadoopVersion$kv.key" library.java.log4j2_log4j12_api |
| } |
| } |
| } |
| |
| // GCP project and temp location used by examplesIntegrationTest; both can be overridden with |
| // the gcpProject / tempLocation project properties. |
| def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' |
| def tempLocation = project.findProperty('tempLocation') ?: 'gs://temp-storage-for-end-to-end-tests' |
| |
| // Excludes applied to every configuration of this project. |
| configurations.all { |
| // Prevent StackOverflowError if slf4j-jdk14 is on the classpath |
| exclude group: "org.slf4j", module: "slf4j-jdk14" |
| // Avoid any transitive usage of the old codahale group to make dependency resolution deterministic |
| exclude group: "com.codahale.metrics", module: "metrics-core" |
| // Prevent conflict with kryo-shaded used by spark (from :sdks:java:core testRuntimeClasspath) |
| exclude group: "com.esotericsoftware.kryo", module: "kryo" |
| } |
| |
| // Additional excludes for the validatesRunner classpath only. |
| configurations.validatesRunner { |
| // Exclude to make sure log4j binding is used |
| exclude group: "org.slf4j", module: "slf4j-simple" |
| |
| if (isSparkAtLeast("3.5.0")) { |
| // Exclude log4j 1.x dependencies to prevent conflict with log4j 2.x used by spark 3.5+ |
| exclude group: "log4j", module: "log4j" |
| } |
| } |
| |
| // Each "hadoopVersion<key>" configuration forces hadoop-common to its mapped version. |
| hadoopVersions.each { kv -> |
| configurations."hadoopVersion$kv.key" { |
| resolutionStrategy { |
| force "org.apache.hadoop:hadoop-common:$kv.value" |
| } |
| if (isSparkAtLeast("3.5.0")) { |
| // Exclude log4j 1.x dependencies to prevent conflict with log4j 2.x used by spark 3.5+ |
| exclude group: "log4j", module: "log4j" |
| } |
| } |
| } |
| |
| // Shared configuration for the batch ValidatesRunner Test tasks. |
| // `it` is the Test task being configured; every setting is applied to it explicitly. |
| def applyBatchValidatesRunnerSetup = { Test it -> |
|   // This closure is called as a plain function, so an unqualified `group = ...` resolves |
|   // through the closure's owner (the build script, i.e. the project) and would overwrite |
|   // project.group instead of setting the task's group. Always qualify with the task. |
|   it.group = "Verification" |
|   // Disable gradle cache |
|   it.outputs.upToDateWhen { false } |
|   it.jvmArgs += sparkTestJvmArgs() |
|   it.jvmArgs '-Xmx3g' |
| |
|   // Run the SDK core and runners-core test classes on the validatesRunner classpath. |
|   it.classpath = project.configurations.validatesRunner |
|   it.testClassesDirs = project.files( |
|     project(":sdks:java:core").sourceSets.test.output.classesDirs, |
|     project(":runners:core-java").sourceSets.test.output.classesDirs, |
|   ) |
| |
|   it.maxParallelForks 4 |
| |
|   it.useJUnit { |
|     includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' |
|     // Should be run only in a properly configured SDK harness environment |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment' |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging' |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap' |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer' |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' |
|     // Unbounded |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections' |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream' |
|     // Metrics |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics' |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics' |
|     // SDF |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' |
|     // Portability |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService' |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' |
|     // Ordering |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery' |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle' |
|     excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs' |
|   } |
| } |
| |
| // Batch ValidatesRunner suite: shared batch setup, metric sinks disabled, plus the |
| // per-test exclusions below. |
| def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) { batchTask -> |
|   applyBatchValidatesRunnerSetup(batchTask) |
|   batchTask.systemProperties(sparkTestProperties(["--enableSparkMetricSinks": "false"])) |
| |
|   batchTask.filter { |
|     // TODO(https://github.com/apache/beam/issues/31231) |
|     excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata' |
|     // TODO(https://github.com/apache/beam/issues/32021) |
|     excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testBoundedSourceMetricsInSplit' |
|     // Spark does not support side inputs in timers (added in #38363) |
|     excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testSideInputNotReadyTimer' |
|   } |
| } |
| |
| // Batch ValidatesRunner run with the use_bounded_concurrent_output_for_sdf experiment enabled. |
| def validatesRunnerBatchWithBoundedSDFExperiment = tasks.register("validatesRunnerBatchWithBoundedSDFExperiment", Test) { sdfTask -> |
|   applyBatchValidatesRunnerSetup(sdfTask) |
|   def experimentOptions = [ |
|     "--enableSparkMetricSinks": "false", |
|     "--experiments": "use_bounded_concurrent_output_for_sdf" |
|   ] |
|   sdfTask.systemProperties(sparkTestProperties(experimentOptions)) |
| |
|   // experiment concerns only SDF implementation, therefore avoid needlessly rerunning other tests |
|   sdfTask.filter { |
|     includeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest' |
|   } |
| } |
| |
| |
| // Streaming ValidatesRunner suite: runs ValidatesRunner-category tests with Spark metric sinks |
| // enabled, minus the categories and individual tests excluded below. |
| def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test) { |
| group = "Verification" |
| // Disable gradle cache |
| outputs.upToDateWhen { false } |
| systemProperties sparkTestProperties(["--enableSparkMetricSinks": "true"]) |
| jvmArgs += sparkTestJvmArgs() |
| |
| classpath = configurations.validatesRunner |
| // Note the `+=`: the SDK core and runners-core test classes are added to this task's existing |
| // testClassesDirs, whereas the batch tasks assign (`=`) and so replace them. |
| testClassesDirs += files( |
| project(":sdks:java:core").sourceSets.test.output.classesDirs, |
| project(":runners:core-java").sourceSets.test.output.classesDirs, |
| ) |
| |
| maxParallelForks 4 |
| useJUnit { |
| includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' |
| |
| // Per-test exclusions; `filter` is written inside useJUnit here. |
| filter { |
| // UNBOUNDED View.CreatePCollectionView not supported |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle' |
| // TODO(https://github.com/apache/beam/issues/29973) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata' |
| // TODO(https://github.com/apache/beam/issues/31231) |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata' |
| // TODO(https://github.com/apache/beam/issues/32021) |
| excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testBoundedSourceMetricsInSplit' |
| } |
| |
| // TestStream using processing time is not supported in Spark |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime' |
| |
| // Exceeds Java heap space |
| excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB' |
| |
| // Should be run only in a properly configured SDK harness environment |
| excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment' |
| |
| // State and Timers |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' |
| |
| // Metrics |
| excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics' |
| // SDF |
| excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo' |
| // Portability |
| excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' |
| // Ordering |
| excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs' |
| } |
| } |
| |
| // ValidatesRunner suite for SparkStructuredStreamingRunner in batch mode. It is not part of |
| // the aggregate validatesRunner task (the dependsOn there is commented out). |
| tasks.register("validatesStructuredStreamingRunnerBatch", Test) { |
| group = "Verification" |
| // Disable gradle cache |
| outputs.upToDateWhen { false } |
| systemProperties sparkTestProperties(["--runner":"SparkStructuredStreamingRunner", "--testMode":"true"]) |
| // Register various other classes used in tests |
| systemProperty 'spark.kryo.classesToRegister', |
| 'org.apache.beam.sdk.transforms.MapViewTest$NonDeterministicStringCoder,' + |
| 'org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.RegularImmutableList' |
| jvmArgs += sparkTestJvmArgs() |
| jvmArgs '-Xmx7g' // Increase memory heap in order to avoid OOM errors |
| |
| classpath = configurations.validatesRunner |
| // Test classes come from the SDK core, runners-core and this project's own test output. |
| testClassesDirs = files( |
| project(":sdks:java:core").sourceSets.test.output.classesDirs, |
| project(":runners:core-java").sourceSets.test.output.classesDirs, |
| ) |
| testClassesDirs += files(project.sourceSets.test.output.classesDirs) |
| |
| maxParallelForks 4 |
| |
| useJUnit { |
| includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' |
| // Should be run only in a properly configured SDK harness environment |
| excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment' |
| // Unbounded |
| excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream' |
| // State and Timers |
| excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' |
| // Metrics |
| excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics' |
| // multiple coders bug BEAM-8894 |
| excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders' |
| // SDF |
| excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' |
| // Portability |
| excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs' |
| } |
| // Individual tests excluded by name, grouped by reason. |
| filter { |
| // Combine with context not implemented |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineFnsTest.testComposedCombineWithContext' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$CombineWithContextTests.testSimpleCombineWithContext' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$CombineWithContextTests.testSimpleCombineWithContextEmpty' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testFixedWindowsCombineWithContext' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSlidingWindowsCombineWithContext' |
| // multiple coders bug BEAM-8894 |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders' |
| // SDF |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testLifecycleMethodsBounded' |
| // https://github.com/apache/beam/issues/29972 |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$CombineWithContextTests.testHotKeyCombineWithSideInputs' |
| // TODO(https://github.com/apache/beam/issues/32021) |
| excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testBoundedSourceMetricsInSplit' |
| } |
| } |
| |
| // Aggregate task: runs the batch, bounded-SDF-experiment and streaming ValidatesRunner suites. |
| tasks.register("validatesRunner") { aggregateTask -> |
|   aggregateTask.group = "Verification" |
|   aggregateTask.description = "Validates Spark runner" |
|   aggregateTask.dependsOn( |
|     validatesRunnerBatch, |
|     validatesRunnerBatchWithBoundedSDFExperiment, |
|     validatesRunnerStreaming) |
|   // It should be uncommented once all "validatesStructuredStreamingRunnerBatch" tests will pass. |
|   // Otherwise, it breaks Spark runner ValidatesRunner tests. |
|   //dependsOn validatesStructuredStreamingRunnerBatch |
| } |
| |
| // Aggregate task depending on every per-version "hadoopVersion<key>Test" task. |
| tasks.register("hadoopVersionsTest") { aggregateTask -> |
|   aggregateTask.group = "Verification" |
|   def perVersionTasks = hadoopVersions.keySet().collect { versionKey -> "hadoopVersion${versionKey}Test" } |
|   aggregateTask.dependsOn(perVersionTasks) |
| } |
| |
| // Runs the *IT classes from :examples:java on the validatesRunner classpath, passing the |
| // GCP project and temp locations as pipeline options. |
| tasks.register("examplesIntegrationTest", Test) { integrationTask -> |
|   integrationTask.group = "Verification" |
|   // Disable gradle cache |
|   integrationTask.outputs.upToDateWhen { false } |
|   def pipelineOptions = [ |
|     "--tempLocation": "${tempLocation}", |
|     "--tempRoot" : "${tempLocation}", |
|     "--project" : "${gcpProject}" |
|   ] |
|   integrationTask.systemProperties(sparkTestProperties(pipelineOptions)) |
|   integrationTask.jvmArgs += sparkTestJvmArgs() |
|   integrationTask.jvmArgs('-Xmx3g') |
| |
|   integrationTask.include('**/*IT.class') |
|   integrationTask.maxParallelForks = 4 |
|   integrationTask.classpath = configurations.validatesRunner |
|   integrationTask.testClassesDirs = files(project(":examples:java").sourceSets.test.output.classesDirs) |
|   integrationTask.useJUnit { |
|     integrationTask.filter { |
|       // TODO (https://github.com/apache/beam/issues/21344) Fix integration Tests to run with SparkRunner: Failed to read from sharded output |
|       excludeTestsMatching 'org.apache.beam.examples.WindowedWordCountIT.testWindowedWordCountInStreamingStaticSharding' |
|       excludeTestsMatching 'org.apache.beam.examples.WindowedWordCountIT.testWindowedWordCountInBatchDynamicSharding' |
|     } |
|   } |
| } |
| |
| // Registers one Test task per Hadoop version. Each runs this project's *Test classes with the |
| // matching "hadoopVersion<key>" configuration placed ahead of the regular test runtime classpath. |
| hadoopVersions.each { versionKey, hadoopVersion -> |
|   tasks.register("hadoopVersion${versionKey}Test", Test) { hadoopTask -> |
|     hadoopTask.group = "Verification" |
|     hadoopTask.description = "Runs Spark tests with Hadoop version $hadoopVersion" |
|     hadoopTask.classpath = |
|         configurations.getByName("hadoopVersion${versionKey}") + sourceSets.test.runtimeClasspath |
|     hadoopTask.systemProperties(sparkTestProperties()) |
|     hadoopTask.jvmArgs += sparkTestJvmArgs() |
| |
|     hadoopTask.include("**/*Test.class") |
|     hadoopTask.maxParallelForks = 4 |
|     hadoopTask.useJUnit { |
|       excludeCategories "org.apache.beam.runners.spark.StreamingTest" |
|       excludeCategories "org.apache.beam.runners.spark.UsesCheckpointRecovery" |
|     } |
|   } |
| } |