/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Main Flink Runner build file, shared by all of its build targets.
 * The script is parameterized by the Flink version and the source directories.
 *
 * See the per-version build.gradle files for an example of how to use this script.
 */
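
/*
 * A minimal sketch of such a per-version build.gradle. The property names below
 * are the ones this script actually reads; the concrete values and file layout
 * are assumptions for illustration only:
 *
 *   def basePath = '..'
 *   project.ext {
 *     // Version used for all Flink dependencies below.
 *     flink_version = '1.15.0'
 *     // Version-specific sources/resources merged over the shared ones.
 *     main_source_overrides = ['./src/main/java']
 *     main_resources_overrides = []
 *     test_source_overrides = ['./src/test/java']
 *     test_resources_overrides = []
 *   }
 *   apply from: "$basePath/flink_runner.gradle"
 *
 * The surrounding build is also expected to provide project.ext.allFlinkVersions
 * and, optionally, an archives_base_name property.
 */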

import groovy.json.JsonOutput

apply plugin: 'org.apache.beam.module'
applyJavaNature(
    enableStrictDependencies: true,
    automaticModuleName: 'org.apache.beam.runners.flink',
    archivesBaseName: (project.hasProperty('archives_base_name') ? archives_base_name : archivesBaseName),
)

description = "Apache Beam :: Runners :: Flink $flink_version"

/*
 * We explicitly declare evaluationDependsOn for the following projects to ensure
 * they are evaluated before this project, because we reference their
 * "sourceSets.test.output" directly (see the testClassesDirs settings below).
 */
evaluationDependsOn(":sdks:java:core")
evaluationDependsOn(":runners:core-java")
evaluationDependsOn(":examples:java")

/*
 * Copy & merge source overrides into the build directory.
 */
def sourceOverridesBase = "${project.buildDir}/source-overrides/src"

def copySourceOverrides = tasks.register('copySourceOverrides', Copy) {
  it.from main_source_overrides
  it.into "${sourceOverridesBase}/main/java"
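  // DuplicatesStrategy.INCLUDE (also used in the copy tasks below) keeps all
  // duplicate paths, so a file from a later override directory overwrites an
  // earlier file with the same relative path in the destination.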
  it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
compileJava.dependsOn copySourceOverrides

def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) {
  it.from main_resources_overrides
  it.into "${sourceOverridesBase}/main/resources"
  it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
processResources.dependsOn copyResourcesOverrides

def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) {
  it.from test_source_overrides
  it.into "${sourceOverridesBase}/test/java"
  it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
compileTestJava.dependsOn copyTestSourceOverrides

def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Copy) {
  it.from test_resources_overrides
  it.into "${sourceOverridesBase}/test/resources"
  it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
processTestResources.dependsOn copyTestResourcesOverrides

/*
 * We have to explicitly set all directories here to make sure each
 * version of Flink has the correct overrides set.
 */
def sourceBase = "${project.projectDir}/../src"
sourceSets {
  main {
    java {
      srcDirs = ["${sourceBase}/main/java", "${sourceOverridesBase}/main/java"]
    }
    resources {
      srcDirs = ["${sourceBase}/main/resources", "${sourceOverridesBase}/main/resources"]
    }
  }
  test {
    java {
      srcDirs = ["${sourceBase}/test/java", "${sourceOverridesBase}/test/java"]
    }
    resources {
      srcDirs = ["${sourceBase}/test/resources", "${sourceOverridesBase}/test/resources"]
    }
  }
}

test {
  systemProperty "log4j.configuration", "log4j-test.properties"
  // Change log level to debug:
  // systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug"
  // Change log level to debug only for a specific package and its nested packages:
  // systemProperty "org.slf4j.simpleLogger.log.org.apache.beam.runners.flink.translation.wrappers.streaming", "debug"
  jvmArgs "-XX:-UseGCOverheadLimit"
  if (System.getProperty("beamSurefireArgline")) {
    jvmArgs System.getProperty("beamSurefireArgline")
  }
  // TODO(BEAM-6418): Running tests for all Flink versions in parallel can be too harsh on Jenkins memory.
  // Run them serially for now to avoid "Exit code 137", i.e. the Jenkins host killing the Gradle test process.
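  // project.path is e.g. ":runners:flink:1.15", so its last segment is this
  // target's Flink minor version within project.ext.allFlinkVersions.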
  def flink_minor_version = project.path.split(':').last()
  for (version in project.ext.allFlinkVersions) {
    if (version == flink_minor_version) {
      break
    }
    mustRunAfter(":runners:flink:${version}:test")
  }
}

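// Custom dependency buckets: validatesRunner and examplesJavaIntegrationTest form
// the classpaths of the test tasks defined below; miniCluster only has
// dependencies declared here and is meant for consumption outside this script.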
configurations {
  validatesRunner
  miniCluster
  examplesJavaIntegrationTest
}

dependencies {
  compileOnly project(":sdks:java:build-tools")
  implementation library.java.vendored_guava_32_1_2_jre
  implementation project(path: ":sdks:java:core", configuration: "shadow")
  implementation project(":runners:core-java")
  implementation project(":runners:core-construction-java")
  implementation project(":runners:java-fn-execution")
  implementation project(":runners:java-job-service")
  implementation project(":sdks:java:extensions:google-cloud-platform-core")
  implementation library.java.vendored_grpc_1_54_0
  implementation library.java.slf4j_api
  implementation library.java.joda_time
  implementation library.java.args4j

  // Flink 1.15 shades all remaining Scala dependencies and therefore no longer depends on a specific Scala version.
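  // Note: compareTo is a plain lexicographic String comparison; it orders the
  // two-digit 1.1x minor versions used here correctly (e.g. "1.12" < "1.15").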
  if (flink_version.compareTo("1.15") >= 0) {
    implementation "org.apache.flink:flink-clients:$flink_version"
    // Runtime dependencies are not included in Beam's generated pom.xml, so we must declare flink-clients in the
    // implementation configuration (https://issues.apache.org/jira/browse/BEAM-11732).
    permitUnusedDeclared "org.apache.flink:flink-clients:$flink_version"

    implementation "org.apache.flink:flink-streaming-java:$flink_version"
    // RocksDB state backend (included in the Flink distribution)
    provided "org.apache.flink:flink-statebackend-rocksdb:$flink_version"
    testImplementation "org.apache.flink:flink-statebackend-rocksdb:$flink_version"
    testImplementation "org.apache.flink:flink-streaming-java:$flink_version:tests"
    testImplementation "org.apache.flink:flink-test-utils:$flink_version"

    miniCluster "org.apache.flink:flink-runtime-web:$flink_version"
  } else {
    implementation "org.apache.flink:flink-clients_2.12:$flink_version"
    // Runtime dependencies are not included in Beam's generated pom.xml, so we must declare flink-clients in the
    // implementation configuration (https://issues.apache.org/jira/browse/BEAM-11732).
    permitUnusedDeclared "org.apache.flink:flink-clients_2.12:$flink_version"

    implementation "org.apache.flink:flink-streaming-java_2.12:$flink_version"
    // RocksDB state backend (included in the Flink distribution)
    provided "org.apache.flink:flink-statebackend-rocksdb_2.12:$flink_version"
    testImplementation "org.apache.flink:flink-statebackend-rocksdb_2.12:$flink_version"
    testImplementation "org.apache.flink:flink-streaming-java_2.12:$flink_version:tests"
    testImplementation "org.apache.flink:flink-test-utils_2.12:$flink_version"

    miniCluster "org.apache.flink:flink-runtime-web_2.12:$flink_version"
  }

  implementation "org.apache.flink:flink-core:$flink_version"
  implementation "org.apache.flink:flink-metrics-core:$flink_version"
  implementation "org.apache.flink:flink-java:$flink_version"

  if (flink_version.compareTo("1.14") >= 0) {
    implementation "org.apache.flink:flink-runtime:$flink_version"
    implementation "org.apache.flink:flink-optimizer:$flink_version"
 |     implementation "org.apache.flink:flink-metrics-core:$flink_version" | 
 |     testImplementation "org.apache.flink:flink-runtime:$flink_version:tests" | 
 |     testImplementation "org.apache.flink:flink-rpc-akka:$flink_version" | 
 |   } else { | 
 |     implementation "org.apache.flink:flink-runtime_2.12:$flink_version" | 
 |     implementation "org.apache.flink:flink-optimizer_2.12:$flink_version" | 
 |     testImplementation "org.apache.flink:flink-runtime_2.12:$flink_version:tests" | 
 |   } | 
 |   testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") | 
 |   // FlinkStateInternalsTest extends abstract StateInternalsTest | 
 |   testImplementation project(path: ":runners:core-java", configuration: "testRuntimeMigration") | 
 |   testImplementation library.java.hamcrest | 
 |   testImplementation library.java.junit | 
 |   testImplementation library.java.mockito_core | 
 |   testImplementation library.java.powermock | 
 |   testImplementation library.java.google_api_services_bigquery | 
 |   testImplementation project(":sdks:java:io:google-cloud-platform") | 
 |   testImplementation library.java.jackson_dataformat_yaml | 
 |   testImplementation "org.apache.flink:flink-core:$flink_version:tests" | 
 |   testImplementation "org.apache.flink:flink-connector-test-utils:$flink_version" | 
 |   testImplementation project(":sdks:java:harness") | 
 |   testRuntimeOnly library.java.slf4j_simple | 
 |   validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest") | 
 |   validatesRunner project(path: ":runners:core-java", configuration: "testRuntimeMigration") | 
 |   validatesRunner project(project.path) | 
 |   implementation project(path: ":model:fn-execution", configuration: "shadow") | 
 |   implementation project(path: ":model:pipeline", configuration: "shadow") | 
 |   implementation project(path: ":model:job-management", configuration: "shadow") | 
 |   implementation project(":sdks:java:fn-execution") | 
 |   implementation library.java.jackson_databind | 
 |   runtimeOnly library.java.jackson_jaxb_annotations | 
 |   examplesJavaIntegrationTest project(project.path) | 
 |   examplesJavaIntegrationTest project(":examples:java") | 
 |   examplesJavaIntegrationTest project(path: ":examples:java", configuration: "testRuntimeMigration") | 
 | } | 
 |  | 
 | def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' | 
 | def tempLocation = project.findProperty('tempLocation') ?: 'gs://temp-storage-for-end-to-end-tests' | 
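// Both can be overridden on the command line via Gradle project properties, e.g.:
//   ./gradlew <task> -PgcpProject=my-gcp-project -PtempLocation=gs://my-bucket/tmp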

class ValidatesRunnerConfig {
  String name
  boolean streaming
  boolean checkpointing
  ArrayList<String> sickbayTests
}

def sickbayTests = [
        // TODO(https://github.com/apache/beam/issues/21306)
        'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testOnWindowTimestampSkew',
]

def createValidatesRunnerTask(Map m) {
  def config = m as ValidatesRunnerConfig
  tasks.register(config.name, Test) {
    group = "Verification"
    // Always rerun: disable Gradle's up-to-date check for this task
    outputs.upToDateWhen { false }
    def runnerType = config.streaming ? "streaming" : "batch"
    description = "Validates the ${runnerType} runner"
    def pipelineOptionsArray = ["--runner=TestFlinkRunner",
                                "--streaming=${config.streaming}",
                                "--parallelism=2",
    ]
    if (config.checkpointing) {
      pipelineOptionsArray.addAll([
              "--checkpointingInterval=3000",
              "--shutdownSourcesAfterIdleMs=60000",
      ])
    }
    def pipelineOptions = JsonOutput.toJson(pipelineOptionsArray)
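    // beamTestPipelineOptions is passed as a JSON array, e.g.
    // ["--runner=TestFlinkRunner","--streaming=false","--parallelism=2"]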
    systemProperty "beamTestPipelineOptions", pipelineOptions
    classpath = configurations.validatesRunner
    testClassesDirs = files(
      project(":sdks:java:core").sourceSets.test.output.classesDirs,
      project(":runners:core-java").sourceSets.test.output.classesDirs,
    )
    // maxParallelForks decreased from 4 in order to avoid OOM errors
    maxParallelForks 2
    useJUnit {
      if (config.checkpointing) {
        includeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
        // TestStreamSource does not support checkpointing
        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
      } else {
        includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
        // Should be run only in a properly configured SDK harness environment
        excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
        excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
      }
      excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
      excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
      excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
      excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics'
      excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
      excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
      excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer'
      if (config.streaming) {
        excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'  // BEAM-8598
        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
        excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs'
      } else {
        excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
        excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
        excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
        excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle'
      }
      filter {
        for (String test : config.sickbayTests) {
          excludeTestsMatching test
        }

        // https://github.com/apache/beam/issues/20843
        excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode'
        // https://github.com/apache/beam/issues/20845
        excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate'
        // https://github.com/apache/beam/issues/20844
        excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating'
      }
    }
  }
}

createValidatesRunnerTask(name: "validatesRunnerBatch", streaming: false, sickbayTests: sickbayTests)
createValidatesRunnerTask(name: "validatesRunnerStreaming", streaming: true, sickbayTests: sickbayTests)
// We provide a dedicated checkpointing variant for the tests that require it,
// because a checkpointing run is significantly slower: each source must wait
// shutdownSourcesAfterIdleMs before it can shut down, since
// https://issues.apache.org/jira/browse/FLINK-2491 means checkpointing is not
// supported once an operator has been shut down.
 | createValidatesRunnerTask(name: "validatesRunnerStreamingCheckpointing", streaming: true, checkpointing: true, sickbayTests: sickbayTests) | 
 |  | 
 | tasks.register('validatesRunner') { | 
 |   group = 'Verification' | 
 |   description "Validates Flink runner" | 
 |   dependsOn validatesRunnerBatch | 
 |   dependsOn validatesRunnerStreaming | 
 |   dependsOn validatesRunnerStreamingCheckpointing | 
 | } | 
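
// Example invocation (the project path varies with the Flink version directory), e.g.:
//   ./gradlew :runners:flink:1.15:validatesRunner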

tasks.register("validatesRunnerSickbay", Test) {
  group = "Verification"
  description = "Validates the Flink runner (sickbay tests)"
  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
          "--runner=TestFlinkRunner",
  ])

  classpath = configurations.validatesRunner
  testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)

  filter {
    for (String test : sickbayTests) {
      includeTestsMatching test
    }
  }
}

// Generates e.g. :runners:flink:1.13:runQuickstartJavaFlinkLocal for each Flink version project
createJavaExamplesArchetypeValidationTask(type: 'Quickstart', runner: 'FlinkLocal')

tasks.register("examplesIntegrationTest", Test) {
  group = "Verification"
  // Always rerun: disable Gradle's up-to-date check for this task
  outputs.upToDateWhen { false }
  def pipelineOptionsArray = ["--runner=TestFlinkRunner",
                              "--parallelism=2",
                              "--tempLocation=${tempLocation}",
                              "--tempRoot=${tempLocation}",
                              "--project=${gcpProject}",
  ]

  def pipelineOptions = JsonOutput.toJson(pipelineOptionsArray)
  systemProperty "beamTestPipelineOptions", pipelineOptions

  include '**/*IT.class'
  maxParallelForks 4
  classpath = configurations.examplesJavaIntegrationTest
  testClassesDirs = files(project(":examples:java").sourceSets.test.output.classesDirs)
  useJUnit {
    filter {
      // TODO(https://github.com/apache/beam/issues/21344): Fix integration tests to run with FlinkRunner: assertion error
      excludeTestsMatching 'org.apache.beam.examples.WindowedWordCountIT.testWindowedWordCountInBatchDynamicSharding'
      // TODO(https://github.com/apache/beam/issues/21344): Fix integration tests to run with FlinkRunner: error deleting table, "Not found: Dataset"
      excludeTestsMatching 'org.apache.beam.examples.cookbook.BigQueryTornadoesIT.testE2eBigQueryTornadoesWithStorageApiUsingQuery'
    }
  }
}

/**
 * Updates the documentation with the current pipeline options.
 */
def createPipelineOptionsTableTask(String target) {
  tasks.register("generatePipelineOptionsTable${target}", JavaExec) {
    group = 'Website'
    description = "Generates a table with pipeline options for the Flink Runner documentation page"
    classpath = sourceSets.test.runtimeClasspath
    mainClass = 'org.apache.beam.runners.flink.website.PipelineOptionsTableGenerator'
    args = [target]
    standardOutput = new ByteArrayOutputStream()
    doLast {
      def dest = file("${project(':website').getProjectDir()}/www/site/layouts/shortcodes/flink_${target.toLowerCase()}_pipeline_options.html")
      if (!dest.exists()) {
        throw new GradleException("Pipeline options file is not in expected location: ${dest}")
      }
      dest.write(standardOutput.toString())
    }
  }
}
createPipelineOptionsTableTask('Java')
createPipelineOptionsTableTask('Python')
// Update the pipeline options documentation before running the tests
test.dependsOn(generatePipelineOptionsTableJava)
test.dependsOn(generatePipelineOptionsTablePython)