/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.beam.gradle.BeamModulePlugin

/**
 * Flink Runner JobServer build file shared by all of its build targets.
 *
 * See the per-version job-server build.gradle files for an example of how to
 * use this script.
 */
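// A minimal sketch of a consuming build.gradle (the Flink version, paths, and
// archive name below are illustrative; the ext properties are the ones this
// script reads in sourceSets and applyJavaNature):
//
//   def basePath = '../job-server'
//   project.ext {
//     main_source_dirs = ["$basePath/src/main/java"]
//     test_source_dirs = ["$basePath/src/test/java"]
//     main_resources_dirs = ["$basePath/src/main/resources"]
//     test_resources_dirs = ["$basePath/src/test/resources"]
//     archives_base_name = 'beam-runners-flink-1.12-job-server'
//   }
//   apply from: "../flink_job_server.gradle"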

apply plugin: 'org.apache.beam.module'
apply plugin: 'application'
// we need to set mainClassName before applying shadow plugin
mainClassName = "org.apache.beam.runners.flink.FlinkJobServerDriver"

applyJavaNature(
  automaticModuleName: 'org.apache.beam.runners.flink.jobserver',
  archivesBaseName: project.hasProperty('archives_base_name') ? archives_base_name : archivesBaseName,
  validateShadowJar: false,
  exportJavadoc: false,
  shadowClosure: {
    append "reference.conf"
  },
)

// Resolve the Flink runner project (and version) the job-server is based on
def flinkRunnerProject = "${project.path.replace(":job-server", "")}"
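// e.g. a project path of ":runners:flink:1.12:job-server" resolves to
// ":runners:flink:1.12" (the version segment in this example is illustrative).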

description = project(flinkRunnerProject).description + " :: Job Server"

/*
 * All source and resource directories are set explicitly here so that each
 * supported Flink version picks up its correct per-version overrides.
 */
sourceSets {
  main {
    java {
      srcDirs = main_source_dirs
    }
    resources {
      srcDirs = main_resources_dirs
    }
  }
  test {
    java {
      srcDirs = test_source_dirs
    }
    resources {
      srcDirs = test_resources_dirs
    }
  }
}

configurations {
  validatesPortableRunner
}

configurations.all {
  // replace commons logging with the jcl-over-slf4j bridge
  exclude group: "commons-logging", module: "commons-logging"
}

dependencies {
  compile project(flinkRunnerProject)
  runtime group: "org.slf4j", name: "jcl-over-slf4j", version: dependencies.create(project.library.java.slf4j_api).getVersion()
  validatesPortableRunner project(path: flinkRunnerProject, configuration: "testRuntime")
  validatesPortableRunner project(path: ":sdks:java:core", configuration: "shadowTest")
  validatesPortableRunner project(path: ":runners:core-java", configuration: "testRuntime")
  validatesPortableRunner project(path: ":runners:portability:java", configuration: "testRuntime")
  runtime project(":sdks:java:extensions:google-cloud-platform-core")
  runtime library.java.slf4j_simple
  // TODO: Enable AWS and HDFS file system.
  // External transform expansion
  // Kafka
  runtimeOnly project(":sdks:java:io:kafka")
  runtimeOnly library.java.kafka_clients
  // PubSub
  runtimeOnly project(":sdks:java:io:google-cloud-platform")
  // SqlTransform
  runtimeOnly project(":sdks:java:extensions:sql:expansion-service")
}

// NOTE: runShadow must be used in order to run the job server. The standard
// run task will not work because the Flink runner classes only exist in the
// shadow jar.
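//
// A typical invocation, with illustrative property values (the project path
// depends on the Flink version being built):
//
//   ./gradlew :runners:flink:1.12:job-server:runShadow \
//     -PflinkMaster=localhost:8081 -PartifactsDir=/tmp/beam-artifacts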
runShadow {
  args = []
  if (project.hasProperty('jobHost'))
    args += ["--job-host=${project.property('jobHost')}"]
  if (project.hasProperty('artifactsDir'))
    args += ["--artifacts-dir=${project.property('artifactsDir')}"]
  if (project.hasProperty('cleanArtifactsPerJob'))
    args += ["--clean-artifacts-per-job=${project.property('cleanArtifactsPerJob')}"]
  if (project.hasProperty('flinkMaster'))
    args += ["--flink-master=${project.property('flinkMaster')}"]
  else if (project.hasProperty('flinkMasterUrl'))
    // Legacy alias for flinkMaster, kept for backwards compatibility.
    args += ["--flink-master=${project.property('flinkMasterUrl')}"]
  if (project.hasProperty('flinkConfDir'))
    args += ["--flink-conf-dir=${project.property('flinkConfDir')}"]
  if (project.hasProperty('sdkWorkerParallelism'))
    args += ["--sdk-worker-parallelism=${project.property('sdkWorkerParallelism')}"]

  // Enable remote debugging on port 5005 (suspend=n: the JVM starts without
  // waiting for a debugger to attach).
  jvmArgs = ["-Xdebug", "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"]
  if (project.hasProperty("logLevel"))
    jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
}

def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpointing) {
  def pipelineOptions = [
    // Limit resource consumption via parallelism
    "--parallelism=2",
  ]
  if (streaming) {
    pipelineOptions += "--streaming"
    if (checkpointing) {
      pipelineOptions += "--checkpointingInterval=3000"
      pipelineOptions += "--shutdownSourcesAfterIdleMs=60000"
    }
  }
  createPortableValidatesRunnerTask(
    name: "validatesPortableRunner${name}",
    jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver",
    jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
    testClasspathConfiguration: configurations.validatesPortableRunner,
    numParallelTests: 1,
    pipelineOpts: pipelineOptions,
    environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
    testCategories: {
      if (streaming && checkpointing) {
        includeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
        excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
        // TestStreamSource does not support checkpointing
        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
      } else {
        includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
        excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
        // Larger keys are possible, but they require more memory.
        excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above10MB'
        excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
        excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
        excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
        excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
        excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
        excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
        excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
        excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
        excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
        excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
        excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
        excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
        excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
        if (streaming) {
          excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
          excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
          excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'
          excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp'
        } else {
          excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
          excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
          excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
          excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
        }
      }
    },
    testFilter: {
      // TODO(BEAM-10016)
      excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
      // TODO(BEAM-11310)
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedSideInputNotPresent'
    },
  )
}

project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", false, false)
project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", true, false)
project.ext.validatesPortableRunnerStreamingCheckpoint = portableValidatesRunnerTask("StreamingCheckpointing", true, true)

task validatesPortableRunner() {
  dependsOn validatesPortableRunnerBatch
  dependsOn validatesPortableRunnerStreaming
  dependsOn validatesPortableRunnerStreamingCheckpoint
}
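
// Run all three suites together (the project path is illustrative; it depends
// on the Flink version being built):
//
//   ./gradlew :runners:flink:1.12:job-server:validatesPortableRunner
//
// or run a single variant, e.g. validatesPortableRunnerStreaming.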

// Reserve distinct ports for the job and artifact endpoints from the shared
// counter in BeamModulePlugin, so concurrently configured job servers do not
// collide.
def jobPort = BeamModulePlugin.startingExpansionPortNumber.getAndDecrement()
def artifactPort = BeamModulePlugin.startingExpansionPortNumber.getAndDecrement()

def setupTask = project.tasks.create(name: "flinkJobServerSetup", type: Exec) {
  dependsOn shadowJar
  def pythonDir = project.project(":sdks:python").projectDir
  def flinkJobServerJar = shadowJar.archivePath

  // Stop any job server left over from a previous run, then start a fresh one
  // on the reserved ports.
  executable 'sh'
  args '-c', "$pythonDir/scripts/run_job_server.sh stop --group_id ${project.name} && $pythonDir/scripts/run_job_server.sh start --group_id ${project.name} --job_port ${jobPort} --artifact_port ${artifactPort} --job_server_jar ${flinkJobServerJar}"
}

def cleanupTask = project.tasks.create(name: "flinkJobServerCleanup", type: Exec) {
  def pythonDir = project.project(":sdks:python").projectDir

  executable 'sh'
  args '-c', "$pythonDir/scripts/run_job_server.sh stop --group_id ${project.name}"
}

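// The cross-language suite below starts the job server via flinkJobServerSetup,
// runs the Python and Java pipelines against the reserved job endpoint, and
// tears the server down via flinkJobServerCleanup. The created task name comes
// from BeamModulePlugin; in an invocation such as the following, the project
// path is illustrative:
//
//   ./gradlew :runners:flink:1.12:job-server:validatesCrossLanguageRunner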
createCrossLanguageValidatesRunnerTask(
  startJobServer: setupTask,
  cleanupJobServer: cleanupTask,
  classpath: configurations.validatesPortableRunner,
  numParallelTests: 1,
  pythonPipelineOptions: [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:${jobPort}",
    "--environment_cache_millis=10000",
    "--experiments=beam_fn_api",
    "--parallelism=2",
  ],
  javaPipelineOptions: [
    "--runner=PortableRunner",
    "--jobEndpoint=localhost:${jobPort}",
    "--environmentCacheMillis=10000",
    "--experiments=beam_fn_api",
    "--parallelism=2",
  ]
)

// The miniCluster jar starts an embedded Flink cluster intended for use in testing.
task miniCluster(type: Jar, dependsOn: shadowJar) {
  archiveBaseName = "${project.archivesBaseName}-mini-cluster"
  dependencies {
    runtime project(path: flinkRunnerProject, configuration: "miniCluster")
  }
  from zipTree(shadowJar.archivePath).matching {
    // If these classes aren't excluded from the mini cluster jar, they will be loaded instead of
    // the corresponding classes in the submitted job jar, preventing pipeline resources from
    // loading successfully.
    exclude "**/FlinkPipelineRunner*"
    exclude "**/PortablePipelineJarUtils*"
  }
  manifest {
    attributes('Main-Class': 'org.apache.beam.runners.flink.FlinkMiniClusterEntryPoint')
  }
  zip64 true // jar needs to contain more than 65535 files
}
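
// Building the mini cluster jar (project path illustrative):
//
//   ./gradlew :runners:flink:1.12:job-server:miniCluster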