blob: 90890a7d5856b60fdfe7426029c7ea586c032952 [file] [log] [blame]
import org.apache.beam.gradle.BeamModulePlugin
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Flink Runner JobServer build file shared by all of its build targets.
*
* See build.gradle files for an example of how to use this script.
*/
apply plugin: 'org.apache.beam.module'
apply plugin: 'application'
// we need to set mainClassName before applying shadow plugin
mainClassName = "org.apache.beam.runners.flink.FlinkJobServerDriver"
applyJavaNature(
automaticModuleName: 'org.apache.beam.runners.flink.jobserver',
archivesBaseName: project.hasProperty('archives_base_name') ? archives_base_name : archivesBaseName,
validateShadowJar: false,
exportJavadoc: false,
shadowClosure: {
append "reference.conf"
},
)
// Resolve the Flink project name (and version) the job-server is based on
def flinkRunnerProject = "${project.path.replace(":job-server", "")}"
description = project(flinkRunnerProject).description + " :: Job Server"
/*
* We have to explicitly set all directories here to make sure each
* version of Flink has the correct overrides set.
*/
sourceSets {
main {
java {
srcDirs = main_source_dirs
}
resources {
srcDirs = main_resources_dirs
}
}
test {
java {
srcDirs = test_source_dirs
}
resources {
srcDirs = test_resources_dirs
}
}
}
configurations {
validatesPortableRunner
}
configurations.all {
// replace commons logging with the jcl-over-slf4j bridge
exclude group: "commons-logging", module: "commons-logging"
}
dependencies {
implementation project(flinkRunnerProject)
permitUnusedDeclared project(flinkRunnerProject) // BEAM-11761
runtimeOnly group: "org.slf4j", name: "jcl-over-slf4j", version: dependencies.create(project.library.java.slf4j_api).getVersion()
validatesPortableRunner project(path: flinkRunnerProject, configuration: "testRuntimeMigration")
validatesPortableRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesPortableRunner project(path: ":runners:core-java", configuration: "testRuntimeMigration")
validatesPortableRunner project(path: ":runners:portability:java", configuration: "testRuntimeMigration")
runtimeOnly project(":sdks:java:extensions:google-cloud-platform-core")
runtimeOnly library.java.slf4j_simple
// TODO: Enable HDFS file system.
runtimeOnly project(":sdks:java:io:amazon-web-services2")
// External transform expansion
// Kafka
runtimeOnly project(":sdks:java:io:kafka")
runtimeOnly library.java.kafka_clients
// PubSub
runtimeOnly project(":sdks:java:io:google-cloud-platform")
// SqlTransform
runtimeOnly project(":sdks:java:extensions:sql:expansion-service")
}
// NOTE: runShadow must be used in order to run the job server. The standard run
// task will not work because the flink runner classes only exist in the shadow
// jar.
runShadow {
args = []
if (project.hasProperty('jobHost'))
args += ["--job-host=${project.property('jobHost')}"]
if (project.hasProperty('artifactsDir'))
args += ["--artifacts-dir=${project.property('artifactsDir')}"]
if (project.hasProperty('cleanArtifactsPerJob'))
args += ["--clean-artifacts-per-job=${project.property('cleanArtifactsPerJob')}"]
if (project.hasProperty('flinkMaster'))
args += ["--flink-master=${project.property('flinkMaster')}"]
else if (project.hasProperty('flinkMasterUrl'))
args += ["--flink-master=${project.property('flinkMasterUrl')}"]
if (project.hasProperty('flinkConfDir'))
args += ["--flink-conf-dir=${project.property('flinkConfDir')}"]
if (project.hasProperty('sdkWorkerParallelism'))
args += ["--sdk-worker-parallelism=${project.property('sdkWorkerParallelism')}"]
logger.info('Will start flink job server with args {}', args)
// Enable remote debugging.
jvmArgs = ["-Xdebug", "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"]
if (project.hasProperty("logLevel"))
jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
}
def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpointing, boolean docker) {
def pipelineOptions = [
// Limit resource consumption via parallelism
"--parallelism=2",
]
if (streaming) {
pipelineOptions += "--streaming"
if (checkpointing) {
pipelineOptions += "--checkpointingInterval=3000"
pipelineOptions += "--shutdownSourcesAfterIdleMs=60000"
}
}
createPortableValidatesRunnerTask(
name: "validatesPortableRunner${name}",
jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver",
jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
testClasspathConfiguration: configurations.validatesPortableRunner,
numParallelTests: 1,
pipelineOpts: pipelineOptions,
environment: docker ? BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.DOCKER : BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
testCategories: {
if (docker) {
includeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
return
}
if (streaming && checkpointing) {
includeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
// TestStreamSource does not support checkpointing
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
return
}
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
// Larger keys are possible, but they require more memory.
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above10MB'
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService'
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
if (streaming) {
excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp'
excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs'
return
}
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle'
},
testFilter: {
// Flink reshuffle override does not preserve all metadata
excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/31231)
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/20269)
excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
// TODO(https://github.com/apache/beam/issues/20843)
excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode'
// TODO(https://github.com/apache/beam/issues/20844)
excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating'
// TODO(BEAM-12710)
excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate'
// TODO(BEAM-13498)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew'
// TODO(https://github.com/apache/beam/issues/21472)
excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState'
},
)
}
project.ext.validatesPortableRunnerDocker = portableValidatesRunnerTask("Docker", false, false, true)
project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", false, false, false)
project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", true, false, false)
project.ext.validatesPortableRunnerStreamingCheckpoint = portableValidatesRunnerTask("StreamingCheckpointing", true, true, false)
tasks.register("validatesPortableRunner") {
dependsOn validatesPortableRunnerDocker
dependsOn validatesPortableRunnerBatch
dependsOn validatesPortableRunnerStreaming
dependsOn validatesPortableRunnerStreamingCheckpoint
}
def jobPort = BeamModulePlugin.getRandomPort()
def artifactPort = BeamModulePlugin.getRandomPort()
def setupTask = project.tasks.register("flinkJobServerSetup", Exec) {
dependsOn shadowJar
def pythonDir = project.project(":sdks:python").projectDir
def flinkJobServerJar = shadowJar.archivePath
def flinkDir = project.project(":runners:flink").projectDir
def additionalArgs = ""
if (project.hasProperty('flinkConfDir'))
additionalArgs += " --flink-conf-dir=${project.property('flinkConfDir')}"
else
additionalArgs += "--flink-conf-dir=$flinkDir/src/test/resources"
executable 'sh'
args '-c', "$pythonDir/scripts/run_job_server.sh stop --group_id ${project.name} && $pythonDir/scripts/run_job_server.sh start --group_id ${project.name} --job_port ${jobPort} --artifact_port ${artifactPort} --job_server_jar ${flinkJobServerJar} --additional_args \"${additionalArgs}\""
}
def cleanupTask = project.tasks.register("flinkJobServerCleanup", Exec) {
def pythonDir = project.project(":sdks:python").projectDir
executable 'sh'
args '-c', "$pythonDir/scripts/run_job_server.sh stop --group_id ${project.name}"
}
createCrossLanguageValidatesRunnerTask(
startJobServer: setupTask,
cleanupJobServer: cleanupTask,
classpath: configurations.validatesPortableRunner,
numParallelTests: 1,
pythonPipelineOptions: [
"--runner=PortableRunner",
"--job_endpoint=localhost:${jobPort}",
"--environment_cache_millis=10000",
"--experiments=beam_fn_api",
"--parallelism=2",
],
javaPipelineOptions: [
"--runner=PortableRunner",
"--jobEndpoint=localhost:${jobPort}",
"--environmentCacheMillis=10000",
"--experiments=beam_fn_api",
"--parallelism=2",
],
goScriptOptions: [
"--runner flink",
"--tests \"./test/integration/xlang ./test/integration/io/xlang/...\"",
"--endpoint localhost:${jobPort}",
],
)
shadowJar {
manifest {
attributes(["Multi-Release": true])
}
}
// miniCluster jar starts an embedded Flink cluster intended for use in testing.
tasks.register("miniCluster", Jar) {
dependsOn shadowJar
archiveBaseName = "${project.archivesBaseName}-mini-cluster"
dependencies {
runtimeOnly project(path: flinkRunnerProject, configuration: "miniCluster")
}
from zipTree(shadowJar.archivePath).matching {
// If these classes aren't excluded from the mini cluster jar, they will be loaded instead of
// the corresponding classes in the submitted job jar, preventing pipeline resources from
// loading successfully.
exclude "**/FlinkPipelineRunner*"
exclude "**/PortablePipelineJarUtils*"
}
manifest {
attributes('Main-Class': 'org.apache.beam.runners.flink.FlinkMiniClusterEntryPoint')
}
zip64 true // jar needs to contain more than 65535 files
}