blob: 091cfb053f2ebd29ecd936043783fff4dee3930c [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import groovy.json.JsonOutput
apply plugin: 'org.apache.beam.module'
applyJavaNature(
enableStrictDependencies: true,
automaticModuleName: 'org.apache.beam.runners.spark',
archivesBaseName: (project.hasProperty('archives_base_name') ? archives_base_name : archivesBaseName),
exportJavadoc: (project.hasProperty('exportJavadoc') ? exportJavadoc : true),
classesTriggerCheckerBugs: [
'Aggregators': 'https://github.com/typetools/checker-framework/issues/6388#issuecomment-1885532351',
'SparkAssignWindowFn': 'https://github.com/typetools/checker-framework/issues/3793',
'SparkCombineFn' : 'https://github.com/typetools/checker-framework/issues/3793',
'WindowingHelpers' : 'https://github.com/typetools/checker-framework/issues/3793',
'WindowAssignTranslatorBatch': 'https://github.com/typetools/checker-framework/issues/6388#issuecomment-1885532351',
],
)
description = "Apache Beam :: Runners :: Spark $spark_version"
/*
* We need to rely on manually specifying these evaluationDependsOn to ensure that
* the following projects are evaluated before we evaluate this project. This is because
* we are attempting to reference the "sourceSets.test.output" directly.
*/
evaluationDependsOn(":sdks:java:core")
evaluationDependsOn(":sdks:java:io:hadoop-format")
evaluationDependsOn(":runners:core-java")
evaluationDependsOn(":examples:java")
configurations {
validatesRunner
examplesJavaIntegrationTest
}
def sparkTestProperties(overrides = [:]) {
def defaults = ["--runner": "TestSparkRunner"]
[
"log4j.configuration" : "log4j-test.properties",
"spark.sql.shuffle.partitions": "4",
"spark.ui.enabled" : "false",
"spark.ui.showConsoleProgress": "false",
"spark.kryo.registrationRequired": "true", // be strict in tests
"beamTestPipelineOptions" :
JsonOutput.toJson((defaults + overrides).collect { k, v -> "$k=$v" })
]
}
def sparkTestJvmArgs() {
// run tests with Java 17 using -PtestJavaVersion=17 -Pjava17Home=???
if (project.hasProperty('testJavaVersion') &&
project.getProperty('testJavaVersion') in ['17', '21']) {
return [
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
// add-opens below required for Kryo FieldSerializer / SparkRunnerKryoRegistratorTest
"--add-opens=java.base/java.nio=ALL-UNNAMED",
"--add-opens=java.base/java.util=ALL-UNNAMED",
"--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"
]
} else {
return []
}
}
def hadoopVersions = [
"285" : "2.8.5",
"292" : "2.9.2",
"2102": "2.10.2",
"324" : "3.2.4",
]
hadoopVersions.each { kv -> configurations.create("hadoopVersion$kv.key") }
/*
* Per-version source overrides (mirrors runners/flink/flink_runner.gradle).
*
* Layout:
* runners/spark/src/ -- shared base (lowest supported version uses these directly)
* runners/spark/<major>/src/ -- version-specific overrides (later overrides take precedence)
*
* The lowest supported `spark_major` builds straight from the shared base.
* Higher versions copy <shared> + <previous majors> + <current> into a single
* source-overrides directory using DuplicatesStrategy.INCLUDE so the current
* version's files override earlier ones.
*/
def base_path = ".."
def overrides = { versions, type, group = 'java' ->
// order matters: later entries override earlier ones during the Copy
["${base_path}/src/${type}/${group}"] +
versions.collect { "${base_path}/${it}/src/${type}/${group}" } +
["./src/${type}/${group}"]
}
def all_versions = spark_versions.split(",").collect { it.trim() }
// Determine version order by list position rather than string comparison so two-digit
// majors (e.g. "10") still sort after single-digit ones.
def spark_major_index = all_versions.indexOf(spark_major)
if (spark_major_index < 0) {
throw new GradleException(
"spark_major='${spark_major}' is not listed in spark_versions='${spark_versions}' " +
"(see root gradle.properties).")
}
def previous_versions = spark_major_index > 0 ? all_versions.subList(0, spark_major_index) : []
def main_source_overrides = overrides(previous_versions, "main")
def test_source_overrides = overrides(previous_versions, "test")
def main_resources_overrides = overrides(previous_versions, "main", "resources")
def test_resources_overrides = overrides(previous_versions, "test", "resources")
def sourceOverridesBase = project.layout.buildDirectory.dir('source-overrides/src').get()
def copySourceOverrides = tasks.register('copySourceOverrides', Copy) { copyTask ->
copyTask.from main_source_overrides
copyTask.into "${sourceOverridesBase}/main/java"
copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE
if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('main')) {
project.ext.excluded_files.main.each { f -> copyTask.exclude "**/${f}" }
}
}
def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) {
it.from main_resources_overrides
it.into "${sourceOverridesBase}/main/resources"
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) { copyTask ->
copyTask.from test_source_overrides
copyTask.into "${sourceOverridesBase}/test/java"
copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE
if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('test')) {
project.ext.excluded_files.test.each { f -> copyTask.exclude "**/${f}" }
}
}
def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Copy) {
it.from test_resources_overrides
it.into "${sourceOverridesBase}/test/resources"
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
def use_override = (spark_major_index > 0)
def sourceBase = "${project.projectDir}/../src"
if (use_override) {
// Pin srcDirs to the Copy task providers so each higher version sees only its merged
// overrides tree. Passing the TaskProviders here lets Gradle auto-wire task dependencies
// for every consumer (compile, javadoc, sources jar, etc.) without manual dependsOn.
sourceSets {
main {
java { srcDirs = [copySourceOverrides] }
resources { srcDirs = [copyResourcesOverrides] }
}
test {
java { srcDirs = [copyTestSourceOverrides] }
resources { srcDirs = [copyTestResourcesOverrides] }
}
}
} else {
// Lowest supported Spark version: build straight from the shared base, no copy step.
sourceSets {
main {
java { srcDirs = ["${sourceBase}/main/java"] }
resources { srcDirs = ["${sourceBase}/main/resources"] }
}
test {
java { srcDirs = ["${sourceBase}/test/java"] }
resources { srcDirs = ["${sourceBase}/test/resources"] }
}
}
}
test {
systemProperties sparkTestProperties()
// Change log level to debug:
// systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug"
// Change log level to debug only for the package and nested packages:
// systemProperty "org.slf4j.simpleLogger.log.org.apache.beam.runners.spark.stateful", "debug"
jvmArgs "-XX:-UseGCOverheadLimit"
jvmArgs += sparkTestJvmArgs()
if (System.getProperty("beamSurefireArgline")) {
jvmArgs System.getProperty("beamSurefireArgline")
}
maxParallelForks 4
// easily re-run all tests (to deal with flaky tests / SparkContext leaks)
if(project.hasProperty("rerun-tests")) { outputs.upToDateWhen {false} }
}
class SparkComponents {
List<String> components
}
extensions.create('spark', SparkComponents)
spark.components = [
"org.apache.spark:spark-core_$spark_scala_version",
"org.apache.spark:spark-network-common_$spark_scala_version",
"org.apache.spark:spark-sql_$spark_scala_version",
"org.apache.spark:spark-streaming_$spark_scala_version",
"org.apache.spark:spark-catalyst_$spark_scala_version"
]
dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":runners:core-java")
implementation project(":runners:java-fn-execution")
implementation project(":runners:java-job-service")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:extensions:avro")
implementation library.java.jackson_annotations
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation library.java.commons_lang3
implementation library.java.args4j
implementation project(path: ":model:fn-execution", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
implementation library.java.vendored_grpc_1_69_0
implementation library.java.vendored_guava_32_1_2_jre
spark.components.each { component ->
provided "$component:$spark_version"
}
if ("$spark_version" >= "3.5.0") {
implementation "org.apache.spark:spark-common-utils_$spark_scala_version:$spark_version"
implementation "org.apache.spark:spark-sql-api_$spark_scala_version:$spark_version"
}
permitUnusedDeclared "org.apache.spark:spark-network-common_$spark_scala_version:$spark_version"
implementation "io.dropwizard.metrics:metrics-core:4.1.1" // version used by Spark 3.1
compileOnly "org.scala-lang:scala-library:2.12.15"
runtimeOnly library.java.jackson_module_scala_2_12
// Force paranamer 2.8 to avoid issues when using Scala 2.12
runtimeOnly "com.thoughtworks.paranamer:paranamer:2.8"
provided "org.apache.hadoop:hadoop-client-api:3.3.1"
provided library.java.commons_io
provided library.java.hamcrest
provided "com.esotericsoftware:kryo-shaded:4.0.2"
spark.components.each { component ->
testImplementation "$component:$spark_version"
}
testImplementation project(":sdks:java:io:kafka")
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
// SparkStateInternalsTest extends abstract StateInternalsTest
testImplementation project(path: ":runners:core-java", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration")
testImplementation project(":sdks:java:harness")
testImplementation library.java.avro
testImplementation "org.apache.kafka:kafka_$spark_scala_version:2.4.1"
testImplementation library.java.kafka_clients
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation "org.assertj:assertj-core:3.11.1"
testImplementation "org.apache.zookeeper:zookeeper:3.4.11"
if ("$spark_version" >= "3.5.0") {
testImplementation "org.apache.spark:spark-common-utils_$spark_scala_version:$spark_version"
testImplementation "org.apache.spark:spark-sql-api_$spark_scala_version:$spark_version"
}
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: ":runners:core-java", configuration: "testRuntimeMigration")
validatesRunner project(":sdks:java:io:hadoop-format")
validatesRunner project(":sdks:java:io:hadoop-format").sourceSets.test.output
validatesRunner project(path: ":examples:java", configuration: "testRuntimeMigration")
validatesRunner project(path: project.path, configuration: "testRuntimeMigration")
hadoopVersions.each { kv ->
"hadoopVersion$kv.key" "org.apache.hadoop:hadoop-common:$kv.value"
// Force paranamer 2.8 to avoid issues when using Scala 2.12
"hadoopVersion$kv.key" "com.thoughtworks.paranamer:paranamer:2.8"
if ("$spark_version" >= "3.5.0") {
// Add log4j 2.x dependencies as Spark 3.5+ uses slf4j with log4j 2.x backend
"hadoopVersion$kv.key" library.java.log4j2_api
"hadoopVersion$kv.key" library.java.log4j2_core
"hadoopVersion$kv.key" library.java.log4j2_slf4j2_impl
"hadoopVersion$kv.key" library.java.log4j2_log4j12_api
}
}
}
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def tempLocation = project.findProperty('tempLocation') ?: 'gs://temp-storage-for-end-to-end-tests'
configurations.all {
// Prevent StackOverflowError if slf4j-jdk14 is on the classpath
exclude group: "org.slf4j", module: "slf4j-jdk14"
// Avoid any transitive usage of the old codahale group to make dependency resolution deterministic
exclude group: "com.codahale.metrics", module: "metrics-core"
// Prevent conflict with kryo-shaded used by spark (from :sdks:java:core testRuntimeClasspath)
exclude group: "com.esotericsoftware.kryo", module: "kryo"
}
configurations.validatesRunner {
// Exclude to make sure log4j binding is used
exclude group: "org.slf4j", module: "slf4j-simple"
if ("$spark_version" >= "3.5.0") {
// Exclude log4j 1.x dependencies to prevent conflict with log4j 2.x used by spark 3.5+
exclude group: "log4j", module: "log4j"
}
}
hadoopVersions.each { kv ->
configurations."hadoopVersion$kv.key" {
resolutionStrategy {
force "org.apache.hadoop:hadoop-common:$kv.value"
}
if ("$spark_version" >= "3.5.0") {
// Exclude log4j 1.x dependencies to prevent conflict with log4j 2.x used by spark 3.5+
exclude group: "log4j", module: "log4j"
}
}
}
def applyBatchValidatesRunnerSetup = { Test it ->
group = "Verification"
// Disable gradle cache
it.outputs.upToDateWhen { false }
it.jvmArgs += sparkTestJvmArgs()
it.jvmArgs '-Xmx3g'
it.classpath = project.configurations.validatesRunner
it.testClassesDirs = project.files(
project(":sdks:java:core").sourceSets.test.output.classesDirs,
project(":runners:core-java").sourceSets.test.output.classesDirs,
)
it.maxParallelForks 4
it.useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer'
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
// Unbounded
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
// Metrics
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics'
// SDF
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
// Portability
excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
// Ordering
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle'
excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs'
}
}
def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) {
applyBatchValidatesRunnerSetup(it)
systemProperties sparkTestProperties(["--enableSparkMetricSinks":"false"])
// TODO(https://github.com/apache/beam/issues/31231
it.filter {
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/32021)
excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testBoundedSourceMetricsInSplit'
}
}
def validatesRunnerBatchWithBoundedSDFExperiment = tasks.register("validatesRunnerBatchWithBoundedSDFExperiment", Test) {
applyBatchValidatesRunnerSetup(it)
systemProperties sparkTestProperties(["--enableSparkMetricSinks":"false", "--experiments":"use_bounded_concurrent_output_for_sdf"])
// experiment concerns only SDF implementation, therefore avoid needlessly rerunning other tests
it.filter {
includeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest'
}
}
def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test) {
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
systemProperties sparkTestProperties(["--enableSparkMetricSinks": "true"])
jvmArgs += sparkTestJvmArgs()
classpath = configurations.validatesRunner
testClassesDirs += files(
project(":sdks:java:core").sourceSets.test.output.classesDirs,
project(":runners:core-java").sourceSets.test.output.classesDirs,
)
maxParallelForks 4
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
filter {
// UNBOUNDED View.CreatePCollectionView not supported
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle'
// TODO(https://github.com/apache/beam/issues/29973)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/31231
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/32021)
excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testBoundedSourceMetricsInSplit'
}
// TestStream using processing time is not supported in Spark
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
// Exceeds Java heap space
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
// State and Timers
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer'
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
// Metrics
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics'
// SDF
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
// Portability
excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
// Ordering
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle'
excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs'
}
}
tasks.register("validatesStructuredStreamingRunnerBatch", Test) {
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
systemProperties sparkTestProperties(["--runner":"SparkStructuredStreamingRunner", "--testMode":"true"])
// Register various other classes used in tests
systemProperty 'spark.kryo.classesToRegister',
'org.apache.beam.sdk.transforms.MapViewTest$NonDeterministicStringCoder,' +
'org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.RegularImmutableList'
jvmArgs += sparkTestJvmArgs()
jvmArgs '-Xmx7g' // Increase memory heap in order to avoid OOM errors
classpath = configurations.validatesRunner
testClassesDirs = files(
project(":sdks:java:core").sourceSets.test.output.classesDirs,
project(":runners:core-java").sourceSets.test.output.classesDirs,
)
testClassesDirs += files(project.sourceSets.test.output.classesDirs)
maxParallelForks 4
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
// Unbounded
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
// State and Timers
excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
// Metrics
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics'
// multiple coders bug BEAM-8894
excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
// SDF
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
// Portability
excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs'
}
filter {
// Combine with context not implemented
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineFnsTest.testComposedCombineWithContext'
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$CombineWithContextTests.testSimpleCombineWithContext'
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$CombineWithContextTests.testSimpleCombineWithContextEmpty'
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testFixedWindowsCombineWithContext'
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext'
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSlidingWindowsCombineWithContext'
// multiple coders bug BEAM-8894
excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders'
// SDF
excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testLifecycleMethodsBounded'
// https://github.com/apache/beam/issues/29972
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$CombineWithContextTests.testHotKeyCombineWithSideInputs'
// TODO(https://github.com/apache/beam/issues/32021)
excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testBoundedSourceMetricsInSplit'
}
}
tasks.register("validatesRunner") {
group = "Verification"
description "Validates Spark runner"
dependsOn validatesRunnerBatch
dependsOn validatesRunnerBatchWithBoundedSDFExperiment
dependsOn validatesRunnerStreaming
// It should be uncommented once all "validatesStructuredStreamingRunnerBatch" tests will pass.
// Otherwise, it breaks Spark runner ValidatesRunner tests.
//dependsOn validatesStructuredStreamingRunnerBatch
}
tasks.register("hadoopVersionsTest") {
group = "Verification"
dependsOn hadoopVersions.collect{k,v -> "hadoopVersion${k}Test"}
}
tasks.register("examplesIntegrationTest", Test) {
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
systemProperties sparkTestProperties([
"--tempLocation": "${tempLocation}",
"--tempRoot" : "${tempLocation}",
"--project" : "${gcpProject}"
])
jvmArgs += sparkTestJvmArgs()
jvmArgs '-Xmx3g'
include '**/*IT.class'
maxParallelForks 4
classpath = configurations.validatesRunner
testClassesDirs = files(project(":examples:java").sourceSets.test.output.classesDirs)
useJUnit {
filter {
// TODO (https://github.com/apache/beam/issues/21344) Fix integration Tests to run with SparkRunner: Failed to read from sharded output
excludeTestsMatching 'org.apache.beam.examples.WindowedWordCountIT.testWindowedWordCountInStreamingStaticSharding'
excludeTestsMatching 'org.apache.beam.examples.WindowedWordCountIT.testWindowedWordCountInBatchDynamicSharding'
}
}
}
hadoopVersions.each { kv ->
tasks.register("hadoopVersion${kv.key}Test", Test) {
group = "Verification"
description = "Runs Spark tests with Hadoop version $kv.value"
classpath = configurations."hadoopVersion$kv.key" + sourceSets.test.runtimeClasspath
systemProperties sparkTestProperties()
jvmArgs += sparkTestJvmArgs()
include "**/*Test.class"
maxParallelForks 4
useJUnit {
excludeCategories "org.apache.beam.runners.spark.StreamingTest"
excludeCategories "org.apache.beam.runners.spark.UsesCheckpointRecovery"
}
}
}