/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Main Flink Runner build file shared by all of its build targets.
* The file needs to be parameterized by the Flink version and the source directories.
*
* See the per-version build.gradle files for how to use this script; an illustrative sketch follows.
*/
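/*
 * For illustration only, a version-specific build.gradle applying this script might
 * look like the following sketch. The version values and relative path are
 * hypothetical; this script reads flink_versions, flink_major, and flink_version
 * from the project's properties.
 *
 *   project.ext {
 *     flink_major = '1.17'
 *     flink_version = '1.17.0'
 *     flink_versions = '1.15,1.16,1.17'
 *   }
 *   apply from: "../flink_runner.gradle"
 */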
import groovy.json.JsonOutput
def base_path = ".."
def overrides(versions, type, base_path) {
  versions.collect { "${base_path}/${it}/src/${type}/java" } + ["./src/${type}/java"]
}
def all_versions = flink_versions.split(",")
def previous_versions = all_versions.findAll { it < flink_major }
// Version-specific code overrides.
def main_source_overrides = overrides(previous_versions, "main", base_path)
def test_source_overrides = overrides(previous_versions, "test", base_path)
def main_resources_overrides = []
def test_resources_overrides = []
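// For illustration (hypothetical values): with flink_versions = "1.15,1.16,1.17" and
// flink_major = "1.17", previous_versions is ["1.15", "1.16"] and main_source_overrides
// resolves to ["../1.15/src/main/java", "../1.16/src/main/java", "./src/main/java"],
// so later versions layer their overrides on top of earlier ones.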
def archivesBaseName = "beam-runners-flink-${flink_major}"
apply plugin: 'org.apache.beam.module'
applyJavaNature(
  enableStrictDependencies: true,
  automaticModuleName: 'org.apache.beam.runners.flink',
  archivesBaseName: archivesBaseName,
  // The Flink runner jars all share the same package names, so publish javadoc only once.
  exportJavadoc: project.ext.flink_version.startsWith(all_versions.first())
)
description = "Apache Beam :: Runners :: Flink $flink_version"
/*
 * We manually specify evaluationDependsOn to ensure that the following projects are
 * evaluated before this project, because we reference their "sourceSets.test.output"
 * directly.
 */
evaluationDependsOn(":sdks:java:core")
evaluationDependsOn(":runners:core-java")
evaluationDependsOn(":examples:java")
/*
* Copy & merge source overrides into build directory.
*/
def sourceOverridesBase = project.layout.buildDirectory.dir('source-overrides/src').get()
def copySourceOverrides = tasks.register('copySourceOverrides', Copy) {
  it.from main_source_overrides
  it.into "${sourceOverridesBase}/main/java"
  it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) {
  it.from main_resources_overrides
  it.into "${sourceOverridesBase}/main/resources"
  it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) {
  it.from test_source_overrides
  it.into "${sourceOverridesBase}/test/java"
  it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Copy) {
  it.from test_resources_overrides
  it.into "${sourceOverridesBase}/test/resources"
  it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
// Make the tasks defined by the Gradle Java plugin depend on the copy tasks above.
compileJava.dependsOn copySourceOverrides
processResources.dependsOn copyResourcesOverrides
compileTestJava.dependsOn copyTestSourceOverrides
processTestResources.dependsOn copyTestResourcesOverrides
// Also wire up the custom tasks defined by BeamModulePlugin; they are only
// defined when certain flags are provided (e.g. -Prelease, -Ppublishing).
def sourcesJar = project.tasks.findByName('sourcesJar')
if (sourcesJar != null) {
  sourcesJar.dependsOn copySourceOverrides
  sourcesJar.dependsOn copyResourcesOverrides
}
def testSourcesJar = project.tasks.findByName('testSourcesJar')
if (testSourcesJar != null) {
  testSourcesJar.dependsOn copyTestSourceOverrides
  testSourcesJar.dependsOn copyTestResourcesOverrides
}
/*
* We have to explicitly set all directories here to make sure each
* version of Flink has the correct overrides set.
*/
def sourceBase = "${project.projectDir}/../src"
sourceSets {
  main {
    java {
      srcDirs = ["${sourceBase}/main/java", "${sourceOverridesBase}/main/java"]
    }
    resources {
      srcDirs = ["${sourceBase}/main/resources", "${sourceOverridesBase}/main/resources"]
    }
  }
  test {
    java {
      srcDirs = ["${sourceBase}/test/java", "${sourceOverridesBase}/test/java"]
    }
    resources {
      srcDirs = ["${sourceBase}/test/resources", "${sourceOverridesBase}/test/resources"]
    }
  }
}
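// For illustration (hypothetical project dir runners/flink/1.17): the main java
// sources resolve to ["../src/main/java", "<buildDir>/source-overrides/src/main/java"],
// i.e. the shared sources plus the per-version overrides copied above.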
test {
  systemProperty "log4j.configuration", "log4j-test.properties"
  // Change log level to debug:
  // systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug"
  // Change log level to debug only for the package and nested packages:
  // systemProperty "org.slf4j.simpleLogger.log.org.apache.beam.runners.flink.translation.wrappers.streaming", "debug"
  jvmArgs "-XX:-UseGCOverheadLimit"
  if (System.getProperty("beamSurefireArgline")) {
    jvmArgs System.getProperty("beamSurefireArgline")
  }
  // TODO(BEAM-6418) Running tests of all Flink versions in parallel can be too harsh on Jenkins memory.
  // Run them serially for now, to avoid "Exit code 137", i.e. Jenkins host killing the Gradle test process.
  def flink_minor_version = project.path.split(':').last()
  for (version in project.ext.allFlinkVersions) {
    if (version == flink_minor_version) {
      break
    }
    mustRunAfter(":runners:flink:${version}:test")
  }
}
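// For illustration (hypothetical values): with allFlinkVersions = ['1.15', '1.16', '1.17']
// and this project being :runners:flink:1.16, the loop above adds
// mustRunAfter(':runners:flink:1.15:test') and then stops, so each version's tests
// run after those of all earlier versions.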
configurations {
  validatesRunner
  miniCluster
  examplesJavaIntegrationTest
}
dependencies {
  compileOnly project(":sdks:java:build-tools")
  implementation library.java.vendored_guava_32_1_2_jre
  implementation project(path: ":sdks:java:core", configuration: "shadow")
  implementation project(":runners:core-java")
  implementation project(":runners:java-fn-execution")
  implementation project(":runners:java-job-service")
  implementation project(":sdks:java:extensions:google-cloud-platform-core")
  implementation library.java.vendored_grpc_1_69_0
  implementation library.java.slf4j_api
  implementation library.java.joda_time
  implementation library.java.args4j
  implementation "org.apache.flink:flink-clients:$flink_version"
  // Runtime dependencies are not included in Beam's generated pom.xml, so flink-clients must be
  // declared in the implementation configuration (https://issues.apache.org/jira/browse/BEAM-11732).
  permitUnusedDeclared "org.apache.flink:flink-clients:$flink_version"
  implementation "org.apache.flink:flink-streaming-java:$flink_version"
  // RocksDB state backend (included in the Flink distribution).
  provided "org.apache.flink:flink-statebackend-rocksdb:$flink_version"
  testImplementation "org.apache.flink:flink-statebackend-rocksdb:$flink_version"
  testImplementation "org.apache.flink:flink-streaming-java:$flink_version:tests"
  testImplementation "org.apache.flink:flink-test-utils:$flink_version"
  miniCluster "org.apache.flink:flink-runtime-web:$flink_version"
  implementation "org.apache.flink:flink-core:$flink_version"
  implementation "org.apache.flink:flink-metrics-core:$flink_version"
  implementation "org.apache.flink:flink-java:$flink_version"
  implementation "org.apache.flink:flink-runtime:$flink_version"
  testImplementation "org.apache.flink:flink-runtime:$flink_version:tests"
  testImplementation "org.apache.flink:flink-rpc-akka:$flink_version"
  testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
  // FlinkStateInternalsTest extends the abstract StateInternalsTest.
  testImplementation project(path: ":runners:core-java", configuration: "testRuntimeMigration")
  testImplementation library.java.hamcrest
  testImplementation library.java.junit
  testImplementation library.java.mockito_core
  // TODO(https://github.com/apache/beam/issues/34056) Remove powermock once the Whitebox usages are removed.
  testImplementation "org.powermock:powermock-reflect:2.0.9"
  testImplementation library.java.google_api_services_bigquery
  testImplementation project(":sdks:java:io:google-cloud-platform")
  testImplementation library.java.jackson_dataformat_yaml
  testImplementation "org.apache.flink:flink-core:$flink_version:tests"
  testImplementation "org.apache.flink:flink-connector-test-utils:$flink_version"
  testImplementation project(":sdks:java:harness")
  testRuntimeOnly library.java.slf4j_simple
  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
  validatesRunner project(path: ":runners:core-java", configuration: "testRuntimeMigration")
  validatesRunner project(project.path)
  implementation project(path: ":model:fn-execution", configuration: "shadow")
  implementation project(path: ":model:pipeline", configuration: "shadow")
  implementation project(path: ":model:job-management", configuration: "shadow")
  implementation library.java.jackson_databind
  runtimeOnly library.java.jackson_jaxb_annotations
  examplesJavaIntegrationTest project(project.path)
  examplesJavaIntegrationTest project(":examples:java")
  examplesJavaIntegrationTest project(path: ":examples:java", configuration: "testRuntimeMigration")
}
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def tempLocation = project.findProperty('tempLocation') ?: 'gs://temp-storage-for-end-to-end-tests'
class ValidatesRunnerConfig {
  String name
  boolean streaming
  boolean checkpointing
  boolean useDataStreamForBatch
  ArrayList<String> sickbayTests
}
def sickbayTests = [
  // TODO(https://github.com/apache/beam/issues/21306)
  'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testOnWindowTimestampSkew',
  // Flink errors are not deterministic. The reported exception may just be
  // org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task is not running, but in state FAILED
  // instead of the actual cause; the real cause is visible in the logs.
  'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests',
  'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
  // TODO(https://github.com/apache/beam/issues/18198)
  // testTriggeredLatestSingleton gets "stuck" both with and without --useDataStreamForBatch.
  'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
]
def createValidatesRunnerTask(Map m) {
  def config = m as ValidatesRunnerConfig
  tasks.register(config.name, Test) {
    group = "Verification"
    // Disable gradle cache
    outputs.upToDateWhen { false }
    def runnerType = config.streaming ? "streaming" : "batch"
    description = "Validates the ${runnerType} runner"
    def pipelineOptionsArray = [
      "--runner=TestFlinkRunner",
      "--streaming=${config.streaming}",
      "--useDataStreamForBatch=${config.useDataStreamForBatch}",
      "--parallelism=1",
    ]
    if (config.checkpointing) {
      pipelineOptionsArray.addAll([
        "--checkpointingInterval=3000",
        "--shutdownSourcesAfterIdleMs=60000",
      ])
    }
    def pipelineOptions = JsonOutput.toJson(pipelineOptionsArray)
    systemProperty "beamTestPipelineOptions", pipelineOptions
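    // For illustration: for the streaming, non-checkpointing variant this renders as the
    // JSON array (hypothetical rendering)
    // ["--runner=TestFlinkRunner","--streaming=true","--useDataStreamForBatch=false","--parallelism=1"]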
    classpath = configurations.validatesRunner
    testClassesDirs = files(
      project(":sdks:java:core").sourceSets.test.output.classesDirs,
      project(":runners:core-java").sourceSets.test.output.classesDirs,
    )
    // maxParallelForks decreased from 4 in order to avoid OOM errors
    maxParallelForks 2
    useJUnit {
      if (config.checkpointing) {
        includeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
        // TestStreamSource does not support checkpointing
        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
      } else {
        includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
        excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService'
        // Should be run only in a properly configured SDK harness environment
        excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
        excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
      }
      excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
      excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
      excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
      excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics'
      excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
      excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
      excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer'
      if (config.streaming) {
        excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages' // BEAM-8598
        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
        excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs'
      } else {
        excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
        excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
        excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
        excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
        excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle'
      }
      filter {
        for (String test : config.sickbayTests) {
          excludeTestsMatching test
        }
        // The Flink reshuffle override does not preserve all metadata
        excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
        // https://github.com/apache/beam/issues/20843
        excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode'
        // https://github.com/apache/beam/issues/20845
        excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate'
        // https://github.com/apache/beam/issues/20844
        excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating'
        if (!config.streaming) {
          // FlinkBatchExecutionInternalTimeService does not support timer registration on timer firing.
          excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testOnTimerTimestampSkew'
        }
        // Extremely flaky: https://github.com/apache/beam/issues/19814
        excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful'
        excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful'
        // TODO(https://github.com/apache/beam/issues/29972) runtimeContext is initialized after the initial split
        excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testBoundedSourceMetricsInSplit'
      }
    }
  }
}
createValidatesRunnerTask(name: "validatesRunnerBatch", streaming: false, sickbayTests: sickbayTests)
createValidatesRunnerTask(name: "validatesRunnerBatchWithDataStream", streaming: false, useDataStreamForBatch: true, sickbayTests: sickbayTests)
createValidatesRunnerTask(name: "validatesRunnerStreaming", streaming: true, sickbayTests: sickbayTests)
// We have a separate variant that runs with checkpointing enabled for the tests
// that require it, because the checkpointing variant is significantly slower: we
// must wait shutdownSourcesAfterIdleMs before a source can shut down, since
// https://issues.apache.org/jira/browse/FLINK-2491 does not support checkpointing
// once an operator has been shut down.
createValidatesRunnerTask(name: "validatesRunnerStreamingCheckpointing", streaming: true, checkpointing: true, sickbayTests: sickbayTests)
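// For illustration, a single variant can be invoked directly (the version
// directory below is hypothetical):
//   ./gradlew :runners:flink:1.17:validatesRunnerStreaming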
tasks.register('validatesRunner') {
  group = 'Verification'
  description "Validates Flink runner"
  dependsOn validatesRunnerBatch
  dependsOn validatesRunnerBatchWithDataStream
  dependsOn validatesRunnerStreaming
  dependsOn validatesRunnerStreamingCheckpointing
}
tasks.register("validatesRunnerSickbay", Test) {
group = "Verification"
description "Validates Flink runner (Sickbay Tests)"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestFlinkRunner",
])
classpath = configurations.validatesRunner
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
filter {
for (String test : sickbayTests) {
includeTestsMatching test
}
}
}
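// For illustration (hypothetical version directory):
//   ./gradlew :runners:flink:1.17:validatesRunnerSickbay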
if (project.ext.flink_major == project.ext.latestFlinkVersion) {
  // TODO(yathu) Support running validation on all supported Flink versions;
  // the maven-archetype Flink profile is currently pinned to a single Flink version.
  // Generates :runners:flink:<latestFlinkVersion>:runQuickstartJavaFlinkLocal
  createJavaExamplesArchetypeValidationTask(type: 'Quickstart', runner: 'FlinkLocal')
}
tasks.register("examplesIntegrationTest", Test) {
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
def pipelineOptionsArray = ["--runner=TestFlinkRunner",
"--parallelism=2",
"--tempLocation=${tempLocation}",
"--tempRoot=${tempLocation}",
"--project=${gcpProject}",
]
def pipelineOptions = JsonOutput.toJson(pipelineOptionsArray)
systemProperty "beamTestPipelineOptions", pipelineOptions
include '**/*IT.class'
maxParallelForks 4
classpath = configurations.examplesJavaIntegrationTest
testClassesDirs = files(project(":examples:java").sourceSets.test.output.classesDirs)
useJUnit {
filter{
// TODO (https://github.com/apache/beam/issues/21344) Fix integration Tests to run with FlinkRunner: Assertion error
excludeTestsMatching 'org.apache.beam.examples.WindowedWordCountIT.testWindowedWordCountInBatchDynamicSharding'
// TODO (https://github.com/apache/beam/issues/21344) Fix integration Tests to run with FlinkRunner: Error deleting table, Not found: Dataset
excludeTestsMatching 'org.apache.beam.examples.cookbook.BigQueryTornadoesIT.testE2eBigQueryTornadoesWithStorageApiUsingQuery'
}
}
}
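// For illustration, the examples suite can be run with custom GCP settings via the
// gcpProject and tempLocation project properties read above (all values hypothetical):
//   ./gradlew :runners:flink:1.17:examplesIntegrationTest \
//     -PgcpProject=my-project -PtempLocation=gs://my-bucket/tmp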
/**
* Updates the documentation with the current pipeline options.
*/
def createPipelineOptionsTableTask(String target) {
  tasks.register("generatePipelineOptionsTable${target}", JavaExec) {
    group = 'Website'
    description = "Generates a table with pipeline options for the Flink Runner documentation page"
    classpath = sourceSets.test.runtimeClasspath
    mainClass = 'org.apache.beam.runners.flink.website.PipelineOptionsTableGenerator'
    args = [target]
    standardOutput = new ByteArrayOutputStream()
    doLast {
      def dest = file("${project(':website').getProjectDir()}/www/site/layouts/shortcodes/flink_${target.toLowerCase()}_pipeline_options.html")
      if (!dest.exists()) {
        throw new GradleException("Pipeline options file is not in expected location: ${dest}")
      }
      dest.write(standardOutput.toString())
    }
  }
}
createPipelineOptionsTableTask('Java')
createPipelineOptionsTableTask('Python')
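// For illustration, the documentation tables can also be regenerated on their own
// (hypothetical version directory):
//   ./gradlew :runners:flink:1.17:generatePipelineOptionsTableJava \
//     :runners:flink:1.17:generatePipelineOptionsTablePython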
// Update the pipeline options documentation before running the tests
test.dependsOn(generatePipelineOptionsTableJava)
test.dependsOn(generatePipelineOptionsTablePython)
// delegate spotlessApply to :runners:flink:spotlessApply
tasks.named("spotlessApply") {
dependsOn ":runners:flink:spotlessApply"
}