/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import groovy.json.JsonOutput
import java.util.stream.Collectors
apply plugin: 'org.apache.beam.module'
applyJavaNature(
enableStrictDependencies:true,
automaticModuleName: 'org.apache.beam.runners.spark',
archivesBaseName: (project.hasProperty('archives_base_name') ? archives_base_name : archivesBaseName),
classesTriggerCheckerBugs: [
'SparkAssignWindowFn': 'https://github.com/typetools/checker-framework/issues/3793',
'SparkCombineFn': 'https://github.com/typetools/checker-framework/issues/3793',
'WindowingHelpers': 'https://github.com/typetools/checker-framework/issues/3793',
],
)
description = "Apache Beam :: Runners :: Spark $spark_version"
/*
* We manually specify evaluationDependsOn for the following projects so that they are
* evaluated before this project, because we reference their "sourceSets.test.output"
* directly.
*/
evaluationDependsOn(":sdks:java:core")
evaluationDependsOn(":sdks:java:io:hadoop-format")
evaluationDependsOn(":runners:core-java")
configurations {
validatesRunner
}
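// Hadoop versions the runner is tested against; a dedicated configuration and a
// "hadoopVersion<key>Test" task (defined at the bottom of this file) are created per entry.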
def hadoopVersions = [
"285": "2.8.5",
"292": "2.9.2",
"2101": "2.10.1",
"321": "3.2.1",
]
hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")}
/*
* Copy & merge source overrides into build directory.
*/
def sourceOverridesBase = "${project.buildDir}/source-overrides/src"
def copySourceOverrides = tasks.register('copySourceOverrides', Copy) {
it.from main_source_overrides
it.into "${sourceOverridesBase}/main/java"
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
compileJava.dependsOn copySourceOverrides
def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) {
it.from main_resources_overrides
it.into "${sourceOverridesBase}/main/resources"
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
compileJava.dependsOn copyResourcesOverrides
def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) {
it.from test_source_overrides
it.into "${sourceOverridesBase}/test/java"
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
compileTestJava.dependsOn copyTestSourceOverrides
def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Copy) {
it.from test_resources_overrides
it.into "${sourceOverridesBase}/test/resources"
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
compileTestJava.dependsOn copyTestResourcesOverrides
/*
* We have to explicitly set all directories here to make sure each
* version of Spark has the correct overrides set.
*/
def sourceBase = "${project.projectDir}/../src"
sourceSets {
main {
java {
srcDirs = ["${sourceBase}/main/java", "${sourceOverridesBase}/main/java"]
}
resources {
srcDirs = ["${sourceBase}/main/resources", "${sourceOverridesBase}/main/resources"]
}
}
test {
java {
srcDirs = ["${sourceBase}/test/java", "${sourceOverridesBase}/test/java"]
}
resources {
srcDirs = ["${sourceBase}/test/resources", "${sourceOverridesBase}/test/resources"]
}
}
}
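// Default unit test task: runs against TestSparkRunner in batch mode. Streaming and
// checkpoint-recovery tests are covered by the validatesRunner* tasks below.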
test {
systemProperty "beam.spark.test.reuseSparkContext", "true"
systemProperty "spark.sql.shuffle.partitions", "4"
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"
systemProperty "beamTestPipelineOptions", """[
"--runner=TestSparkRunner",
"--streaming=false",
"--enableSparkMetricSinks=true"
]"""
systemProperty "log4j.configuration", "log4j-test.properties"
// Change log level to debug:
// systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug"
// Change log level to debug only for the package and nested packages:
// systemProperty "org.slf4j.simpleLogger.log.org.apache.beam.runners.spark.stateful", "debug"
jvmArgs "-XX:-UseGCOverheadLimit"
if (System.getProperty("beamSurefireArgline")) {
jvmArgs System.getProperty("beamSurefireArgline")
}
// Only one SparkContext may be running in a JVM (SPARK-2243)
forkEvery 1
maxParallelForks 4
useJUnit {
excludeCategories "org.apache.beam.runners.spark.StreamingTest"
excludeCategories "org.apache.beam.runners.spark.UsesCheckpointRecovery"
}
filter {
// BEAM-11653 MetricsSinkTest is failing with Spark 3
excludeTestsMatching 'org.apache.beam.runners.spark.aggregators.metrics.sink.SparkMetricsSinkTest'
}
}
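// Spark itself is declared "provided": it is expected to be supplied by the runtime
// environment rather than bundled with the runner artifact.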
dependencies {
compile project(path: ":model:pipeline", configuration: "shadow")
compile project(path: ":sdks:java:core", configuration: "shadow")
compile project(":runners:core-construction-java")
compile project(":runners:core-java")
compile project(":runners:java-fn-execution")
compile project(":runners:java-job-service")
compile project(":sdks:java:extensions:google-cloud-platform-core")
compile library.java.jackson_annotations
compile library.java.slf4j_api
compile library.java.joda_time
compile library.java.args4j
compile project(path: ":model:fn-execution", configuration: "shadow")
compile project(path: ":model:job-management", configuration: "shadow")
compile project(":sdks:java:fn-execution")
compile library.java.vendored_grpc_1_36_0
compile library.java.vendored_guava_26_0_jre
provided "org.apache.spark:spark-core_$spark_scala_version:$spark_version"
provided "org.apache.spark:spark-network-common_$spark_scala_version:$spark_version"
provided "org.apache.spark:spark-sql_$spark_scala_version:$spark_version"
provided "org.apache.spark:spark-streaming_$spark_scala_version:$spark_version"
if (project.property("spark_scala_version").equals("2.11")) {
runtimeOnly library.java.jackson_module_scala_2_11
} else {
runtimeOnly library.java.jackson_module_scala_2_12
}
// Force paranamer 2.8 to avoid issues when using Scala 2.12
runtimeOnly "com.thoughtworks.paranamer:paranamer:2.8"
provided library.java.hadoop_common
provided library.java.commons_io
provided library.java.hamcrest_core
provided library.java.hamcrest_library
provided "com.esotericsoftware.kryo:kryo:2.21"
testCompile project(":sdks:java:io:kafka")
testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
// SparkStateInternalsTest extends abstract StateInternalsTest
testCompile project(path: ":runners:core-java", configuration: "testRuntime")
testCompile project(":sdks:java:harness")
testCompile library.java.avro
testCompile "org.apache.kafka:kafka_$spark_scala_version:2.4.1"
testCompile library.java.kafka_clients
testCompile library.java.junit
testCompile library.java.mockito_core
testCompile "org.apache.zookeeper:zookeeper:3.4.11"
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
validatesRunner project(":sdks:java:io:hadoop-format")
validatesRunner project(":sdks:java:io:hadoop-format").sourceSets.test.output
validatesRunner project(path: ":examples:java", configuration: "testRuntime")
validatesRunner project(path: project.path, configuration: "testRuntime")
validatesRunner project(project.path)
validatesRunner project(path: project.path, configuration: "provided")
hadoopVersions.each {kv ->
"hadoopVersion$kv.key" "org.apache.hadoop:hadoop-common:$kv.value"
}
}
configurations.testRuntimeClasspath {
// Testing the Spark runner causes a StackOverflowError if slf4j-jdk14 is on the classpath
exclude group: "org.slf4j", module: "slf4j-jdk14"
}
configurations.validatesRunner {
// Testing the Spark runner causes a StackOverflowError if slf4j-jdk14 is on the classpath
exclude group: "org.slf4j", module: "slf4j-jdk14"
}
hadoopVersions.each {kv ->
configurations."hadoopVersion$kv.key" {
// Testing the Spark runner causes a StackOverflowError if slf4j-jdk14 is on the classpath
exclude group: "org.slf4j", module: "slf4j-jdk14"
resolutionStrategy {
force "org.apache.hadoop:hadoop-common:$kv.value"
}
}
}
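// Runs the Beam ValidatesRunner suite (plus Spark checkpoint-recovery tests) against
// TestSparkRunner in batch mode.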
task validatesRunnerBatch(type: Test) {
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
def pipelineOptions = JsonOutput.toJson([
"--runner=TestSparkRunner",
"--streaming=false",
"--enableSparkMetricSinks=false",
])
systemProperty "beamTestPipelineOptions", pipelineOptions
systemProperty "beam.spark.test.reuseSparkContext", "true"
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"
classpath = configurations.validatesRunner
testClassesDirs = files(
project(":sdks:java:core").sourceSets.test.output.classesDirs,
project(":runners:core-java").sourceSets.test.output.classesDirs,
)
testClassesDirs += files(project.sourceSets.test.output.classesDirs)
// Only one SparkContext may be running in a JVM (SPARK-2243)
forkEvery 1
maxParallelForks 4
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
includeCategories 'org.apache.beam.runners.spark.UsesCheckpointRecovery'
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
// Unbounded
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
// Metrics
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics'
// SDF
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
// Portability
excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
excludeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}
jvmArgs '-Xmx3g'
}
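// Runs the tests categorized as StreamingTest against TestSparkRunner with streaming forced on.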
task validatesRunnerStreaming(type: Test) {
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
def pipelineOptions = JsonOutput.toJson([
"--runner=TestSparkRunner",
"--forceStreaming=true",
"--enableSparkMetricSinks=true",
])
systemProperty "beamTestPipelineOptions", pipelineOptions
classpath = configurations.validatesRunner
testClassesDirs += files(project.sourceSets.test.output.classesDirs)
// Only one SparkContext may be running in a JVM (SPARK-2243)
forkEvery 1
maxParallelForks 4
useJUnit {
includeCategories 'org.apache.beam.runners.spark.StreamingTest'
}
filter {
// BEAM-11653 MetricsSinkTest is failing with Spark 3
excludeTestsMatching 'org.apache.beam.runners.spark.aggregators.metrics.sink.SparkMetricsSinkTest'
}
}
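// Runs the ValidatesRunner suite against the SparkStructuredStreamingRunner in batch mode.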
task validatesStructuredStreamingRunnerBatch(type: Test) {
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
def pipelineOptions = JsonOutput.toJson([
"--runner=SparkStructuredStreamingRunner",
"--testMode=true",
"--streaming=false",
])
systemProperty "beamTestPipelineOptions", pipelineOptions
systemProperty "spark.sql.shuffle.partitions", "4"
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"
classpath = configurations.validatesRunner
testClassesDirs = files(
project(":sdks:java:core").sourceSets.test.output.classesDirs,
project(":runners:core-java").sourceSets.test.output.classesDirs,
)
testClassesDirs += files(project.sourceSets.test.output.classesDirs)
// Only one SparkContext may be running in a JVM (SPARK-2243)
forkEvery 1
maxParallelForks 4
// Increase memory heap in order to avoid OOM errors
jvmArgs '-Xmx4g'
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
// Unbounded
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
// State and Timers
excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
// Metrics
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics'
// multiple coders bug BEAM-8894
excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
// SDF
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
// Portability
excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
excludeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}
filter {
// Combine with context not implemented
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineFnsTest.testComposedCombineWithContext'
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$CombineWithContextTests.testSimpleCombineWithContext'
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$CombineWithContextTests.testSimpleCombineWithContextEmpty'
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testFixedWindowsCombineWithContext'
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext'
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSlidingWindowsCombineWithContext'
// multiple coders bug BEAM-8894
excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders'
// SDF
excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testLifecycleMethodsBounded'
}
}
task validatesRunner {
group = "Verification"
description "Validates Spark runner"
dependsOn validatesRunnerBatch
dependsOn validatesRunnerStreaming
// Uncomment this once all "validatesStructuredStreamingRunnerBatch" tests pass;
// until then it breaks the Spark runner ValidatesRunner tests.
//dependsOn validatesStructuredStreamingRunnerBatch
}
// Generates :runners:spark:*:runQuickstartJavaSpark task
createJavaExamplesArchetypeValidationTask(type: 'Quickstart', runner: 'Spark')
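// Umbrella task that runs the per-Hadoop-version test tasks defined below.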
task hadoopVersionsTest(group: "Verification") {
def taskNames = hadoopVersions.keySet().stream()
.map{num -> "hadoopVersion${num}Test"}
.collect(Collectors.toList())
dependsOn taskNames
}
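// One test task per supported Hadoop version, running the unit tests with the
// corresponding hadoop-common version on the classpath.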
hadoopVersions.each {kv ->
task "hadoopVersion${kv.key}Test"(type: Test, group: "Verification") {
description = "Runs Spark tests with Hadoop version $kv.value"
classpath = configurations."hadoopVersion$kv.key" + sourceSets.test.runtimeClasspath
systemProperty "beam.spark.test.reuseSparkContext", "true"
systemProperty "spark.sql.shuffle.partitions", "4"
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"
systemProperty "beamTestPipelineOptions", """[
"--runner=TestSparkRunner",
"--streaming=false",
"--enableSparkMetricSinks=true"
]"""
// Only one SparkContext may be running in a JVM (SPARK-2243)
forkEvery 1
maxParallelForks 4
include "**/*Test.class"
useJUnit {
excludeCategories "org.apache.beam.runners.spark.StreamingTest"
excludeCategories "org.apache.beam.runners.spark.UsesCheckpointRecovery"
}
}
}