/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
plugins { id 'org.apache.beam.module' }
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.nexmark',
exportJavadoc: false,
archivesBaseName: 'beam-sdks-java-nexmark',
)
description = "Apache Beam :: SDKs :: Java :: Nexmark"
// When running via Gradle, this property can be used to pass command-line arguments
// to the Nexmark launcher.
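// For example: -Pnexmark.args="--runner=DirectRunner --suite=SMOKE" (flag values are illustrative).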
def nexmarkArgsProperty = "nexmark.args"
// When running via Gradle, this property can be set to "true" to enable profiling of
// the Nexmark pipeline. Currently this only works with the Dataflow runner.
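// For example: -Pnexmark.profile=true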
def nexmarkProfilingProperty = "nexmark.profile"
// When running via Gradle, this property sets the runner dependency
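// For example: -Pnexmark.runner=":runners:flink:1.17" (defaults to ":runners:direct-java").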
def nexmarkRunnerProperty = "nexmark.runner"
def nexmarkRunnerDependency = project.findProperty(nexmarkRunnerProperty)
?: ":runners:direct-java"
def nexmarkRunnerVersionProperty = "nexmark.runner.version"
def nexmarkRunnerVersion = project.findProperty(nexmarkRunnerVersionProperty)
def isSparkRunner = nexmarkRunnerDependency.startsWith(":runners:spark:")
def isDataflowRunner = ":runners:google-cloud-dataflow-java".equals(nexmarkRunnerDependency)
def isDataflowRunnerV2 = isDataflowRunner && "V2".equals(nexmarkRunnerVersion)
def runnerConfiguration = ":runners:direct-java".equals(nexmarkRunnerDependency) ? "shadow" : null
if (isDataflowRunner) {
  /*
   * Manually declaring evaluationDependsOn ensures that the referenced project is evaluated
   * before this project, because we read one of its properties directly below
   * (the Java container image name or the worker shadow jar in getNexmarkArgs).
   */
if (isDataflowRunnerV2) {
evaluationDependsOn(":runners:google-cloud-dataflow-java")
} else {
evaluationDependsOn(":runners:google-cloud-dataflow-java:worker")
}
}
configurations {
  // A configuration for running the Nexmark launcher directly from Gradle, which
  // uses Gradle to put the appropriate dependencies on the classpath rather than
  // bundling them into a fat jar.
gradleRun
}
dependencies {
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:io:google-cloud-platform")
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:extensions:sql")
implementation project(":sdks:java:io:kafka")
implementation project(":sdks:java:testing:test-utils")
implementation library.java.google_api_client
implementation library.java.junit
implementation library.java.hamcrest
implementation library.java.google_api_services_bigquery
implementation library.java.jackson_core
implementation library.java.jackson_annotations
implementation library.java.jackson_databind
implementation library.java.jackson_datatype_joda
implementation library.java.avro
implementation library.java.joda_time
implementation library.java.slf4j_api
implementation library.java.kafka_clients
compileOnly library.java.error_prone_annotations
testRuntimeOnly library.java.slf4j_jdk14
testImplementation project(path: ":sdks:java:io:google-cloud-platform")
testImplementation project(path: ":sdks:java:testing:test-utils")
gradleRun project(project.path)
gradleRun project(path: nexmarkRunnerDependency, configuration: runnerConfiguration)
}
if (isSparkRunner) {
configurations.gradleRun {
    // Using the Spark runner causes a StackOverflowError if slf4j-jdk14 is on the classpath.
exclude group: "org.slf4j", module: "slf4j-jdk14"
}
}
def getNexmarkArgs = {
def nexmarkArgsStr = project.findProperty(nexmarkArgsProperty) ?: ""
def nexmarkArgsList = new ArrayList<String>()
Collections.addAll(nexmarkArgsList, nexmarkArgsStr.split())
if (isDataflowRunner) {
if (isDataflowRunnerV2) {
nexmarkArgsList.add("--experiments=beam_fn_api,use_unified_worker,use_runner_v2,shuffle_mode=service")
def sdkContainerImage = project.findProperty('sdkContainerImage') ?: project(":runners:google-cloud-dataflow-java").dockerJavaImageName
nexmarkArgsList.add("--sdkContainerImage=${sdkContainerImage}")
      // TODO(https://github.com/apache/beam/issues/20880): enable all queries once the issues with Runner v2 are fixed.
if (nexmarkArgsList.contains("--streaming=true")) {
nexmarkArgsList.add("--skipQueries=AVERAGE_PRICE_FOR_CATEGORY,AVERAGE_SELLING_PRICE_BY_SELLER,WINNING_BIDS,BOUNDED_SIDE_INPUT_JOIN,SESSION_SIDE_INPUT_JOIN,PORTABILITY_BATCH") // 4, 6, 9, 13, 14, 15
} else {
nexmarkArgsList.add("--skipQueries=LOCAL_ITEM_SUGGESTION,AVERAGE_PRICE_FOR_CATEGORY,AVERAGE_SELLING_PRICE_BY_SELLER,HIGHEST_BID,WINNING_BIDS,SESSION_SIDE_INPUT_JOIN,BOUNDED_SIDE_INPUT_JOIN") // 3, 4, 6, 7, 9, 13, 14, 15
}
} else {
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
      // Provide the job with a customizable worker jar.
      // With the legacy worker jar, the worker harness container image is set to empty
      // (i.e. the internally released worker image is used).
      // More context and discussion can be found in PR#6694.
nexmarkArgsList.add("--dataflowWorkerJar=${dataflowWorkerJar}".toString())
nexmarkArgsList.add('--workerHarnessContainerImage=')
def nexmarkProfile = project.findProperty(nexmarkProfilingProperty) ?: ""
if (nexmarkProfile.equals("true")) {
nexmarkArgsList.add('--profilingAgentConfiguration={ "APICurated": true }')
}
}
}
  if (isSparkRunner) {
// For transparency, be explicit about configuration of local Spark
nexmarkArgsList.add("--sparkMaster=local[4]")
}
return nexmarkArgsList
}
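// Illustrative result (not exhaustive): for a Dataflow Runner v2 batch run, the list assembled
// above is the user-supplied args plus the --experiments=..., --sdkContainerImage=... and
// --skipQueries=... entries added here; for a local Spark run it only gains --sparkMaster=local[4].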
// Execute the Nexmark queries or suites via Gradle.
//
// Parameters:
// -Pnexmark.runner
// Specify a runner subproject, such as ":runners:spark:3" or ":runners:flink:1.17"
// Defaults to ":runners:direct-java"
//
// -Pnexmark.args
// Specify the command line for invoking org.apache.beam.sdk.nexmark.Main
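//
// Illustrative invocation from the repository root (flag values are examples, not requirements):
//   ./gradlew :sdks:java:testing:nexmark:run \
//       -Pnexmark.runner=":runners:direct-java" \
//       -Pnexmark.args="--runner=DirectRunner --suite=SMOKE --streaming=false --manageResources=false --monitorJobs=true"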
task run(type: JavaExec) {
def nexmarkArgsList = getNexmarkArgs()
if (isDataflowRunner) {
if (isDataflowRunnerV2) {
dependsOn ":runners:google-cloud-dataflow-java:buildAndPushDockerJavaContainer"
finalizedBy ":runners:google-cloud-dataflow-java:cleanUpDockerJavaImages"
} else {
dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
}
}
  if (isSparkRunner) {
// Disable UI
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"
    // Only relevant for the Spark Dataset runner (SparkStructuredStreamingRunner)
systemProperty "spark.sql.shuffle.partitions", "4"
}
mainClass = "org.apache.beam.sdk.nexmark.Main"
classpath = configurations.gradleRun
args nexmarkArgsList.toArray()
}