/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.kafka;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.kafka.testutils.KafkaUtil;
import org.apache.flink.connector.testframe.container.FlinkContainers;
import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.JobSubmission;
import org.apache.flink.util.TestLoggerExtension;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.VoidDeserializer;
import org.apache.kafka.common.serialization.VoidSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
import static org.assertj.core.api.Assertions.assertThat;

/** Smoke test for the Kafka connectors. */
@ExtendWith({TestLoggerExtension.class})
@Testcontainers
class SmokeKafkaITCase {
private static final Logger LOG = LoggerFactory.getLogger(SmokeKafkaITCase.class);
private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
private static final Network NETWORK = Network.newNetwork();
private static final String EXAMPLE_JAR_MATCHER = "flink-streaming-kafka-test.*";
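
    // Kafka broker under test. The network alias makes the broker reachable as
    // "kafka:9092" from other containers on the shared Docker network (see the
    // --bootstrap.servers argument below).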
@Container
public static final KafkaContainer KAFKA_CONTAINER =
createKafkaContainer(KAFKA, LOG)
.withEmbeddedZookeeper()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
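
    // Start the Flink containers only after the Kafka broker is up.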
public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
TestcontainersSettings.builder().logger(LOG).dependsOn(KAFKA_CONTAINER).build();
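
    // One Flink cluster (JobManager plus TaskManager containers) for the whole
    // test class, managed as a JUnit 5 extension.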
@RegisterExtension
public static final FlinkContainers FLINK =
FlinkContainers.builder()
.withFlinkContainersSettings(
FlinkContainersSettings.basedOn(getConfiguration()))
.withTestcontainersSettings(TESTCONTAINERS_SETTINGS)
.build();
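
    // Kafka clients shared across tests; created in setUp() and closed in teardown().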
private static AdminClient admin;
    private static KafkaProducer<Void, Integer> producer;

private static Configuration getConfiguration() {
// modify configuration to have enough slots
final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
flinkConfig.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
return flinkConfig;
    }

@BeforeAll
static void setUp() {
final Map<String, Object> adminProperties = new HashMap<>();
adminProperties.put(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
KAFKA_CONTAINER.getBootstrapServers());
admin = AdminClient.create(adminProperties);
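
        // The producer reuses the bootstrap servers of the admin client and writes
        // Integer values with null keys (hence the Void key serializer).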
final Properties producerProperties = new Properties();
producerProperties.putAll(adminProperties);
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, VoidSerializer.class);
producerProperties.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
producer = new KafkaProducer<>(producerProperties);
    }

@AfterAll
static void teardown() {
admin.close();
producer.close();
    }

@Test
public void testKafka() throws Exception {
final Path kafkaExampleJar = ResourceTestUtils.getResource(EXAMPLE_JAR_MATCHER);
        final String inputTopic = "test-input-" + UUID.randomUUID();
        final String outputTopic = "test-output-" + UUID.randomUUID();
// create the required topics
final short replicationFactor = 1;
admin.createTopics(
Arrays.asList(
new NewTopic(inputTopic, 1, replicationFactor),
new NewTopic(outputTopic, 1, replicationFactor)))
.all()
.get();
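
        // Seed the input topic. Sends are asynchronous; the producer's background
        // I/O thread delivers the records, so no explicit flush is needed here.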
producer.send(new ProducerRecord<>(inputTopic, 1));
producer.send(new ProducerRecord<>(inputTopic, 2));
producer.send(new ProducerRecord<>(inputTopic, 3));
        // Run the Flink job, passing both the host-visible bootstrap address and the
        // in-network alias (kafka:9092) so the job can reach the broker from inside
        // the Docker network.
FLINK.submitJob(
new JobSubmission.JobSubmissionBuilder(kafkaExampleJar)
.setDetached(false)
.addArgument("--input-topic", inputTopic)
.addArgument("--output-topic", outputTopic)
.addArgument("--prefix", "PREFIX")
                        .addArgument(
                                "--bootstrap.servers",
                                String.join(
                                        ",",
                                        KAFKA_CONTAINER.getBootstrapServers(),
                                        KAFKA_CONTAINER.getNetworkAliases().stream()
                                                .map(host -> host + ":9092")
                                                .collect(Collectors.joining(","))))
.addArgument("--group.id", "myconsumer")
.addArgument("--auto.offset.reset", "earliest")
.addArgument("--transaction.timeout.ms", "900000")
.addArgument("--flink.partition-discovery.interval-millis", "1000")
.build());
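
        // Drain the output topic and verify that the job forwarded all three values.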
final Properties consumerProperties = new Properties();
consumerProperties.put(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, VoidDeserializer.class);
consumerProperties.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
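
        // drainAllRecordsFromTopic returns raw byte[] values; decode each one as a
        // big-endian int (ByteBuffer's default byte order).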
List<Integer> records =
KafkaUtil.drainAllRecordsFromTopic(outputTopic, consumerProperties).stream()
.map(r -> ByteBuffer.wrap(r.value()).getInt())
.collect(Collectors.toList());
assertThat(records).hasSize(3).containsExactly(1, 2, 3);
}
}