/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * License); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an AS IS BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.util.stream.Collectors

plugins { id 'org.apache.beam.module' }
applyJavaNature(
  automaticModuleName: 'org.apache.beam.sdk.io.kafka',
  mavenRepositories: [
    [id: 'io.confluent', url: 'https://packages.confluent.io/maven/']
  ],
)
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()

description = "Apache Beam :: SDKs :: Java :: IO :: Kafka"
ext {
    summary = "Library to read Kafka topics."
    confluentVersion = "7.6.0"
}

def kafkaVersions = [
    '01103': "0.11.0.3",
    '100': "1.0.0",
    '111': "1.1.1",
    '201': "2.0.1",
    '211': "2.1.1",
    '222': "2.2.2",
    '231': "2.3.1",
    '241': "2.4.1",
    '251': "2.5.1",
]

kafkaVersions.each{k,v -> configurations.create("kafkaVersion$k")}

dependencies {
  implementation library.java.vendored_guava_32_1_2_jre
  provided library.java.jackson_dataformat_csv
  permitUnusedDeclared library.java.jackson_dataformat_csv
  implementation project(path: ":sdks:java:core", configuration: "shadow")
  implementation project(":sdks:java:extensions:avro")
  implementation project(":sdks:java:extensions:protobuf")
  implementation project(":sdks:java:expansion-service")
  permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
  implementation library.java.avro
  // Get back to "provided" since 2.14
  provided library.java.kafka_clients
  if (JavaVersion.current().compareTo(JavaVersion.VERSION_21) >= 0) {
    // this dependency is a provided dependency for kafka-avro-serializer. It is not needed to compile with Java<=17
    // but needed for compile only under Java21, specifically, required for extending from AbstractKafkaAvroDeserializer
    compileOnly library.java.kafka
  }
  testImplementation library.java.kafka_clients
  implementation library.java.slf4j_api
  implementation library.java.joda_time
  implementation library.java.jackson_annotations
  implementation library.java.jackson_databind
  implementation "org.springframework:spring-expression:5.3.27"
  implementation ("io.confluent:kafka-avro-serializer:${confluentVersion}") {
    // zookeeper depends on "spotbugs-annotations:3.1.9" which clashes with current
    // "spotbugs-annotations:3.1.12" used in Beam. Not required.
    exclude group: "org.apache.zookeeper", module: "zookeeper"
    // "kafka-clients" has to be provided since user can use its own version.
    exclude group: "org.apache.kafka", module: "kafka-clients"
  }
  implementation ("io.confluent:kafka-schema-registry-client:${confluentVersion}") {
    // It depends on "spotbugs-annotations:3.1.9" which clashes with current
    // "spotbugs-annotations:3.1.12" used in Beam. Not required.
    exclude group: "org.apache.zookeeper", module: "zookeeper"
    // "kafka-clients" has to be provided since user can use its own version.
    exclude group: "org.apache.kafka", module: "kafka-clients"
  }
  // everit_json is needed for Kafka Read SchemaTransform tests that rely on JSON-schema translation.
  permitUnusedDeclared library.java.everit_json_schema
  provided library.java.everit_json_schema
  testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
  testImplementation project(":sdks:java:io:synthetic")
  testImplementation project(":sdks:java:managed")
  testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration")
  testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration")
  testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
  testImplementation project(path: ":sdks:java:testing:test-utils", configuration: "testRuntimeMigration")
  // For testing Cross-language transforms
  testImplementation library.java.avro
  testImplementation library.java.junit
  testImplementation library.java.powermock
  testImplementation library.java.powermock_mockito
  testImplementation library.java.testcontainers_kafka
  testRuntimeOnly library.java.slf4j_jdk14
  testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
  kafkaVersions.each {"kafkaVersion$it.key" "org.apache.kafka:kafka-clients:$it.value"}
}

kafkaVersions.each { kv ->
  configurations."kafkaVersion$kv.key" {
    resolutionStrategy {
      force "org.apache.kafka:kafka-clients:$kv.value"
    }
  }
}

kafkaVersions.each {kv ->
  task "kafkaVersion${kv.key}Test"(type: Test) {
    group = "Verification"
    description = "Runs KafkaIO tests with Kafka clients API $kv.value"
    outputs.upToDateWhen { false }
    testClassesDirs = sourceSets.test.output.classesDirs
    classpath =  configurations."kafkaVersion${kv.key}" + sourceSets.test.runtimeClasspath
    include '**/KafkaIOTest.class'
  }
}

//Because this runs many integration jobs in parallel, each of which use a
//container, it can fail locally due to performance limitations on a desktop.
//To avoid this, use --max-workers=N, where N is less than half your CPUs.
//4 is a good start for parallelism without overloading your computer.
task kafkaVersionsCompatibilityTest {
  group = "Verification"
  description = 'Runs KafkaIO with different Kafka client APIs'
  def testNames = createTestList(kafkaVersions, "Test")
  dependsOn testNames
  dependsOn (":sdks:java:io:kafka:kafka-01103:kafkaVersion01103BatchIT")
  dependsOn (":sdks:java:io:kafka:kafka-100:kafkaVersion100BatchIT")
  dependsOn (":sdks:java:io:kafka:kafka-111:kafkaVersion111BatchIT")
  dependsOn (":sdks:java:io:kafka:kafka-201:kafkaVersion201BatchIT")
  dependsOn (":sdks:java:io:kafka:kafka-211:kafkaVersion211BatchIT")
  dependsOn (":sdks:java:io:kafka:kafka-222:kafkaVersion222BatchIT")
  dependsOn (":sdks:java:io:kafka:kafka-231:kafkaVersion231BatchIT")
  dependsOn (":sdks:java:io:kafka:kafka-241:kafkaVersion241BatchIT")
  dependsOn (":sdks:java:io:kafka:kafka-251:kafkaVersion251BatchIT")
}

static def createTestList(Map<String, String> prefixMap, String suffix) {
  return prefixMap.keySet().stream()
      .map{version -> "kafkaVersion${version}${suffix}"}
      .collect(Collectors.toList())
}
