blob: 90771467974af9e572e1c360c6d582053eae1a1d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.io;
import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
import static org.apache.pulsar.tests.integration.topologies.PulsarTestBase.randomName;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
/**
* A tester for testing kafka sink.
*/
@Slf4j
public class KafkaSinkTester extends SinkTester<KafkaContainer> {
private final String kafkaTopicName;
private KafkaConsumer<String, String> kafkaConsumer;
private final String containerName;
public KafkaSinkTester(String containerName) {
super(containerName, SinkType.KAFKA);
this.containerName = containerName;
String suffix = randomName(8) + "_" + System.currentTimeMillis();
this.kafkaTopicName = "kafka_sink_topic_" + suffix;
sinkConfig.put("bootstrapServers", networkAlias + ":9092");
sinkConfig.put("acks", "all");
sinkConfig.put("batchSize", 1L);
sinkConfig.put("maxRequestSize", 1048576L);
sinkConfig.put("topic", kafkaTopicName);
}
@Override
protected KafkaContainer createSinkService(PulsarCluster cluster) {
return new KafkaContainer()
.withEmbeddedZookeeper()
.withNetworkAliases(containerName)
.withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
.withName(containerName)
.withHostName(cluster.getClusterName() + "-" + containerName));
}
@Override
public void prepareSink() throws Exception {
ExecResult execResult = serviceContainer.execInContainer(
"/usr/bin/kafka-topics",
"--create",
"--zookeeper",
"localhost:2181",
"--partitions",
"1",
"--replication-factor",
"1",
"--topic",
kafkaTopicName);
assertTrue(
execResult.getStdout().contains("Created topic"),
execResult.getStdout());
kafkaConsumer = new KafkaConsumer<>(
ImmutableMap.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serviceContainer.getBootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG, "sink-test-" + randomName(8),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
),
new StringDeserializer(),
new StringDeserializer()
);
kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
log.info("Successfully subscribe to kafka topic {}", kafkaTopicName);
}
@Override
public void validateSinkResult(Map<String, String> kvs) {
Iterator<Map.Entry<String, String>> kvIter = kvs.entrySet().iterator();
while (kvIter.hasNext()) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
log.info("Received {} records from kafka topic {}",
records.count(), kafkaTopicName);
if (records.isEmpty()) {
continue;
}
Iterator<ConsumerRecord<String, String>> recordsIter = records.iterator();
while (recordsIter.hasNext() && kvIter.hasNext()) {
ConsumerRecord<String, String> consumerRecord = recordsIter.next();
Map.Entry<String, String> expectedRecord = kvIter.next();
assertEquals(expectedRecord.getKey(), consumerRecord.key());
assertEquals(expectedRecord.getValue(), consumerRecord.value());
}
}
}
}