/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.kafka.source;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.source.testutils.KafkaMultipleTopicExternalContext;
import org.apache.flink.connector.kafka.source.testutils.KafkaSingleTopicExternalContext;
import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
import org.apache.flink.connectors.test.common.environment.MiniClusterTestEnvironment;
import org.apache.flink.connectors.test.common.external.DefaultContainerizedExternalSystem;
import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* Integration test class for {@link KafkaSource}.
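*
* <p>A minimal bounded read with this source, sketched for reference (the broker address and
* group ID below are placeholders):
*
* <pre>{@code
* KafkaSource<Integer> source =
*         KafkaSource.<Integer>builder()
*                 .setBootstrapServers("localhost:9092")
*                 .setGroupId("my-group")
*                 .setTopics("my-topic")
*                 .setDeserializer(
*                         KafkaRecordDeserializationSchema.valueOnly(IntegerDeserializer.class))
*                 .setStartingOffsets(OffsetsInitializer.earliest())
*                 .setBounded(OffsetsInitializer.latest())
*                 .build();
* }</pre>
*/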
public class KafkaSourceITCase {
private static final String TOPIC1 = "topic1";
private static final String TOPIC2 = "topic2";
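/**
* Kafka-specific tests running against the shared Kafka test environment managed by {@link
* KafkaSourceTestEnv}.
*/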
@Nested
@TestInstance(Lifecycle.PER_CLASS)
class KafkaSpecificTests {
@BeforeAll
public void setup() throws Throwable {
KafkaSourceTestEnv.setup();
KafkaSourceTestEnv.setupTopic(
TOPIC1, true, true, KafkaSourceTestEnv::getRecordsForTopicWithoutTimestamp);
KafkaSourceTestEnv.setupTopic(
TOPIC2, true, true, KafkaSourceTestEnv::getRecordsForTopicWithoutTimestamp);
}
@AfterAll
public void tearDown() throws Exception {
KafkaSourceTestEnv.tearDown();
}
@Test
public void testTimestamp() throws Throwable {
final String topic = "testTimestamp";
final long currentTimestamp = System.currentTimeMillis();
KafkaSourceTestEnv.createTestTopic(topic, 1, 1);
KafkaSourceTestEnv.produceToKafka(
Arrays.asList(
new ProducerRecord<>(topic, 0, currentTimestamp + 1L, "key0", 0),
new ProducerRecord<>(topic, 0, currentTimestamp + 2L, "key1", 1),
new ProducerRecord<>(topic, 0, currentTimestamp + 3L, "key2", 2)));
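// Build a bounded source that reads the topic from the earliest offset up to the latest
// offset observed at job start.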
KafkaSource<PartitionAndValue> source =
KafkaSource.<PartitionAndValue>builder()
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
.setGroupId("testTimestampAndWatermark")
.setTopics(topic)
.setDeserializer(new TestingKafkaRecordDeserializationSchema())
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<PartitionAndValue> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "testTimestamp");
// Verify that the timestamps attached to the emitted records match the timestamps of the
// produced records.
stream.transform(
"timestampVerifier",
TypeInformation.of(PartitionAndValue.class),
new WatermarkVerifyingOperator(v -> v));
stream.addSink(new DiscardingSink<>());
JobExecutionResult result = env.execute();
assertEquals(
Arrays.asList(
currentTimestamp + 1L, currentTimestamp + 2L, currentTimestamp + 3L),
result.getAccumulatorResult("timestamp"));
}
@Test
public void testBasicRead() throws Exception {
KafkaSource<PartitionAndValue> source =
KafkaSource.<PartitionAndValue>builder()
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
.setGroupId("testBasicRead")
.setTopics(Arrays.asList(TOPIC1, TOPIC2))
.setDeserializer(new TestingKafkaRecordDeserializationSchema())
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<PartitionAndValue> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "testBasicRead");
executeAndVerify(env, stream);
}
@Test
public void testValueOnlyDeserializer() throws Exception {
KafkaSource<Integer> source =
KafkaSource.<Integer>builder()
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
.setGroupId("testValueOnlyDeserializer")
.setTopics(Arrays.asList(TOPIC1, TOPIC2))
.setDeserializer(
KafkaRecordDeserializationSchema.valueOnly(
IntegerDeserializer.class))
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
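// Read both topics to completion and sum all the values emitted by the source.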
final CloseableIterator<Integer> resultIterator =
env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"testValueOnlyDeserializer")
.executeAndCollect();
AtomicInteger actualSum = new AtomicInteger();
resultIterator.forEachRemaining(actualSum::addAndGet);
// Calculate the expected sum of values.
// Values in a partition should start from partition ID, and end with
// (NUM_RECORDS_PER_PARTITION - 1)
// e.g. Values in partition 5 should be {5, 6, 7, 8, 9}
int expectedSum = 0;
for (int partition = 0; partition < KafkaSourceTestEnv.NUM_PARTITIONS; partition++) {
for (int value = partition;
value < KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
value++) {
expectedSum += value;
}
}
// Since we have two topics, the expected sum value should be doubled
expectedSum *= 2;
assertEquals(expectedSum, actualSum.get());
}
@Test
public void testRedundantParallelism() throws Exception {
KafkaSource<PartitionAndValue> source =
KafkaSource.<PartitionAndValue>builder()
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
.setGroupId("testRedundantParallelism")
.setTopics(Collections.singletonList(TOPIC1))
.setDeserializer(new TestingKafkaRecordDeserializationSchema())
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Here we use (NUM_PARTITIONS + 1) as the parallelism, so one SourceReader will not be
// assigned any splits. The redundant SourceReader should still be signaled with a
// NoMoreSplitsEvent and eventually transition to the FINISHED state.
env.setParallelism(KafkaSourceTestEnv.NUM_PARTITIONS + 1);
DataStream<PartitionAndValue> stream =
env.fromSource(
source, WatermarkStrategy.noWatermarks(), "testRedundantParallelism");
executeAndVerify(env, stream);
}
@Test
public void testBasicReadWithoutGroupId() throws Exception {
KafkaSource<PartitionAndValue> source =
KafkaSource.<PartitionAndValue>builder()
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
.setTopics(Arrays.asList(TOPIC1, TOPIC2))
.setDeserializer(new TestingKafkaRecordDeserializationSchema())
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<PartitionAndValue> stream =
env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"testBasicReadWithoutGroupId");
executeAndVerify(env, stream);
}
}
/** Integration tests based on the connector testing framework. */
@Nested
class IntegrationTests extends SourceTestSuiteBase<String> {
private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:5.5.2";
// Defines test environment on Flink MiniCluster
@SuppressWarnings("unused")
@TestEnv
MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
// Defines external system
@ExternalSystem
DefaultContainerizedExternalSystem<KafkaContainer> kafka =
DefaultContainerizedExternalSystem.builder()
.fromContainer(new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)))
.build();
// Defines two external context factories, so each test case is invoked twice: once with the
// single-topic context and once with the multiple-topic context.
@SuppressWarnings("unused")
@ExternalContextFactory
KafkaSingleTopicExternalContext.Factory singleTopic =
new KafkaSingleTopicExternalContext.Factory(kafka.getContainer());
@SuppressWarnings("unused")
@ExternalContextFactory
KafkaMultipleTopicExternalContext.Factory multipleTopic =
new KafkaMultipleTopicExternalContext.Factory(kafka.getContainer());
}
// -----------------
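/** Serializable holder pairing a record value with the string form of its topic partition. */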
private static class PartitionAndValue implements Serializable {
private static final long serialVersionUID = 4813439951036021779L;
private final String tp;
private final int value;
private PartitionAndValue(TopicPartition tp, int value) {
this.tp = tp.toString();
this.value = value;
}
}
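/**
* Deserialization schema that turns each Kafka record into a {@link PartitionAndValue},
* decoding the value bytes with Kafka's {@link IntegerDeserializer}.
*/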
private static class TestingKafkaRecordDeserializationSchema
implements KafkaRecordDeserializationSchema<PartitionAndValue> {
private static final long serialVersionUID = -3765473065594331694L;
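// Kafka deserializers are not serializable, so keep the field transient and create it
// lazily on first use.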
private transient Deserializer<Integer> deserializer;
@Override
public void deserialize(
ConsumerRecord<byte[], byte[]> record, Collector<PartitionAndValue> collector)
throws IOException {
if (deserializer == null) {
deserializer = new IntegerDeserializer();
}
collector.collect(
new PartitionAndValue(
new TopicPartition(record.topic(), record.partition()),
deserializer.deserialize(record.topic(), record.value())));
}
@Override
public TypeInformation<PartitionAndValue> getProducedType() {
return TypeInformation.of(PartitionAndValue.class);
}
}
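/**
* Operator that records the event timestamp of every element into a list accumulator named
* "timestamp". Elements are consumed only for verification and are not forwarded downstream.
*/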
private static class WatermarkVerifyingOperator
extends StreamMap<PartitionAndValue, PartitionAndValue> {
private static final long serialVersionUID = 2868223355944228209L;
public WatermarkVerifyingOperator(
MapFunction<PartitionAndValue, PartitionAndValue> mapper) {
super(mapper);
}
@Override
public void open() throws Exception {
getRuntimeContext().addAccumulator("timestamp", new ListAccumulator<Long>());
}
@Override
public void processElement(StreamRecord<PartitionAndValue> element) {
getRuntimeContext().getAccumulator("timestamp").add(element.getTimestamp());
}
}
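/**
* Executes the job and verifies via an accumulator that, within each partition, the collected
* values form a consecutive sequence starting at the partition ID.
*/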
private void executeAndVerify(
StreamExecutionEnvironment env, DataStream<PartitionAndValue> stream) throws Exception {
stream.addSink(
new RichSinkFunction<PartitionAndValue>() {
@Override
public void open(Configuration parameters) {
getRuntimeContext()
.addAccumulator("result", new ListAccumulator<PartitionAndValue>());
}
@Override
public void invoke(PartitionAndValue value, Context context) {
getRuntimeContext().getAccumulator("result").add(value);
}
});
List<PartitionAndValue> result = env.execute().getAccumulatorResult("result");
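// Group the collected values by topic partition for per-partition verification.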
Map<String, List<Integer>> resultPerPartition = new HashMap<>();
result.forEach(
partitionAndValue ->
resultPerPartition
.computeIfAbsent(partitionAndValue.tp, ignored -> new ArrayList<>())
.add(partitionAndValue.value));
resultPerPartition.forEach(
(tp, values) -> {
int firstExpectedValue = Integer.parseInt(tp.substring(tp.indexOf('-') + 1));
for (int i = 0; i < values.size(); i++) {
assertEquals(
firstExpectedValue + i,
(int) values.get(i),
String.format(
"The %d-th value for partition %s should be %d", i, tp, i));
}
});
}
}