| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * License); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an AS IS BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import groovy.json.JsonOutput |
| import java.util.stream.Collectors |
| |
| apply plugin: 'org.apache.beam.module' |
| applyJavaNature( |
| enableStrictDependencies:true, |
| automaticModuleName: 'org.apache.beam.runners.spark', |
| archivesBaseName: (project.hasProperty('archives_base_name') ? archives_base_name : archivesBaseName), |
| classesTriggerCheckerBugs: [ |
| 'SparkAssignWindowFn': 'https://github.com/typetools/checker-framework/issues/3793', |
| 'SparkCombineFn': 'https://github.com/typetools/checker-framework/issues/3793', |
| 'WindowingHelpers': 'https://github.com/typetools/checker-framework/issues/3793', |
| ], |
| ) |
| |
| description = "Apache Beam :: Runners :: Spark $spark_version" |
| |
| /* |
| * We need to rely on manually specifying these evaluationDependsOn to ensure that |
| * the following projects are evaluated before we evaluate this project. This is because |
| * we are attempting to reference the "sourceSets.test.output" directly. |
| */ |
| evaluationDependsOn(":sdks:java:core") |
| evaluationDependsOn(":sdks:java:io:hadoop-format") |
| evaluationDependsOn(":runners:core-java") |
| |
| configurations { |
| validatesRunner |
| } |
| |
| def hadoopVersions = [ |
| "285": "2.8.5", |
| "292": "2.9.2", |
| "2101": "2.10.1", |
| "321": "3.2.1", |
| ] |
| |
| hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")} |
| |
| /* |
| * Copy & merge source overrides into build directory. |
| */ |
| def sourceOverridesBase = "${project.buildDir}/source-overrides/src" |
| |
| def copySourceOverrides = tasks.register('copySourceOverrides', Copy) { |
| it.from main_source_overrides |
| it.into "${sourceOverridesBase}/main/java" |
| it.duplicatesStrategy DuplicatesStrategy.INCLUDE |
| } |
| compileJava.dependsOn copySourceOverrides |
| |
| def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) { |
| it.from main_resources_overrides |
| it.into "${sourceOverridesBase}/main/resources" |
| it.duplicatesStrategy DuplicatesStrategy.INCLUDE |
| } |
| compileJava.dependsOn copyResourcesOverrides |
| |
| def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) { |
| it.from test_source_overrides |
| it.into "${sourceOverridesBase}/test/java" |
| it.duplicatesStrategy DuplicatesStrategy.INCLUDE |
| } |
| compileTestJava.dependsOn copyTestSourceOverrides |
| |
| def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Copy) { |
| it.from test_resources_overrides |
| it.into "${sourceOverridesBase}/test/resources" |
| it.duplicatesStrategy DuplicatesStrategy.INCLUDE |
| } |
| compileJava.dependsOn copyTestResourcesOverrides |
| |
| /* |
| * We have to explicitly set all directories here to make sure each |
| * version of Spark has the correct overrides set. |
| */ |
| def sourceBase = "${project.projectDir}/../src" |
| sourceSets { |
| main { |
| java { |
| srcDirs = ["${sourceBase}/main/java", "${sourceOverridesBase}/main/java"] |
| } |
| resources { |
| srcDirs = ["${sourceBase}/main/resources", "${sourceOverridesBase}/main/resources"] |
| } |
| } |
| test { |
| java { |
| srcDirs = ["${sourceBase}/test/java", "${sourceOverridesBase}/test/java"] |
| } |
| resources { |
| srcDirs = ["${sourceBase}/test/resources", "${sourceOverridesBase}/test/resources"] |
| } |
| } |
| } |
| |
| test { |
| systemProperty "beam.spark.test.reuseSparkContext", "true" |
| systemProperty "spark.sql.shuffle.partitions", "4" |
| systemProperty "spark.ui.enabled", "false" |
| systemProperty "spark.ui.showConsoleProgress", "false" |
| systemProperty "beamTestPipelineOptions", """[ |
| "--runner=TestSparkRunner", |
| "--streaming=false", |
| "--enableSparkMetricSinks=true" |
| ]""" |
| systemProperty "log4j.configuration", "log4j-test.properties" |
| // Change log level to debug: |
| // systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug" |
| // Change log level to debug only for the package and nested packages: |
| // systemProperty "org.slf4j.simpleLogger.log.org.apache.beam.runners.spark.stateful", "debug" |
| jvmArgs "-XX:-UseGCOverheadLimit" |
| if (System.getProperty("beamSurefireArgline")) { |
| jvmArgs System.getProperty("beamSurefireArgline") |
| } |
| |
| // Only one SparkContext may be running in a JVM (SPARK-2243) |
| forkEvery 1 |
| maxParallelForks 4 |
| useJUnit { |
| excludeCategories "org.apache.beam.runners.spark.StreamingTest" |
| excludeCategories "org.apache.beam.runners.spark.UsesCheckpointRecovery" |
| } |
| filter { |
| // BEAM-11653 MetricsSinkTest is failing with Spark 3 |
| excludeTestsMatching 'org.apache.beam.runners.spark.aggregators.metrics.sink.SparkMetricsSinkTest' |
| } |
| } |
| |
| dependencies { |
| compile project(path: ":model:pipeline", configuration: "shadow") |
| compile project(path: ":sdks:java:core", configuration: "shadow") |
| compile project(":runners:core-construction-java") |
| compile project(":runners:core-java") |
| compile project(":runners:java-fn-execution") |
| compile project(":runners:java-job-service") |
| compile project(":sdks:java:extensions:google-cloud-platform-core") |
| compile library.java.jackson_annotations |
| compile library.java.slf4j_api |
| compile library.java.joda_time |
| compile library.java.args4j |
| compile project(path: ":model:fn-execution", configuration: "shadow") |
| compile project(path: ":model:job-management", configuration: "shadow") |
| compile project(":sdks:java:fn-execution") |
| compile library.java.vendored_grpc_1_36_0 |
| compile library.java.vendored_guava_26_0_jre |
| provided "org.apache.spark:spark-core_$spark_scala_version:$spark_version" |
| provided "org.apache.spark:spark-network-common_$spark_scala_version:$spark_version" |
| provided "org.apache.spark:spark-sql_$spark_scala_version:$spark_version" |
| provided "org.apache.spark:spark-streaming_$spark_scala_version:$spark_version" |
| if(project.property("spark_scala_version").equals("2.11")){ |
| runtimeOnly library.java.jackson_module_scala_2_11 |
| } else { |
| runtimeOnly library.java.jackson_module_scala_2_12 |
| } |
| // Force paranamer 2.8 to avoid issues when using Scala 2.12 |
| runtimeOnly "com.thoughtworks.paranamer:paranamer:2.8" |
| provided library.java.hadoop_common |
| provided library.java.commons_io |
| provided library.java.hamcrest_core |
| provided library.java.hamcrest_library |
| provided "com.esotericsoftware.kryo:kryo:2.21" |
| testCompile project(":sdks:java:io:kafka") |
| testCompile project(path: ":sdks:java:core", configuration: "shadowTest") |
| // SparkStateInternalsTest extends abstract StateInternalsTest |
| testCompile project(path: ":runners:core-java", configuration: "testRuntime") |
| testCompile project(":sdks:java:harness") |
| testCompile library.java.avro |
| testCompile "org.apache.kafka:kafka_$spark_scala_version:2.4.1" |
| testCompile library.java.kafka_clients |
| testCompile library.java.junit |
| testCompile library.java.mockito_core |
| testCompile "org.apache.zookeeper:zookeeper:3.4.11" |
| validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest") |
| validatesRunner project(path: ":runners:core-java", configuration: "testRuntime") |
| validatesRunner project(":sdks:java:io:hadoop-format") |
| validatesRunner project(":sdks:java:io:hadoop-format").sourceSets.test.output |
| validatesRunner project(path: ":examples:java", configuration: "testRuntime") |
| validatesRunner project(path: project.path, configuration: "testRuntime") |
| validatesRunner project(project.path) |
| validatesRunner project(path: project.path, configuration: "provided") |
| hadoopVersions.each {kv -> |
| "hadoopVersion$kv.key" "org.apache.hadoop:hadoop-common:$kv.value" |
| } |
| } |
| |
| configurations.testRuntimeClasspath { |
| // Testing the Spark runner causes a StackOverflowError if slf4j-jdk14 is on the classpath |
| exclude group: "org.slf4j", module: "slf4j-jdk14" |
| } |
| |
| configurations.validatesRunner { |
| // Testing the Spark runner causes a StackOverflowError if slf4j-jdk14 is on the classpath |
| exclude group: "org.slf4j", module: "slf4j-jdk14" |
| } |
| |
| |
| hadoopVersions.each {kv -> |
| configurations."hadoopVersion$kv.key" { |
| // Testing the Spark runner causes a StackOverflowError if slf4j-jdk14 is on the classpath |
| exclude group: "org.slf4j", module: "slf4j-jdk14" |
| resolutionStrategy { |
| force "org.apache.hadoop:hadoop-common:$kv.value" |
| } |
| } |
| } |
| |
| task validatesRunnerBatch(type: Test) { |
| group = "Verification" |
| // Disable gradle cache |
| outputs.upToDateWhen { false } |
| def pipelineOptions = JsonOutput.toJson([ |
| "--runner=TestSparkRunner", |
| "--streaming=false", |
| "--enableSparkMetricSinks=false", |
| ]) |
| systemProperty "beamTestPipelineOptions", pipelineOptions |
| systemProperty "beam.spark.test.reuseSparkContext", "true" |
| systemProperty "spark.ui.enabled", "false" |
| systemProperty "spark.ui.showConsoleProgress", "false" |
| |
| classpath = configurations.validatesRunner |
| testClassesDirs = files( |
| project(":sdks:java:core").sourceSets.test.output.classesDirs, |
| project(":runners:core-java").sourceSets.test.output.classesDirs, |
| ) |
| testClassesDirs += files(project.sourceSets.test.output.classesDirs) |
| |
| // Only one SparkContext may be running in a JVM (SPARK-2243) |
| forkEvery 1 |
| maxParallelForks 4 |
| useJUnit { |
| includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' |
| includeCategories 'org.apache.beam.runners.spark.UsesCheckpointRecovery' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' |
| // Unbounded |
| excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream' |
| // Metrics |
| excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics' |
| // SDF |
| excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' |
| // Portability |
| excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' |
| } |
| jvmArgs '-Xmx3g' |
| } |
| |
| task validatesRunnerStreaming(type: Test) { |
| group = "Verification" |
| // Disable gradle cache |
| outputs.upToDateWhen { false } |
| def pipelineOptions = JsonOutput.toJson([ |
| "--runner=TestSparkRunner", |
| "--forceStreaming=true", |
| "--enableSparkMetricSinks=true", |
| ]) |
| systemProperty "beamTestPipelineOptions", pipelineOptions |
| |
| classpath = configurations.validatesRunner |
| testClassesDirs += files(project.sourceSets.test.output.classesDirs) |
| |
| // Only one SparkContext may be running in a JVM (SPARK-2243) |
| forkEvery 1 |
| maxParallelForks 4 |
| useJUnit { |
| includeCategories 'org.apache.beam.runners.spark.StreamingTest' |
| } |
| filter { |
| // BEAM-11653 MetricsSinkTest is failing with Spark 3 |
| excludeTestsMatching 'org.apache.beam.runners.spark.aggregators.metrics.sink.SparkMetricsSinkTest' |
| } |
| } |
| |
| task validatesStructuredStreamingRunnerBatch(type: Test) { |
| group = "Verification" |
| // Disable gradle cache |
| outputs.upToDateWhen { false } |
| def pipelineOptions = JsonOutput.toJson([ |
| "--runner=SparkStructuredStreamingRunner", |
| "--testMode=true", |
| "--streaming=false", |
| ]) |
| systemProperty "beamTestPipelineOptions", pipelineOptions |
| systemProperty "spark.sql.shuffle.partitions", "4" |
| systemProperty "spark.ui.enabled", "false" |
| systemProperty "spark.ui.showConsoleProgress", "false" |
| |
| classpath = configurations.validatesRunner |
| testClassesDirs = files( |
| project(":sdks:java:core").sourceSets.test.output.classesDirs, |
| project(":runners:core-java").sourceSets.test.output.classesDirs, |
| ) |
| testClassesDirs += files(project.sourceSets.test.output.classesDirs) |
| |
| // Only one SparkContext may be running in a JVM (SPARK-2243) |
| forkEvery 1 |
| maxParallelForks 4 |
| // Increase memory heap in order to avoid OOM errors |
| jvmArgs '-Xmx4g' |
| useJUnit { |
| includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' |
| // Unbounded |
| excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream' |
| // State and Timers |
| excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' |
| // Metrics |
| excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics' |
| // multiple coders bug BEAM-8894 |
| excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders' |
| // SDF |
| excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' |
| // Portability |
| excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService' |
| excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' |
| } |
| filter { |
| // Combine with context not implemented |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineFnsTest.testComposedCombineWithContext' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$CombineWithContextTests.testSimpleCombineWithContext' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$CombineWithContextTests.testSimpleCombineWithContextEmpty' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testFixedWindowsCombineWithContext' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext' |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSlidingWindowsCombineWithContext' |
| // multiple coders bug BEAM-8894 |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders' |
| // SDF |
| excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testLifecycleMethodsBounded' |
| } |
| } |
| |
| task validatesRunner { |
| group = "Verification" |
| description "Validates Spark runner" |
| dependsOn validatesRunnerBatch |
| dependsOn validatesRunnerStreaming |
| // It should be uncommented once all "validatesStructuredStreamingRunnerBatch" tests will pass. |
| // Otherwise, it breaks Spark runner ValidatesRunner tests. |
| //dependsOn validatesStructuredStreamingRunnerBatch |
| } |
| |
| // Generates :runners:spark:*:runQuickstartJavaSpark task |
| createJavaExamplesArchetypeValidationTask(type: 'Quickstart', runner: 'Spark') |
| |
| task hadoopVersionsTest(group: "Verification") { |
| def taskNames = hadoopVersions.keySet().stream() |
| .map{num -> "hadoopVersion${num}Test"} |
| .collect(Collectors.toList()) |
| dependsOn taskNames |
| } |
| |
| hadoopVersions.each {kv -> |
| task "hadoopVersion${kv.key}Test"(type: Test, group: "Verification") { |
| description = "Runs Spark tests with Hadoop version $kv.value" |
| classpath = configurations."hadoopVersion$kv.key" + sourceSets.test.runtimeClasspath |
| systemProperty "beam.spark.test.reuseSparkContext", "true" |
| systemProperty "spark.sql.shuffle.partitions", "4" |
| systemProperty "spark.ui.enabled", "false" |
| systemProperty "spark.ui.showConsoleProgress", "false" |
| systemProperty "beamTestPipelineOptions", """[ |
| "--runner=TestSparkRunner", |
| "--streaming=false", |
| "--enableSparkMetricSinks=true" |
| ]""" |
| // Only one SparkContext may be running in a JVM (SPARK-2243) |
| forkEvery 1 |
| maxParallelForks 4 |
| include "**/*Test.class" |
| useJUnit { |
| excludeCategories "org.apache.beam.runners.spark.StreamingTest" |
| excludeCategories "org.apache.beam.runners.spark.UsesCheckpointRecovery" |
| } |
| } |
| } |