/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.util.stream.Collectors
plugins { id 'org.apache.beam.module' }
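// applyJavaNature, provideIntegrationTestingDependencies, and enableJavaPerformanceTesting
// below are build helpers contributed by the org.apache.beam.module plugin applied here.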
applyJavaNature(
  automaticModuleName: 'org.apache.beam.sdk.io.kafka',
  mavenRepositories: [
    [id: 'io.confluent', url: 'https://packages.confluent.io/maven/']
  ],
)
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()
description = "Apache Beam :: SDKs :: Java :: IO :: Kafka"
ext {
  summary = "Library to read Kafka topics."
  // Newer versions, e.g. 7.6.*, require dropping support for older Kafka versions.
  confluentVersion = "7.5.5"
}
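// Kafka client versions exercised by the compatibility tests; each short key below is
// embedded in the names of the generated configurations and test tasks.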
def kafkaVersions = [
  '201': "2.0.1",
  '231': "2.3.1",
  '241': "2.4.1",
  '251': "2.5.1",
  '282': "2.8.2",
  '312': "3.1.2",
  '390': "3.9.0",
]
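// One configuration per Kafka client version. The per-version test tasks below put the
// matching configuration ahead of the regular test runtime classpath so its pinned
// kafka-clients wins.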
kafkaVersions.each{k,v -> configurations.create("kafkaVersion$k")}
dependencies {
  implementation library.java.vendored_guava_32_1_2_jre
  provided library.java.jackson_dataformat_csv
  permitUnusedDeclared library.java.jackson_dataformat_csv
  implementation project(path: ":sdks:java:core", configuration: "shadow")
  implementation project(path: ":model:pipeline", configuration: "shadow")
  implementation project(":sdks:java:extensions:avro")
  implementation project(":sdks:java:extensions:protobuf")
  implementation project(":sdks:java:expansion-service")
  permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
  implementation library.java.avro
  // Get back to "provided" since 2.14
  provided library.java.kafka_clients
  if (JavaVersion.current().compareTo(JavaVersion.VERSION_21) >= 0) {
    // kafka is a provided dependency of kafka-avro-serializer. It is not needed to compile under Java <= 17,
    // but it is required at compile time under Java 21, specifically for extending AbstractKafkaAvroDeserializer.
    compileOnly library.java.kafka
  }
  testImplementation library.java.kafka_clients
  testImplementation project(path: ":runners:core-java")
  implementation library.java.slf4j_api
  implementation library.java.joda_time
  implementation library.java.jackson_annotations
  implementation library.java.jackson_databind
  implementation (group: 'com.google.cloud.hosted.kafka', name: 'managed-kafka-auth-login-handler', version: '1.0.5') {
    // "kafka-clients" has to be provided since users can supply their own version.
    exclude group: 'org.apache.kafka', module: 'kafka-clients'
    // "kafka-schema-registry-client" must be excluded per the Google Cloud documentation:
    // https://cloud.google.com/managed-service-for-apache-kafka/docs/quickstart-avro#configure_and_run_the_producer
    exclude group: "io.confluent", module: "kafka-schema-registry-client"
  }
  implementation ("io.confluent:kafka-avro-serializer:${confluentVersion}") {
    // zookeeper depends on "spotbugs-annotations:3.1.9", which clashes with the
    // "spotbugs-annotations:3.1.12" used in Beam and is not required.
    exclude group: "org.apache.zookeeper", module: "zookeeper"
    // "kafka-clients" has to be provided since users can supply their own version.
    exclude group: "org.apache.kafka", module: "kafka-clients"
  }
  implementation ("io.confluent:kafka-schema-registry-client:${confluentVersion}") {
    // It depends on "spotbugs-annotations:3.1.9", which clashes with the
    // "spotbugs-annotations:3.1.12" used in Beam and is not required.
    exclude group: "org.apache.zookeeper", module: "zookeeper"
    // "kafka-clients" has to be provided since users can supply their own version.
    exclude group: "org.apache.kafka", module: "kafka-clients"
  }
  // everit_json is needed for Kafka Read SchemaTransform tests that rely on JSON-schema translation.
  permitUnusedDeclared library.java.everit_json_schema
  provided library.java.everit_json_schema
  testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
  testImplementation project(":sdks:java:io:synthetic")
  testImplementation project(":sdks:java:managed")
  testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration")
  testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration")
  testImplementation project(path: ":sdks:java:io:common")
  testImplementation project(path: ":sdks:java:testing:test-utils")
  // For testing Cross-language transforms
  testImplementation library.java.avro
  testImplementation library.java.junit
  testImplementation library.java.mockito_core
  testRuntimeOnly library.java.mockito_inline
  testImplementation library.java.testcontainers_kafka
  testRuntimeOnly library.java.slf4j_jdk14
  testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
  kafkaVersions.each {"kafkaVersion$it.key" "org.apache.kafka:kafka-clients:$it.value"}
}
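// Pin each per-version configuration to exactly the kafka-clients version it is named after.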
kafkaVersions.each { kv ->
  configurations."kafkaVersion$kv.key" {
    resolutionStrategy {
      force "org.apache.kafka:kafka-clients:$kv.value"
    }
  }
}
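// Generate one Test task per Kafka client version. Each runs only KafkaIOTest, with the
// pinned kafka-clients on its classpath and the target version exposed via a system property.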
kafkaVersions.each { kv ->
  task "kafkaVersion${kv.key}Test"(type: Test) {
    group = "Verification"
    description = "Runs KafkaIO tests with Kafka clients API $kv.value"
    outputs.upToDateWhen { false }
    testClassesDirs = sourceSets.test.output.classesDirs
    classpath = configurations."kafkaVersion${kv.key}" + sourceSets.test.runtimeClasspath
    systemProperty "beam.target.kafka.version", kv.value
    include '**/KafkaIOTest.class'
  }
}
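// A single version can also be tested on its own, e.g. (assuming this module's project path
// is :sdks:java:io:kafka): ./gradlew :sdks:java:io:kafka:kafkaVersion390Test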
// Because this runs many integration jobs in parallel, each of which uses a
// container, it can fail locally due to performance limitations on a desktop.
// To avoid this, use --max-workers=N, where N is less than half your CPUs.
// 4 is a good start for parallelism without overloading your computer.
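// For example, with the suggested parallelism:
//   ./gradlew :sdks:java:io:kafka:kafkaVersionsCompatibilityTest --max-workers=4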
task kafkaVersionsCompatibilityTest {
  group = "Verification"
  description = 'Runs KafkaIO with different Kafka client APIs'
  def testNames = createTestList(kafkaVersions, "Test")
  dependsOn testNames
  kafkaVersions.keySet().each {
    dependsOn (":sdks:java:io:kafka:kafka-${it}:kafkaVersion${it}BatchIT")
  }
}
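// Builds the per-version test task names from the version keys, e.g.
// ["kafkaVersion201Test", ..., "kafkaVersion390Test"] for the map defined above.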
static def createTestList(Map<String, String> prefixMap, String suffix) {
  return prefixMap.keySet().stream()
      .map { version -> "kafkaVersion${version}${suffix}" }
      .collect(Collectors.toList())
}