/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.KafkaUtils;
import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.testutils.junit.RetryOnException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Before;
import javax.annotation.Nullable;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.getRunningJobs;
import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilJobIsRunning;
import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilNoJobIsRunning;
import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
/** Abstract test base for all Kafka consumer tests. */
@SuppressWarnings("serial")
public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
protected final boolean useNewSource;
private ClusterClient<?> client;
protected KafkaConsumerTestBase() {
this(false);
}
protected KafkaConsumerTestBase(boolean useNewSource) {
this.useNewSource = useNewSource;
}
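// Subclasses toggle "useNewSource" to run this suite against either the legacy
// FlinkKafkaConsumer or the new KafkaSource connector. Purely as an illustration (the actual
// getStream(...) helper used by the tests is defined elsewhere in this class and may differ
// in details), the dispatch roughly looks like:
//
//   if (useNewSource) {
//       KafkaSource<T> source =
//               kafkaServer.getSourceBuilder(topic, deserializer, props).build();
//       return env.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSource");
//   } else {
//       return env.addSource(kafkaServer.getConsumer(topic, deserializer, props));
//   }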
// ------------------------------------------------------------------------
// Common Test Preparation
// ------------------------------------------------------------------------
/**
* Makes sure that no job from any previous test that uses the same mini cluster is still
* running on the JobManager. Otherwise, tests may fail because of missing slots.
*/
@Before
public void setClientAndEnsureNoJobIsLingering() throws Exception {
client = flink.getClusterClient();
waitUntilNoJobIsRunning(client);
}
// ------------------------------------------------------------------------
// Suite of Tests
//
// The tests here are not annotated with @Test, but need to be invoked
// from the extending classes. That way, each subclass can select which
// tests to run.
// ------------------------------------------------------------------------
/**
* Test that ensures the Kafka consumer fails properly if the topic does not exist and a
* wrong broker address was specified.
*
* @throws Exception
*/
public void runFailOnNoBrokerTest() throws Exception {
try {
Properties properties = new Properties();
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.setRestartStrategy(RestartStrategies.noRestart());
see.setParallelism(1);
// use wrong ports for the consumers
properties.setProperty("bootstrap.servers", "localhost:80");
properties.setProperty("group.id", "test");
properties.setProperty("request.timeout.ms", "3000"); // let the test fail fast
properties.setProperty("socket.timeout.ms", "3000");
properties.setProperty("session.timeout.ms", "2000");
properties.setProperty("fetch.max.wait.ms", "2000");
properties.setProperty("heartbeat.interval.ms", "1000");
properties.putAll(secureProps);
DataStream<String> stream =
getStream(see, "doesntexist", new SimpleStringSchema(), properties);
stream.print();
see.execute("No broker test");
} catch (JobExecutionException jee) {
final Optional<TimeoutException> optionalTimeoutException =
ExceptionUtils.findThrowable(jee, TimeoutException.class);
assertThat(optionalTimeoutException).isPresent();
final TimeoutException timeoutException = optionalTimeoutException.get();
if (useNewSource) {
assertThat(timeoutException)
.hasMessageContaining("Timed out waiting for a node assignment.");
} else {
assertThat(timeoutException)
.hasMessage("Timeout expired while fetching topic metadata");
}
}
}
/**
* Ensures that the offsets committed to Kafka are the offsets of "the next record to process".
*/
public void runCommitOffsetsToKafka() throws Exception {
// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition
// should be 50)
final int parallelism = 3;
final int recordsInEachPartition = 50;
final String topicName =
writeSequence(
"testCommitOffsetsToKafkaTopic", recordsInEachPartition, parallelism, 1);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.setParallelism(parallelism);
env.enableCheckpointing(200);
DataStream<String> stream =
getStream(env, topicName, new SimpleStringSchema(), standardProps);
stream.addSink(new DiscardingSink<String>());
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
final Thread runner =
new Thread("runner") {
@Override
public void run() {
try {
env.execute();
} catch (Throwable t) {
if (!(t instanceof JobCancellationException)) {
errorRef.set(t);
}
}
}
};
runner.start();
final Long l50 = 50L; // the final committed offset in Kafka should be 50
final long deadline = 30_000_000_000L + System.nanoTime();
KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler =
kafkaServer.createOffsetHandler();
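// Poll the committed offsets for up to 30 seconds (the deadline above is in nanoseconds).
// Since Flink commits the offset of the next record to process, each partition's committed
// offset should reach 50 once records 0-49 have been consumed.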
do {
Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) {
break;
}
Thread.sleep(100);
} while (System.nanoTime() < deadline);
// cancel the job & wait for the job to finish
final Iterator<JobID> it = getRunningJobs(client).iterator();
final JobID jobId = it.next();
client.cancel(jobId).get();
assertThat(it.hasNext()).isFalse();
runner.join();
final Throwable t = errorRef.get();
if (t != null) {
throw new RuntimeException("Job failed with an exception", t);
}
// final check to see if offsets are correctly in Kafka
Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
assertThat(o1).isEqualTo(Long.valueOf(50L));
assertThat(o2).isEqualTo(Long.valueOf(50L));
assertThat(o3).isEqualTo(Long.valueOf(50L));
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
/**
* This test ensures that when the consumers retrieve a start offset from Kafka (earliest,
* latest), this offset is committed to Kafka, even if some partitions are not read.
*
* <p>Test: create 3 partitions and write 50 messages into each; start three consumers with
* auto.offset.reset='latest' and wait until they have committed offsets to Kafka; check that
* the offsets in Kafka are set to 50 for all three partitions.
*
* <p>See FLINK-3440 as well.
*/
public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception {
// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition
// should be 50)
final int parallelism = 3;
final int recordsInEachPartition = 50;
final String topicName =
writeSequence(
"testAutoOffsetRetrievalAndCommitToKafkaTopic",
recordsInEachPartition,
parallelism,
1);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.setParallelism(parallelism);
env.enableCheckpointing(200);
Properties readProps = new Properties();
readProps.putAll(standardProps);
readProps.setProperty(
"auto.offset.reset",
"latest"); // set to reset to latest, so that partitions are initially not read
DataStream<String> stream = getStream(env, topicName, new SimpleStringSchema(), readProps);
stream.addSink(new DiscardingSink<String>());
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
final Thread runner =
new Thread("runner") {
@Override
public void run() {
try {
env.execute();
} catch (Throwable t) {
if (!(t instanceof JobCancellationException)) {
errorRef.set(t);
}
}
}
};
runner.start();
KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler =
kafkaServer.createOffsetHandler();
final Long l50 = 50L; // the final committed offset in Kafka should be 50
final long deadline = 30_000_000_000L + System.nanoTime();
do {
Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) {
break;
}
Thread.sleep(100);
} while (System.nanoTime() < deadline);
// cancel the job & wait for the job to finish
final Iterator<JobID> it = getRunningJobs(client).iterator();
final JobID jobId = it.next();
client.cancel(jobId).get();
assertThat(it.hasNext()).isFalse();
runner.join();
final Throwable t = errorRef.get();
if (t != null) {
throw new RuntimeException("Job failed with an exception", t);
}
// final check to see if offsets are correctly in Kafka
Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
assertThat(o1).isEqualTo(Long.valueOf(50L));
assertThat(o2).isEqualTo(Long.valueOf(50L));
assertThat(o3).isEqualTo(Long.valueOf(50L));
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
/**
* This test ensures that when explicitly configured to start from the earliest record, the
* consumer ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
*/
public void runStartFromEarliestOffsets() throws Exception {
// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition
// should be 50)
final int parallelism = 3;
final int recordsInEachPartition = 50;
final String topicName =
writeSequence(
"testStartFromEarliestOffsetsTopic",
recordsInEachPartition,
parallelism,
1);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
Properties readProps = new Properties();
readProps.putAll(standardProps);
readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
// the committed offsets should be ignored
KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler =
kafkaServer.createOffsetHandler();
kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
readSequence(
env,
StartupMode.EARLIEST,
null,
null,
readProps,
parallelism,
topicName,
recordsInEachPartition,
0);
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
/**
* This test ensures that when explicitly configured to start from the latest record, the
* consumer ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
*/
public void runStartFromLatestOffsets() throws Exception {
// 50 records written to each of 3 partitions before launching a latest-starting consuming
// job
final int parallelism = 3;
final int recordsInEachPartition = 50;
// each partition will be written an extra 200 records
final int extraRecordsInEachPartition = 200;
// all already existing data in the topic, before the consuming topology has started, should
// be ignored
final String topicName =
writeSequence(
"testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
// the committed offsets should be ignored
KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler =
kafkaServer.createOffsetHandler();
kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
// job names for the topologies for writing and consuming the extra records
final String consumeExtraRecordsJobName = "Consume Extra Records Job";
final String writeExtraRecordsJobName = "Write Extra Records Job";
// serialization / deserialization schemas for writing and consuming the extra records
final TypeInformation<Tuple2<Integer, Integer>> resultType =
TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
final SerializationSchema<Tuple2<Integer, Integer>> serSchema =
new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig());
final KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
new KafkaDeserializationSchemaWrapper<>(
new TypeInformationSerializationSchema<>(
resultType, new ExecutionConfig()));
// setup and run the latest-consuming job
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
final Properties readProps = new Properties();
readProps.putAll(standardProps);
readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
DataStreamSource<Tuple2<Integer, Integer>> stream;
if (useNewSource) {
KafkaSource<Tuple2<Integer, Integer>> source =
kafkaServer
.getSourceBuilder(topicName, deserSchema, readProps)
.setStartingOffsets(OffsetsInitializer.latest())
.build();
stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSource");
} else {
FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> latestReadingConsumer =
kafkaServer.getConsumer(topicName, deserSchema, readProps);
latestReadingConsumer.setStartFromLatest();
stream = env.addSource(latestReadingConsumer);
}
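// Both branches above configure the job to start reading from the latest offsets: the new
// KafkaSource via OffsetsInitializer.latest(), the legacy FlinkKafkaConsumer via
// setStartFromLatest().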
stream.setParallelism(parallelism)
.flatMap(
new FlatMapFunction<Tuple2<Integer, Integer>, Object>() {
@Override
public void flatMap(
Tuple2<Integer, Integer> value, Collector<Object> out)
throws Exception {
if (value.f1 - recordsInEachPartition < 0) {
throw new RuntimeException(
"test failed; consumed a record that was previously written: "
+ value);
}
}
})
.setParallelism(1)
.addSink(new DiscardingSink<>());
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
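// Building the JobGraph explicitly (rather than calling env.execute()) exposes the JobID
// below, so the consuming job can later be cancelled by ID once the extra records have been
// written.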
final JobID consumeJobId = jobGraph.getJobID();
final AtomicReference<Throwable> error = new AtomicReference<>();
Thread consumeThread =
new Thread(
() -> {
try {
submitJobAndWaitForResult(
client, jobGraph, getClass().getClassLoader());
} catch (Throwable t) {
if (!ExceptionUtils.findThrowable(t, JobCancellationException.class)
.isPresent()) {
error.set(t);
}
}
});
consumeThread.start();
// wait until the consuming job has started, to be extra safe
waitUntilJobIsRunning(client);
// setup the extra records writing job
final StreamExecutionEnvironment env2 =
StreamExecutionEnvironment.getExecutionEnvironment();
env2.setParallelism(parallelism);
DataStream<Tuple2<Integer, Integer>> extraRecordsStream =
env2.addSource(
new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
private boolean running = true;
@Override
public void run(SourceContext<Tuple2<Integer, Integer>> ctx)
throws Exception {
int count =
recordsInEachPartition; // the extra records should start
// from the last written value
int partition = getRuntimeContext().getIndexOfThisSubtask();
while (running
&& count
< recordsInEachPartition
+ extraRecordsInEachPartition) {
ctx.collect(new Tuple2<>(partition, count));
count++;
}
}
@Override
public void cancel() {
running = false;
}
});
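// The generator above makes each parallel subtask emit the values
// [recordsInEachPartition, recordsInEachPartition + extraRecordsInEachPartition), i.e. 50-249,
// paired with its subtask index; the latest-starting consumer above must therefore only ever
// see values >= 50 (checked in its flatMap).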
kafkaServer.produceIntoKafka(extraRecordsStream, topicName, serSchema, readProps, null);
try {
env2.execute(writeExtraRecordsJobName);
} catch (Exception e) {
throw new RuntimeException("Writing extra records failed", e);
}
// cancel the consume job after all extra records are written
client.cancel(consumeJobId).get();
consumeThread.join();
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
// check whether the consuming thread threw any test errors;
// test will fail here if the consume job had incorrectly read any records other than the
// extra records
final Throwable consumerError = error.get();
if (consumerError != null) {
throw new Exception("Exception in the consuming thread", consumerError);
}
}
/**
* This test ensures that the consumer correctly uses group offsets in Kafka, and defaults to
* "auto.offset.reset" behaviour when necessary, when explicitly configured to start from group
* offsets.
*
* <p>The partitions and their committed group offsets are set up as:
* partition 0 --> committed offset 23,
* partition 1 --> no committed offset,
* partition 2 --> committed offset 43.
*
* <p>When configured to start from group offsets, each partition should read:
* partition 0 --> start from offset 23, read to offset 49 (27 records),
* partition 1 --> default to "auto.offset.reset" (set to earliest), so start from offset 0,
* read to offset 49 (50 records),
* partition 2 --> start from offset 43, read to offset 49 (7 records).
*/
public void runStartFromGroupOffsets() throws Exception {
// 3 partitions with 50 records each (offsets 0-49)
final int parallelism = 3;
final int recordsInEachPartition = 50;
final String topicName =
writeSequence(
"testStartFromGroupOffsetsTopic", recordsInEachPartition, parallelism, 1);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
Properties readProps = new Properties();
readProps.putAll(standardProps);
readProps.setProperty("auto.offset.reset", "earliest");
// the committed group offsets should be used as starting points
KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler =
kafkaServer.createOffsetHandler();
// only partitions 0 and 2 have group offsets committed
kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
Map<Integer, Tuple2<Integer, Integer>> partitionsToValueCountAndStartOffsets =
new HashMap<>();
partitionsToValueCountAndStartOffsets.put(
0, new Tuple2<>(27, 23)); // partition 0 should read offset 23-49
partitionsToValueCountAndStartOffsets.put(
1, new Tuple2<>(50, 0)); // partition 1 should read offset 0-49
partitionsToValueCountAndStartOffsets.put(
2, new Tuple2<>(7, 43)); // partition 2 should read offset 43-49
readSequence(
env,
StartupMode.GROUP_OFFSETS,
null,
null,
readProps,
topicName,
partitionsToValueCountAndStartOffsets);
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
/**
* This test ensures that the consumer correctly uses user-supplied specific offsets when
* explicitly configured to start from specific offsets. For partitions for which no specific
* offset is provided, the starting position should fall back to the group offsets behaviour.
*
* <p>4 partitions will have 50 records with offsets 0 to 49. The supplied specific offsets map is:
* partition 0 --> start from offset 19,
* partition 1 --> not set,
* partition 2 --> start from offset 22,
* partition 3 --> not set,
* partition 4 --> start from offset 26 (this should be ignored because the partition does not exist).
*
* <p>The partitions and their committed group offsets are set up as:
* partition 0 --> committed offset 23,
* partition 1 --> committed offset 31,
* partition 2 --> committed offset 43,
* partition 3 --> no committed offset.
*
* <p>When configured to start from these specific offsets, each partition should read:
* partition 0 --> start from offset 19, read to offset 49 (31 records),
* partition 1 --> fall back to group offsets, so start from offset 31, read to offset 49 (19 records),
* partition 2 --> start from offset 22, read to offset 49 (28 records),
* partition 3 --> fall back to group offsets, but since there is no group offset for this
* partition, default to "auto.offset.reset" (set to "earliest"), so start from offset 0, read
* to offset 49 (50 records).
*/
public void runStartFromSpecificOffsets() throws Exception {
// 4 partitions with 50 records each (offsets 0-49)
final int parallelism = 4;
final int recordsInEachPartition = 50;
final String topicName =
writeSequence(
"testStartFromSpecificOffsetsTopic",
recordsInEachPartition,
parallelism,
1);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
Properties readProps = new Properties();
readProps.putAll(standardProps);
readProps.setProperty(
"auto.offset.reset",
"earliest"); // partition 3 should default back to this behaviour
Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
specificStartupOffsets.put(new KafkaTopicPartition(topicName, 0), 19L);
specificStartupOffsets.put(new KafkaTopicPartition(topicName, 2), 22L);
specificStartupOffsets.put(
new KafkaTopicPartition(topicName, 4),
26L); // non-existing partition, should be ignored
// only the committed offset for partition 1 should be used, because partition 1 has no
// entry in specific offset map
KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler =
kafkaServer.createOffsetHandler();
kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
Map<Integer, Tuple2<Integer, Integer>> partitionsToValueCountAndStartOffsets =
new HashMap<>();
partitionsToValueCountAndStartOffsets.put(
0, new Tuple2<>(31, 19)); // partition 0 should read offset 19-49
partitionsToValueCountAndStartOffsets.put(
1, new Tuple2<>(19, 31)); // partition 1 should read offset 31-49
partitionsToValueCountAndStartOffsets.put(
2, new Tuple2<>(28, 22)); // partition 2 should read offset 22-49
partitionsToValueCountAndStartOffsets.put(
3, new Tuple2<>(50, 0)); // partition 3 should read offset 0-49
readSequence(
env,
StartupMode.SPECIFIC_OFFSETS,
specificStartupOffsets,
null,
readProps,
topicName,
partitionsToValueCountAndStartOffsets);
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
/**
* This test ensures that the consumer correctly uses a user-supplied timestamp when explicitly
* configured to start from a timestamp.
*
* <p>The validated Kafka data is written in 2 steps: first, an initial 50 records are written
* to each partition. After that, another 30 records are appended to each partition. Before
* each step, a timestamp is recorded. For the validation, when the read job is configured to
* start from the first timestamp, each partition should start from offset 0 and read a total
* of 80 records. When configured to start from the second timestamp, each partition should
* start from offset 50 and read only the remaining 30 appended records.
*/
public void runStartFromTimestamp() throws Exception {
// 4 partitions with 50 records each
final int parallelism = 4;
final int initialRecordsInEachPartition = 50;
final int appendRecordsInEachPartition = 30;
// attempt to create an appended test sequence, where the timestamp of writing the appended
// sequence is assured to be larger than the timestamp of the original sequence.
long firstTimestamp = System.currentTimeMillis();
String topic =
writeSequence(
"runStartFromTimestamp", initialRecordsInEachPartition, parallelism, 1);
long secondTimestamp = 0;
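// busy-wait until the wall clock has advanced past firstTimestamp, so that the records
// appended below carry strictly larger timestamps than the initial sequence.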
while (secondTimestamp <= firstTimestamp) {
Thread.sleep(1000);
secondTimestamp = System.currentTimeMillis();
}
writeAppendSequence(
topic, initialRecordsInEachPartition, appendRecordsInEachPartition, parallelism);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
Properties readProps = new Properties();
readProps.putAll(standardProps);
readSequence(
env,
StartupMode.TIMESTAMP,
null,
firstTimestamp,
readProps,
parallelism,
topic,
initialRecordsInEachPartition + appendRecordsInEachPartition,
0);
readSequence(
env,
StartupMode.TIMESTAMP,
null,
secondTimestamp,
readProps,
parallelism,
topic,
appendRecordsInEachPartition,
initialRecordsInEachPartition);
deleteTestTopic(topic);
}
/**
* Ensure Kafka is working on both producer and consumer side. This executes a job that contains
* two Flink pipelines.
*
* <pre>
* (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
* </pre>
*
* <p>We need to externally retry this test. We cannot let Flink's retry mechanism do it,
* because the Kafka producer does not guarantee exactly-once output. Hence a recovery would
* introduce duplicates that cause the test to fail.
*
* <p>This test also ensures that FLINK-3156 does not happen again:
*
* <p>The following situation caused an NPE in the FlinkKafkaConsumer: elements are produced
* only into topic-1, while topic-2 stays empty.
*
* <p>Therefore, this test also consumes from an empty topic.
*/
@RetryOnException(times = 2, exception = NotLeaderForPartitionException.class)
public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID();
final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID();
final int parallelism = 3;
final int elementsPerPartition = 100;
final int totalElements = parallelism * elementsPerPartition;
createTestTopic(topic, parallelism, 1);
createTestTopic(
additionalEmptyTopic,
parallelism,
1); // create an empty topic which will remain empty all the time
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.enableCheckpointing(500);
env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
TypeInformation<Tuple2<Long, String>> longStringType =
TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {});
TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema =
new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
// ----------- add producer dataflow ----------
DataStream<Tuple2<Long, String>> stream =
env.addSource(
new RichParallelSourceFunction<Tuple2<Long, String>>() {
private boolean running = true;
@Override
public void run(SourceContext<Tuple2<Long, String>> ctx)
throws InterruptedException {
int cnt =
getRuntimeContext().getIndexOfThisSubtask()
* elementsPerPartition;
int limit = cnt + elementsPerPartition;
while (running && cnt < limit) {
ctx.collect(new Tuple2<>(1000L + cnt, "kafka-" + cnt));
cnt++;
// we delay data generation a bit so that we are sure that some checkpoints are
// triggered (for FLINK-3156)
Thread.sleep(50);
}
}
@Override
public void cancel() {
running = false;
}
});
Properties producerProperties =
KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "3");
producerProperties.putAll(secureProps);
kafkaServer.produceIntoKafka(stream, topic, sinkSchema, producerProperties, null);
// ----------- add consumer dataflow ----------
List<String> topics = new ArrayList<>();
topics.add(topic);
topics.add(additionalEmptyTopic);
Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
DataStreamSource<Tuple2<Long, String>> consuming =
getStream(env, topics, sourceSchema, props);
consuming
.addSink(
new RichSinkFunction<Tuple2<Long, String>>() {
private int elCnt = 0;
private BitSet validator = new BitSet(totalElements);
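// The BitSet tracks which element indices have been received: a bit that is already set
// indicates a duplicate, and any clear bit after all elements arrived indicates a missing
// element (checked below).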
@Override
public void invoke(Tuple2<Long, String> value) throws Exception {
String[] sp = value.f1.split("-");
int v = Integer.parseInt(sp[1]);
assertThat((long) v).isEqualTo(value.f0 - 1000);
assertThat(validator.get(v)).as("Received tuple twice").isFalse();
validator.set(v);
elCnt++;
if (elCnt == totalElements) {
// check if everything in the bitset is set to true
int nc;
if ((nc = validator.nextClearBit(0)) != totalElements) {
fail(
"The bitset was not set to 1 on all elements. Next clear:"
+ nc
+ " Set: "
+ validator);
}
throw new SuccessException();
}
}
@Override
public void close() throws Exception {
super.close();
}
})
.setParallelism(1);
try {
tryExecutePropagateExceptions(env, "runSimpleConcurrentProducerConsumerTopology");
} catch (ProgramInvocationException | JobExecutionException e) {
// look for NotLeaderForPartitionException
Throwable cause = e.getCause();
// search the cause chain (bounded depth) for a nested NotLeaderForPartitionException
int depth = 0;
while (cause != null && depth++ < 20) {
if (cause instanceof NotLeaderForPartitionException) {
throw (Exception) cause;
}
cause = cause.getCause();
}
throw e;
}
deleteTestTopic(topic);
}
/**
* Tests the proper consumption when having a 1:1 correspondence between Kafka partitions and
* Flink sources.
*/
public void runOneToOneExactlyOnceTest() throws Exception {
final String topic = "oneToOneTopic-" + UUID.randomUUID();
final int parallelism = 5;
final int numElementsPerPartition = 1000;
final int totalElements = parallelism * numElementsPerPartition;
final int failAfterElements = numElementsPerPartition / 3;
createTestTopic(topic, parallelism, 1);
DataGenerators.generateRandomizedIntegerSequence(
StreamExecutionEnvironment.getExecutionEnvironment(),
kafkaServer,
topic,
parallelism,
numElementsPerPartition,
true);
// run the topology that fails and recovers
DeserializationSchema<Integer> schema =
new TypeInformationSerializationSchema<>(
BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500);
env.setParallelism(parallelism);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
getStream(env, topic, schema, props)
.map(new PartitionValidatingMapper(parallelism, 1))
.map(new FailingIdentityMapper<Integer>(failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements))
.setParallelism(1);
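// Reset the static flag of FailingIdentityMapper so the mapper fails (once) in this run as
// well, independent of earlier tests executed in the same JVM.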
FailingIdentityMapper.failedBefore = false;
tryExecute(env, "One-to-one exactly once test");
deleteTestTopic(topic);
}
/**
* Tests the proper consumption when having fewer Flink sources than Kafka partitions, so one
* Flink source will read multiple Kafka partitions.
*/
public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception {
final String topic = "oneToManyTopic-" + UUID.randomUUID();
final int numPartitions = 5;
final int numElementsPerPartition = 1000;
final int totalElements = numPartitions * numElementsPerPartition;
final int failAfterElements = numElementsPerPartition / 3;
final int parallelism = 2;
createTestTopic(topic, numPartitions, 1);
DataGenerators.generateRandomizedIntegerSequence(
StreamExecutionEnvironment.getExecutionEnvironment(),
kafkaServer,
topic,
numPartitions,
numElementsPerPartition,
true);
// run the topology that fails and recovers
DeserializationSchema<Integer> schema =
new TypeInformationSerializationSchema<>(
BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500);
env.setParallelism(parallelism);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
getStream(env, topic, schema, props)
.map(new PartitionValidatingMapper(numPartitions, 3))
.map(new FailingIdentityMapper<Integer>(failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements))
.setParallelism(1);
FailingIdentityMapper.failedBefore = false;
tryExecute(env, "One-source-multi-partitions exactly once test");
deleteTestTopic(topic);
}
/**
* Tests the proper consumption when having more Flink sources than Kafka partitions, which
* means that some Flink sources will read no partitions.
*/
public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
final String topic = "manyToOneTopic-" + UUID.randomUUID();
final int numPartitions = 5;
final int numElementsPerPartition = 1000;
final int totalElements = numPartitions * numElementsPerPartition;
final int failAfterElements = numElementsPerPartition / 3;
final int parallelism = 8;
createTestTopic(topic, numPartitions, 1);
DataGenerators.generateRandomizedIntegerSequence(
StreamExecutionEnvironment.getExecutionEnvironment(),
kafkaServer,
topic,
numPartitions,
numElementsPerPartition,
true);
// run the topology that fails and recovers
DeserializationSchema<Integer> schema =
new TypeInformationSerializationSchema<>(
BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500);
env.setParallelism(parallelism);
// set the number of restarts to one; the failing mapper will fail once, and after recovery
// only success exceptions are expected.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.setBufferTimeout(0);
Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
getStream(env, topic, schema, props)
.map(new PartitionValidatingMapper(numPartitions, 1))
// Job only fails after a checkpoint is taken and the necessary number of elements
// is seen
.map(new FailingIdentityMapper<Integer>(failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements, true))
.setParallelism(1);
FailingIdentityMapper.failedBefore = false;
tryExecute(env, "multi-source-one-partitions exactly once test");
deleteTestTopic(topic);
}
/** Tests that the source can be properly canceled when reading full partitions. */
public void runCancelingOnFullInputTest() throws Exception {
final String topic = "cancelingOnFullTopic-" + UUID.randomUUID();
final int parallelism = 3;
createTestTopic(topic, parallelism, 1);
// launch a producer thread
DataGenerators.InfiniteStringsGenerator generator =
new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
generator.start();
// launch a consumer asynchronously
final AtomicReference<Throwable> jobError = new AtomicReference<>();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.enableCheckpointing(100);
Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
getStream(env, topic, new SimpleStringSchema(), props)
.addSink(new DiscardingSink<String>());
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
final JobID jobId = jobGraph.getJobID();
final Runnable jobRunner =
() -> {
try {
submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
} catch (Throwable t) {
jobError.set(t);
}
};
Thread runnerThread = new Thread(jobRunner, "program runner thread");
runnerThread.start();
// wait a bit before canceling
Thread.sleep(2000);
Throwable failureCause = jobError.get();
if (failureCause != null) {
failureCause.printStackTrace();
fail("Test failed prematurely with: " + failureCause.getMessage());
}
// cancel
client.cancel(jobId).get();
// wait for the program to be done and validate that we failed with the right exception
runnerThread.join();
assertThat(client.getJobStatus(jobId).get()).isEqualTo(JobStatus.CANCELED);
if (generator.isAlive()) {
generator.shutdown();
generator.join();
} else {
Throwable t = generator.getError();
if (t != null) {
t.printStackTrace();
fail("Generator failed: " + t.getMessage());
} else {
fail("Generator failed with no exception");
}
}
deleteTestTopic(topic);
}
/** Tests that the source can be properly canceled when reading empty partitions. */
public void runCancelingOnEmptyInputTest() throws Exception {
final String topic = "cancelingOnEmptyInputTopic-" + UUID.randomUUID();
final int parallelism = 3;
createTestTopic(topic, parallelism, 1);
final AtomicReference<Throwable> error = new AtomicReference<>();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.enableCheckpointing(100);
Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
getStream(env, topic, new SimpleStringSchema(), props)
.addSink(new DiscardingSink<String>());
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
final JobID jobId = jobGraph.getJobID();
final Runnable jobRunner =
() -> {
try {
submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
} catch (Throwable t) {
LOG.error("Job Runner failed with exception", t);
error.set(t);
}
};
Thread runnerThread = new Thread(jobRunner, "program runner thread");
runnerThread.start();
// wait a bit before canceling
Thread.sleep(2000);
Throwable failureCause = error.get();
if (failureCause != null) {
failureCause.printStackTrace();
fail("Test failed prematurely with: " + failureCause.getMessage());
}
// cancel
client.cancel(jobId).get();
// wait for the program to be done and validate that we failed with the right exception
runnerThread.join();
assertThat(client.getJobStatus(jobId).get()).isEqualTo(JobStatus.CANCELED);
deleteTestTopic(topic);
}
/**
* Test producing into and consuming from multiple topics.
*
* @throws Exception
*/
public void runProduceConsumeMultipleTopics(boolean useLegacySchema) throws Exception {
final String topicNamePrefix =
"runProduceConsumeMultipleTopics-" + (useLegacySchema ? "legacy" : "");
final int numTopics = 5;
final int numElements = 20;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// create topics with content
final List<String> topics = new ArrayList<>();
for (int i = 0; i < numTopics; i++) {
final String topic = topicNamePrefix + i + "-" + UUID.randomUUID();
topics.add(topic);
// create topic
createTestTopic(topic, i + 1 /*partitions*/, 1);
}
// before FLINK-6078 the RemoteExecutionEnvironment set the parallelism to 1 as well
env.setParallelism(1);
// run first job, producing into all topics
DataStream<Tuple3<Integer, Integer, String>> stream =
env.addSource(
new RichParallelSourceFunction<Tuple3<Integer, Integer, String>>() {
@Override
public void run(SourceContext<Tuple3<Integer, Integer, String>> ctx) {
int partition = getRuntimeContext().getIndexOfThisSubtask();
for (int topicId = 0; topicId < numTopics; topicId++) {
for (int i = 0; i < numElements; i++) {
ctx.collect(
new Tuple3<>(partition, i, topics.get(topicId)));
}
}
}
@Override
public void cancel() {}
});
Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
if (useLegacySchema) {
Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig());
kafkaServer.produceIntoKafka(stream, "dummy", schema, props, null);
} else {
TestDeserializer schema = new TestDeserializer(env.getConfig());
kafkaServer.produceIntoKafka(stream, "dummy", schema, props);
}
env.execute("Write to topics");
// run second job consuming from multiple topics
env = StreamExecutionEnvironment.getExecutionEnvironment();
if (useLegacySchema) {
Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig());
stream = getStream(env, topics, schema, props);
} else {
TestDeserializer schema = new TestDeserializer(env.getConfig());
stream = getStream(env, topics, schema, props);
}
stream.flatMap(
new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
Map<String, Integer> countPerTopic = new HashMap<>(numTopics);
@Override
public void flatMap(
Tuple3<Integer, Integer, String> value, Collector<Integer> out)
throws Exception {
Integer count = countPerTopic.get(value.f2);
if (count == null) {
count = 1;
} else {
count++;
}
countPerTopic.put(value.f2, count);
// check map:
for (Map.Entry<String, Integer> el : countPerTopic.entrySet()) {
if (el.getValue() < numElements) {
break; // not enough yet
}
if (el.getValue() > numElements) {
throw new RuntimeException(
"There is a failure in the test. I've read "
+ el.getValue()
+ " from topic "
+ el.getKey());
}
}
// we've seen messages from all topics
throw new SuccessException();
}
})
.setParallelism(1);
tryExecute(env, "Count elements from the topics");
// delete all topics again
for (String topic : topics) {
deleteTestTopic(topic);
}
}
/**
* Test Flink's Kafka integration with very big records (the generator below produces records of roughly 7-14 MB).
*
* <p>see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
*/
public void runBigRecordTestTopology() throws Exception {
final String topic = "bigRecordTestTopic-" + UUID.randomUUID();
final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
createTestTopic(topic, parallelism, 1);
final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo =
TypeInformation.of(new TypeHint<Tuple2<Long, byte[]>>() {});
final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
env.enableCheckpointing(100);
env.setParallelism(parallelism);
// add consuming topology:
Properties consumerProps = new Properties();
consumerProps.putAll(standardProps);
consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 14));
consumerProps.setProperty(
"max.partition.fetch.bytes",
Integer.toString(1024 * 1024 * 14)); // for the new fetcher
consumerProps.setProperty("queued.max.message.chunks", "1");
consumerProps.putAll(secureProps);
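// The 14 MB consumer fetch limits above are sized for the 7-14 MB records generated by the
// source further below; the producer is configured with a matching max.request.size of 15 MB.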
DataStreamSource<Tuple2<Long, byte[]>> consuming =
getStream(env, topic, serSchema, consumerProps);
consuming.addSink(
new SinkFunction<Tuple2<Long, byte[]>>() {
private int elCnt = 0;
@Override
public void invoke(Tuple2<Long, byte[]> value) throws Exception {
elCnt++;
if (value.f0 == -1) {
// we should have seen 11 elements now.
if (elCnt == 11) {
throw new SuccessException();
} else {
throw new RuntimeException(
"There have been " + elCnt + " elements");
}
}
if (elCnt > 10) {
throw new RuntimeException("More than 10 elements seen: " + elCnt);
}
}
});
// add producing topology
Properties producerProps = new Properties();
producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 15));
producerProps.setProperty("retries", "3");
producerProps.putAll(secureProps);
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
DataStream<Tuple2<Long, byte[]>> stream =
env.addSource(
new RichSourceFunction<Tuple2<Long, byte[]>>() {
private boolean running;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
running = true;
}
@Override
public void run(SourceContext<Tuple2<Long, byte[]>> ctx)
throws Exception {
Random rnd = new Random();
long cnt = 0;
int sevenMb = 1024 * 1024 * 7;
while (running) {
byte[] wl = new byte[sevenMb + rnd.nextInt(sevenMb)];
ctx.collect(new Tuple2<>(cnt++, wl));
Thread.sleep(100);
if (cnt == 10) {
// signal end
ctx.collect(new Tuple2<>(-1L, new byte[] {1}));
break;
}
}
}
@Override
public void cancel() {
running = false;
}
});
kafkaServer.produceIntoKafka(stream, topic, serSchema, producerProps, null);
tryExecute(env, "big topology test");
deleteTestTopic(topic);
}
public void runBrokerFailureTest() throws Exception {
final String topic = "brokerFailureTestTopic";
// Start a temporary multi-broker cluster.
// This test case relies on stopping a broker and switching partition leader to another
// during the test, so single-broker cluster (kafkaServer) could not fulfill the
// requirement.
KafkaTestEnvironment multiBrokerCluster = constructKafkaTestEnvironment();
multiBrokerCluster.prepare(KafkaTestEnvironment.createConfig().setKafkaServersNumber(3));
final int parallelism = 2;
final int numElementsPerPartition = 1000;
final int totalElements = parallelism * numElementsPerPartition;
final int failAfterElements = numElementsPerPartition / 3;
multiBrokerCluster.createTestTopic(topic, parallelism, 2);
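// Replication factor 2 gives every partition a follower on another broker, so partition
// leadership can move when the selected leader broker is shut down later in the test.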
DataGenerators.generateRandomizedIntegerSequence(
StreamExecutionEnvironment.getExecutionEnvironment(),
multiBrokerCluster,
topic,
parallelism,
numElementsPerPartition,
true);
// find leader to shut down
int leaderId = multiBrokerCluster.getLeaderToShutDown(topic);
LOG.info("Leader to shutdown {}", leaderId);
// run the topology (the consumers must handle the failures)
DeserializationSchema<Integer> schema =
new TypeInformationSerializationSchema<>(
BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.enableCheckpointing(500);
env.setRestartStrategy(RestartStrategies.noRestart());
Properties props = new Properties();
props.putAll(multiBrokerCluster.getStandardProperties());
props.putAll(multiBrokerCluster.getSecureProperties());
getStream(env, topic, schema, props)
.map(new PartitionValidatingMapper(parallelism, 1))
.map(new BrokerKillingMapper<>(multiBrokerCluster, leaderId, failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements))
.setParallelism(1);
try {
BrokerKillingMapper.killedLeaderBefore = false;
tryExecute(env, "Broker failure once test");
} finally {
// Tear down the temporary cluster anyway
multiBrokerCluster.shutdown();
}
}
public void runKeyValueTest() throws Exception {
final String topic = "keyvaluetest-" + UUID.randomUUID();
createTestTopic(topic, 1, 1);
final int elementCount = 5000;
// ----------- Write some data into Kafka -------------------
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRestartStrategy(RestartStrategies.noRestart());
DataStream<Tuple2<Long, PojoValue>> kvStream =
env.addSource(
new SourceFunction<Tuple2<Long, PojoValue>>() {
@Override
public void run(SourceContext<Tuple2<Long, PojoValue>> ctx)
throws Exception {
Random rnd = new Random(1337);
for (long i = 0; i < elementCount; i++) {
PojoValue pojo = new PojoValue();
pojo.when = new Date(rnd.nextLong());
pojo.lon = rnd.nextLong();
pojo.lat = i;
// make every second key null to ensure proper "null"
// serialization
Long key = (i % 2 == 0) ? null : i;
ctx.collect(new Tuple2<>(key, pojo));
}
}
@Override
public void cancel() {}
});
KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema =
new TypeInformationKeyValueSerializationSchema<>(
Long.class, PojoValue.class, env.getConfig());
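// The key/value schema serializes the Long key and the PojoValue value separately, so the
// null keys produced above are expected to round-trip through Kafka (the read side below
// asserts this for every second element).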
Properties producerProperties =
KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "3");
kafkaServer.produceIntoKafka(kvStream, topic, schema, producerProperties, null);
env.execute("Write KV to Kafka");
// ----------- Read the data again -------------------
env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRestartStrategy(RestartStrategies.noRestart());
KafkaDeserializationSchema<Tuple2<Long, PojoValue>> readSchema =
new TypeInformationKeyValueSerializationSchema<>(
Long.class, PojoValue.class, env.getConfig());
Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
DataStream<Tuple2<Long, PojoValue>> fromKafka = getStream(env, topic, readSchema, props);
fromKafka.flatMap(
new RichFlatMapFunction<Tuple2<Long, PojoValue>, Object>() {
long counter = 0;
@Override
public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out)
throws Exception {
// the elements should be in order.
assertThat(value.f1.lat)
.as("Wrong value " + value.f1.lat)
.isEqualTo(counter);
if (value.f1.lat % 2 == 0) {
assertThat(value.f0).as("key was not null").isNull();
} else {
assertThat(value.f0).as("Wrong value " + value.f0).isEqualTo(counter);
}
counter++;
if (counter == elementCount) {
// we got the right number of elements
throw new SuccessException();
}
}
});
tryExecute(env, "Read KV from Kafka");
deleteTestTopic(topic);
}
private static class PojoValue {
public Date when;
public long lon;
public long lat;
public PojoValue() {}
}
/**
* Test that records with null values (deletes) are correctly written by the producer and read back as nulls by the consumer.
*
* @throws Exception
*/
public void runAllDeletesTest() throws Exception {
final String topic = "alldeletestest-" + UUID.randomUUID();
createTestTopic(topic, 1, 1);
final int elementCount = 300;
// ----------- Write some data into Kafka -------------------
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
DataStream<Tuple2<byte[], PojoValue>> kvStream =
env.addSource(
new SourceFunction<Tuple2<byte[], PojoValue>>() {
@Override
public void run(SourceContext<Tuple2<byte[], PojoValue>> ctx)
throws Exception {
Random rnd = new Random(1337);
for (long i = 0; i < elementCount; i++) {
final byte[] key = new byte[200];
rnd.nextBytes(key);
ctx.collect(new Tuple2<>(key, (PojoValue) null));
}
}
@Override
public void cancel() {}
});
TypeInformationKeyValueSerializationSchema<byte[], PojoValue> schema =
new TypeInformationKeyValueSerializationSchema<>(
byte[].class, PojoValue.class, env.getConfig());
Properties producerProperties =
KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "3");
producerProperties.putAll(secureProps);
kafkaServer.produceIntoKafka(kvStream, topic, schema, producerProperties, null);
env.execute("Write deletes to Kafka");
// ----------- Read the data again -------------------
env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
DataStream<Tuple2<byte[], PojoValue>> fromKafka = getStream(env, topic, schema, props);
fromKafka.flatMap(
new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>() {
long counter = 0;
@Override
public void flatMap(Tuple2<byte[], PojoValue> value, Collector<Object> out)
throws Exception {
// ensure that deleted messages are passed as nulls
assertThat(value.f1).isNull();
counter++;
if (counter == elementCount) {
// we got the right number of elements
throw new SuccessException();
}
}
});
tryExecute(env, "Read deletes from Kafka");
deleteTestTopic(topic);
}
/**
* Test that ensures that DeserializationSchema.isEndOfStream() is properly evaluated.
*
* @throws Exception
*/
public void runEndOfStreamTest() throws Exception {
final int elementCount = 300;
final String topic = writeSequence("testEndOfStream", elementCount, 1, 1);
// read using custom schema
final StreamExecutionEnvironment env1 =
StreamExecutionEnvironment.getExecutionEnvironment();
env1.setParallelism(1);
env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
DataStream<Tuple2<Integer, Integer>> fromKafka =
getStream(env1, topic, new FixedNumberDeserializationSchema(elementCount), props);
fromKafka.flatMap(
new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
@Override
public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out)
throws Exception {
// noop ;)
}
});
tryExecute(env1, "Consume " + elementCount + " elements from Kafka");
deleteTestTopic(topic);
}
/**
* Test that ensures that DeserializationSchema can emit multiple records via a Collector.
*
* @throws Exception
*/
public void runCollectingSchemaTest() throws Exception {
final int elementCount = 20;
final String topic = writeSequence("testCollectingSchema", elementCount, 1, 1);
// read using custom schema
final StreamExecutionEnvironment env1 =
StreamExecutionEnvironment.getExecutionEnvironment();
env1.setParallelism(1);
env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
DataStream<Tuple2<Integer, String>> fromKafka =
env1.addSource(
kafkaServer
.getConsumer(
topic,
new CollectingDeserializationSchema(elementCount),
props)
.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<Tuple2<Integer, String>>() {
@Override
public long extractAscendingTimestamp(
Tuple2<Integer, String> element) {
String string = element.f1;
return Long.parseLong(
string.substring(0, string.length() - 1));
}
}));
fromKafka
.keyBy(t -> t.f0)
.process(
new KeyedProcessFunction<Integer, Tuple2<Integer, String>, Void>() {
private boolean registered = false;
@Override
public void processElement(
Tuple2<Integer, String> value, Context ctx, Collector<Void> out)
throws Exception {
if (!registered) {
ctx.timerService().registerEventTimeTimer(elementCount - 2);
registered = true;
}
}
@Override
public void onTimer(
long timestamp, OnTimerContext ctx, Collector<Void> out)
throws Exception {
throw new SuccessException();
}
});
tryExecute(env1, "Consume " + elementCount + " elements from Kafka");
deleteTestTopic(topic);
}
    /**
     * Test that consumer offset metrics and producer metrics are reported via JMX while a job is
     * reading from and writing to Kafka.
     */
public void runMetricsTest() throws Throwable {
        // create a topic with 5 partitions
final String topic = "metricsStream-" + UUID.randomUUID();
createTestTopic(topic, 5, 1);
final Tuple1<Throwable> error = new Tuple1<>(null);
// start job writing & reading data.
final StreamExecutionEnvironment env1 =
StreamExecutionEnvironment.getExecutionEnvironment();
env1.setParallelism(1);
env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env1.disableOperatorChaining(); // let the source read everything into the network buffers
TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema =
new TypeInformationSerializationSchema<>(
TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}),
env1.getConfig());
DataStream<Tuple2<Integer, Integer>> fromKafka =
getStream(env1, topic, schema, standardProps);
fromKafka.flatMap(
new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
@Override
public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out)
throws Exception { // no op
}
});
DataStream<Tuple2<Integer, Integer>> fromGen =
env1.addSource(
new RichSourceFunction<Tuple2<Integer, Integer>>() {
boolean running = true;
@Override
public void run(SourceContext<Tuple2<Integer, Integer>> ctx)
throws Exception {
int i = 0;
while (running) {
ctx.collect(
Tuple2.of(
i++,
getRuntimeContext().getIndexOfThisSubtask()));
Thread.sleep(1);
}
}
@Override
public void cancel() {
running = false;
}
});
kafkaServer.produceIntoKafka(fromGen, topic, schema, standardProps, null);
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env1.getStreamGraph());
final JobID jobId = jobGraph.getJobID();
Thread jobThread =
new Thread(
() -> {
try {
submitJobAndWaitForResult(
client, jobGraph, getClass().getClassLoader());
} catch (Throwable t) {
if (!ExceptionUtils.findThrowable(t, JobCancellationException.class)
.isPresent()) {
LOG.warn("Got exception during execution", t);
error.f0 = t;
}
}
});
jobThread.start();
try {
// connect to JMX
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
// wait until we've found all 5 offset metrics
Set<ObjectName> offsetMetrics =
mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null);
while (offsetMetrics.size()
< 5) { // test will time out if metrics are not properly working
if (error.f0 != null) {
// fail test early
throw error.f0;
}
offsetMetrics = mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null);
Thread.sleep(50);
}
assertThat(offsetMetrics).hasSize(5);
            // we can't rely on the consumer to have touched all the partitions already,
            // so wait until all five partitions report a non-negative offset.
            // The test will time out if the condition is never met.
while (true) {
int numPosOffsets = 0;
// check that offsets are correctly reported
for (ObjectName object : offsetMetrics) {
Object offset = mBeanServer.getAttribute(object, "Value");
if ((long) offset >= 0) {
numPosOffsets++;
}
}
if (numPosOffsets == 5) {
break;
}
// wait for the consumer to consume on all partitions
Thread.sleep(50);
}
// check if producer metrics are also available.
Set<ObjectName> producerMetrics =
mBeanServer.queryNames(new ObjectName("*KafkaProducer*:*"), null);
            assertThat(producerMetrics).as("No producer metrics found").hasSizeGreaterThan(30);
LOG.info("Found all JMX metrics. Cancelling job.");
} finally {
// cancel
client.cancel(jobId).get();
// wait for the job to finish (it should due to the cancel command above)
jobThread.join();
}
if (error.f0 != null) {
throw error.f0;
}
deleteTestTopic(topic);
}
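    /**
     * Schema that emits two records (suffixed with "a" and "b") for every consumed Kafka record
     * through the {@link Collector}-based deserialize method.
     */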
private static class CollectingDeserializationSchema
implements KafkaDeserializationSchema<Tuple2<Integer, String>> {
final int finalCount;
TypeInformation<Tuple2<Integer, String>> ti =
TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {});
TypeSerializer<Tuple2<Integer, Integer>> ser =
TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})
.createSerializer(new ExecutionConfig());
public CollectingDeserializationSchema(int finalCount) {
this.finalCount = finalCount;
}
@Override
public boolean isEndOfStream(Tuple2<Integer, String> nextElement) {
return false;
}
@Override
public Tuple2<Integer, String> deserialize(ConsumerRecord<byte[], byte[]> record)
throws Exception {
throw new UnsupportedOperationException("Should not be called");
}
@Override
public void deserialize(
ConsumerRecord<byte[], byte[]> message, Collector<Tuple2<Integer, String>> out)
throws Exception {
DataInputView in =
new DataInputViewStreamWrapper(new ByteArrayInputStream(message.value()));
Tuple2<Integer, Integer> tuple = ser.deserialize(in);
out.collect(Tuple2.of(tuple.f0, tuple.f1 + "a"));
out.collect(Tuple2.of(tuple.f0, tuple.f1 + "b"));
}
@Override
public TypeInformation<Tuple2<Integer, String>> getProducedType() {
return ti;
}
}
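    /**
     * Schema that reports end-of-stream after a fixed number of records has been read.
     */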
private static class FixedNumberDeserializationSchema
implements DeserializationSchema<Tuple2<Integer, Integer>> {
final int finalCount;
int count = 0;
TypeInformation<Tuple2<Integer, Integer>> ti =
TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
TypeSerializer<Tuple2<Integer, Integer>> ser = ti.createSerializer(new ExecutionConfig());
public FixedNumberDeserializationSchema(int finalCount) {
this.finalCount = finalCount;
}
@Override
public Tuple2<Integer, Integer> deserialize(byte[] message) throws IOException {
DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
return ser.deserialize(in);
}
@Override
public boolean isEndOfStream(Tuple2<Integer, Integer> nextElement) {
return ++count >= finalCount;
}
@Override
public TypeInformation<Tuple2<Integer, Integer>> getProducedType() {
return ti;
}
}
// ------------------------------------------------------------------------
// Reading writing test data sets
// ------------------------------------------------------------------------
/**
* Runs a job using the provided environment to read a sequence of records from a single Kafka
     * topic. The expected starting offset and the total number of read values can be specified
     * individually for each partition. The job is considered successful only if every partition's
     * read result matches its start offset and value count.
*/
protected void readSequence(
final StreamExecutionEnvironment env,
final StartupMode startupMode,
final Map<KafkaTopicPartition, Long> specificStartupOffsets,
final Long startupTimestamp,
final Properties cc,
final String topicName,
final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset)
throws Exception {
final int sourceParallelism = partitionsToValuesCountAndStartOffset.keySet().size();
int finalCountTmp = 0;
for (Map.Entry<Integer, Tuple2<Integer, Integer>> valuesCountAndStartOffset :
partitionsToValuesCountAndStartOffset.entrySet()) {
finalCountTmp += valuesCountAndStartOffset.getValue().f0;
}
final int finalCount = finalCountTmp;
final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType =
TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
// create the consumer
cc.putAll(secureProps);
DataStreamSource<Tuple2<Integer, Integer>> source;
if (useNewSource) {
KafkaSourceBuilder<Tuple2<Integer, Integer>> sourceBuilder =
kafkaServer.getSourceBuilder(topicName, deser, cc);
Map<TopicPartition, Long> startOffsets = new HashMap<>();
if (specificStartupOffsets != null) {
specificStartupOffsets.forEach(
(ktp, offset) ->
startOffsets.put(
new TopicPartition(ktp.getTopic(), ktp.getPartition()),
offset));
}
setKafkaSourceOffset(startupMode, sourceBuilder, startOffsets, startupTimestamp);
source =
env.fromSource(
sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "KafkaSource");
} else {
FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer =
kafkaServer.getConsumer(topicName, deser, cc);
setKafkaConsumerOffset(startupMode, consumer, specificStartupOffsets, startupTimestamp);
source = env.addSource(consumer);
}
source.setParallelism(sourceParallelism)
.map(new ThrottledMapper<>(20))
.setParallelism(sourceParallelism)
.flatMap(
new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
private HashMap<Integer, BitSet> partitionsToValueCheck;
private int count = 0;
@Override
public void open(Configuration parameters) throws Exception {
partitionsToValueCheck = new HashMap<>();
for (Integer partition :
partitionsToValuesCountAndStartOffset.keySet()) {
partitionsToValueCheck.put(partition, new BitSet());
}
}
@Override
public void flatMap(
Tuple2<Integer, Integer> value, Collector<Integer> out)
throws Exception {
int partition = value.f0;
int val = value.f1;
BitSet bitSet = partitionsToValueCheck.get(partition);
if (bitSet == null) {
throw new RuntimeException(
"Got a record from an unknown partition");
} else {
bitSet.set(
val
- partitionsToValuesCountAndStartOffset.get(
partition)
.f1);
}
count++;
LOG.debug("Received message {}, total {} messages", value, count);
// verify if we've seen everything
if (count == finalCount) {
for (Map.Entry<Integer, BitSet> partitionsToValueCheck :
this.partitionsToValueCheck.entrySet()) {
BitSet check = partitionsToValueCheck.getValue();
int expectedValueCount =
partitionsToValuesCountAndStartOffset.get(
partitionsToValueCheck.getKey())
.f0;
if (check.cardinality() != expectedValueCount) {
throw new RuntimeException(
"Expected cardinality to be "
+ expectedValueCount
+ ", but was "
+ check.cardinality());
} else if (check.nextClearBit(0) != expectedValueCount) {
throw new RuntimeException(
"Expected next clear bit to be "
+ expectedValueCount
+ ", but was "
                                                            + check.nextClearBit(0));
}
}
// test has passed
throw new SuccessException();
}
}
})
.setParallelism(1);
tryExecute(env, "Read data from Kafka");
LOG.info("Successfully read sequence for verification");
}
/**
* Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode,
     * Map, Long, Properties, String, Map)} that expects the same start offset and the same value
     * count for all partitions of a single Kafka topic.
*/
protected void readSequence(
final StreamExecutionEnvironment env,
final StartupMode startupMode,
final Map<KafkaTopicPartition, Long> specificStartupOffsets,
final Long startupTimestamp,
final Properties cc,
final int sourceParallelism,
final String topicName,
final int valuesCount,
final int startFrom)
throws Exception {
HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset =
new HashMap<>();
for (int i = 0; i < sourceParallelism; i++) {
partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom));
}
readSequence(
env,
startupMode,
specificStartupOffsets,
startupTimestamp,
cc,
topicName,
partitionsToValuesCountAndStartOffset);
}
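    /**
     * Applies the given {@link StartupMode} to a legacy {@link FlinkKafkaConsumerBase}, passing
     * the specific offsets or the timestamp where the mode requires them.
     */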
protected void setKafkaConsumerOffset(
final StartupMode startupMode,
final FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer,
final Map<KafkaTopicPartition, Long> specificStartupOffsets,
final Long startupTimestamp) {
switch (startupMode) {
case EARLIEST:
consumer.setStartFromEarliest();
break;
case LATEST:
consumer.setStartFromLatest();
break;
case SPECIFIC_OFFSETS:
consumer.setStartFromSpecificOffsets(specificStartupOffsets);
break;
case GROUP_OFFSETS:
consumer.setStartFromGroupOffsets();
break;
case TIMESTAMP:
consumer.setStartFromTimestamp(startupTimestamp);
break;
}
}
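    /**
     * Applies the given {@link StartupMode} to a {@link KafkaSourceBuilder} by setting the
     * corresponding {@link OffsetsInitializer}.
     */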
protected void setKafkaSourceOffset(
final StartupMode startupMode,
final KafkaSourceBuilder<?> kafkaSourceBuilder,
final Map<TopicPartition, Long> specificStartupOffsets,
final Long startupTimestamp) {
switch (startupMode) {
case EARLIEST:
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
break;
case LATEST:
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
break;
case SPECIFIC_OFFSETS:
kafkaSourceBuilder.setStartingOffsets(
OffsetsInitializer.offsets(specificStartupOffsets));
break;
case GROUP_OFFSETS:
kafkaSourceBuilder.setStartingOffsets(
OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));
break;
case TIMESTAMP:
kafkaSourceBuilder.setStartingOffsets(
OffsetsInitializer.timestamp(startupTimestamp));
break;
}
}
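    /**
     * Writes a sequence of (subtask index, counter) records into a newly created topic and
     * validates the result. Because the producer is not exactly-once, the write is retried on a
     * fresh topic until a valid sequence has been produced (up to 10 attempts); the name of the
     * successful topic is returned.
     */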
protected String writeSequence(
String baseTopicName,
final int numElements,
final int parallelism,
final int replicationFactor)
throws Exception {
LOG.info(
"\n===================================\n"
+ "== Writing sequence of "
+ numElements
+ " into "
+ baseTopicName
+ " with p="
+ parallelism
+ "\n"
+ "===================================");
final TypeInformation<Tuple2<Integer, Integer>> resultType =
TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
final SerializationSchema<Tuple2<Integer, Integer>> serSchema =
new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig());
final KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
new KafkaDeserializationSchemaWrapper<>(
new TypeInformationSerializationSchema<>(
resultType, new ExecutionConfig()));
final int maxNumAttempts = 10;
for (int attempt = 1; attempt <= maxNumAttempts; attempt++) {
final String topicName = baseTopicName + '-' + attempt + '-' + UUID.randomUUID();
LOG.info("Writing attempt #" + attempt);
// -------- Write the Sequence --------
createTestTopic(topicName, parallelism, replicationFactor);
StreamExecutionEnvironment writeEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
DataStream<Tuple2<Integer, Integer>> stream =
writeEnv.addSource(
new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
private boolean running = true;
@Override
public void run(SourceContext<Tuple2<Integer, Integer>> ctx)
throws Exception {
int cnt = 0;
int partition =
getRuntimeContext().getIndexOfThisSubtask();
while (running && cnt < numElements) {
ctx.collect(new Tuple2<>(partition, cnt));
cnt++;
}
}
@Override
public void cancel() {
running = false;
}
})
.setParallelism(parallelism);
// the producer must not produce duplicates
Properties producerProperties =
KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "0");
producerProperties.putAll(secureProps);
kafkaServer
.produceIntoKafka(
stream,
topicName,
serSchema,
producerProperties,
new Tuple2FlinkPartitioner(parallelism))
.setParallelism(parallelism);
try {
writeEnv.execute("Write sequence");
} catch (Exception e) {
LOG.error("Write attempt failed, trying again", e);
deleteTestTopic(topicName);
waitUntilNoJobIsRunning(client);
continue;
}
LOG.info("Finished writing sequence");
// -------- Validate the Sequence --------
            // we need to validate the sequence, because Kafka's producers are not exactly-once
LOG.info("Validating sequence");
waitUntilNoJobIsRunning(client);
if (validateSequence(topicName, parallelism, deserSchema, numElements)) {
// everything is good!
return topicName;
} else {
deleteTestTopic(topicName);
// fall through the loop
}
}
throw new Exception(
"Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts");
}
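    /**
     * Appends {@code numElementsToAppend} records per partition to an existing topic, continuing
     * the counter at {@code originalNumElements}, and validates that the topic then contains the
     * complete sequence.
     */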
protected void writeAppendSequence(
String topicName,
final int originalNumElements,
final int numElementsToAppend,
final int parallelism)
throws Exception {
LOG.info(
"\n===================================\n"
+ "== Appending sequence of "
+ numElementsToAppend
+ " into "
+ topicName
+ "===================================");
final TypeInformation<Tuple2<Integer, Integer>> resultType =
TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
final SerializationSchema<Tuple2<Integer, Integer>> serSchema =
new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig());
final KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
new KafkaDeserializationSchemaWrapper<>(
new TypeInformationSerializationSchema<>(
resultType, new ExecutionConfig()));
// -------- Write the append sequence --------
StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
DataStream<Tuple2<Integer, Integer>> stream =
writeEnv.addSource(
new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
private boolean running = true;
@Override
public void run(SourceContext<Tuple2<Integer, Integer>> ctx)
throws Exception {
int cnt = originalNumElements;
int partition = getRuntimeContext().getIndexOfThisSubtask();
while (running
&& cnt
< numElementsToAppend
+ originalNumElements) {
ctx.collect(new Tuple2<>(partition, cnt));
cnt++;
}
}
@Override
public void cancel() {
running = false;
}
})
.setParallelism(parallelism);
// the producer must not produce duplicates
Properties producerProperties =
KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "0");
producerProperties.putAll(secureProps);
kafkaServer
.produceIntoKafka(
stream,
topicName,
serSchema,
producerProperties,
new Tuple2FlinkPartitioner(parallelism))
.setParallelism(parallelism);
try {
writeEnv.execute("Write sequence");
} catch (Exception e) {
throw new Exception("Failed to append sequence to Kafka; append job failed.", e);
}
LOG.info("Finished writing append sequence");
        // we need to validate the sequence, because Kafka's producers are not exactly-once
LOG.info("Validating sequence");
while (!getRunningJobs(client).isEmpty()) {
Thread.sleep(50);
}
if (!validateSequence(
topicName, parallelism, deserSchema, originalNumElements + numElementsToAppend)) {
throw new Exception("Could not append a valid sequence to Kafka.");
}
}
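    /**
     * Reads the given topic from the earliest offset and returns true if the expected
     * {@code parallelism * totalNumElements} records are consumed within a ten second deadline.
     */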
private boolean validateSequence(
final String topic,
final int parallelism,
KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema,
final int totalNumElements)
throws Exception {
final StreamExecutionEnvironment readEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
readEnv.setParallelism(parallelism);
Properties readProps = (Properties) standardProps.clone();
readProps.setProperty("group.id", "flink-tests-validator");
readProps.putAll(secureProps);
DataStreamSource<Tuple2<Integer, Integer>> dataStreamSource;
if (useNewSource) {
KafkaSource<Tuple2<Integer, Integer>> source =
kafkaServer
.getSourceBuilder(topic, deserSchema, readProps)
.setStartingOffsets(OffsetsInitializer.earliest())
.build();
dataStreamSource =
readEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSource");
} else {
FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer =
kafkaServer.getConsumer(topic, deserSchema, readProps);
consumer.setStartFromEarliest();
dataStreamSource = readEnv.addSource(consumer);
}
dataStreamSource
.map(
new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
private final int totalCount = parallelism * totalNumElements;
private int count = 0;
@Override
public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value)
throws Exception {
if (++count == totalCount) {
throw new SuccessException();
} else {
return value;
}
}
})
.setParallelism(1)
.addSink(new DiscardingSink<>())
.setParallelism(1);
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(readEnv.getStreamGraph());
final JobID jobId = jobGraph.getJobID();
Thread runner =
new Thread(
() -> {
try {
submitJobAndWaitForResult(
client, jobGraph, getClass().getClassLoader());
tryExecute(readEnv, "sequence validation");
} catch (Throwable t) {
if (!ExceptionUtils.findThrowable(t, SuccessException.class)
.isPresent()) {
errorRef.set(t);
}
}
});
runner.start();
final long deadline = System.nanoTime() + 10_000_000_000L;
long delay;
while (runner.isAlive() && (delay = deadline - System.nanoTime()) > 0) {
runner.join(delay / 1_000_000L);
}
boolean success;
if (runner.isAlive()) {
// did not finish in time, maybe the producer dropped one or more records and
// the validation did not reach the exit point
success = false;
client.cancel(jobId).get();
} else {
Throwable error = errorRef.get();
if (error != null) {
success = false;
LOG.info("Sequence validation job failed with exception", error);
} else {
success = true;
}
}
waitUntilNoJobIsRunning(client);
return success;
}
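    /**
     * Creates a source stream for the given topic, using either the new {@link KafkaSource} or
     * the legacy {@link FlinkKafkaConsumerBase} depending on {@code useNewSource}.
     */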
private <T> DataStreamSource<T> getStream(
StreamExecutionEnvironment env,
String topic,
DeserializationSchema<T> schema,
Properties props) {
return getStream(env, Collections.singletonList(topic), schema, props);
}
private <T> DataStreamSource<T> getStream(
StreamExecutionEnvironment env,
String topic,
KafkaDeserializationSchema<T> schema,
Properties props) {
return getStream(env, Collections.singletonList(topic), schema, props);
}
private <T> DataStreamSource<T> getStream(
StreamExecutionEnvironment env,
List<String> topics,
DeserializationSchema<T> schema,
Properties props) {
if (useNewSource) {
KafkaSource<T> kafkaSource =
kafkaServer.getSourceBuilder(topics, schema, props).build();
return env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "KafkaSource");
} else {
FlinkKafkaConsumerBase<T> flinkKafkaConsumer =
kafkaServer.getConsumer(topics, schema, props);
return env.addSource(flinkKafkaConsumer);
}
}
private <T> DataStreamSource<T> getStream(
StreamExecutionEnvironment env,
List<String> topics,
KafkaDeserializationSchema<T> schema,
Properties props) {
if (useNewSource) {
KafkaSource<T> kafkaSource =
kafkaServer.getSourceBuilder(topics, schema, props).build();
return env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "KafkaSource");
} else {
FlinkKafkaConsumerBase<T> flinkKafkaConsumer =
kafkaServer.getConsumer(topics, schema, props);
return env.addSource(flinkKafkaConsumer);
}
}
// ------------------------------------------------------------------------
// Debugging utilities
// ------------------------------------------------------------------------
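    /**
     * Identity mapper whose first subtask shuts down the given Kafka broker once it has seen
     * {@code failCount} elements, recording whether a checkpoint completed before the failure.
     */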
private static class BrokerKillingMapper<T> extends RichMapFunction<T, T>
implements ListCheckpointed<Integer>, CheckpointListener {
private static final long serialVersionUID = 6334389850158707313L;
public static volatile boolean killedLeaderBefore;
public static volatile boolean hasBeenCheckpointedBeforeFailure;
private static KafkaTestEnvironment kafkaServerToKill;
private final int shutdownBrokerId;
private final int failCount;
private int numElementsTotal;
private boolean failer;
private boolean hasBeenCheckpointed;
public BrokerKillingMapper(
KafkaTestEnvironment kafkaServer, int shutdownBrokerId, int failCount) {
kafkaServerToKill = kafkaServer;
this.shutdownBrokerId = shutdownBrokerId;
this.failCount = failCount;
}
@Override
public void open(Configuration parameters) {
failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
}
@Override
public T map(T value) throws Exception {
numElementsTotal++;
if (!killedLeaderBefore) {
Thread.sleep(10);
if (failer && numElementsTotal >= failCount) {
// shut down a Kafka broker
kafkaServerToKill.stopBroker(shutdownBrokerId);
hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
killedLeaderBefore = true;
}
}
return value;
}
@Override
public void notifyCheckpointComplete(long checkpointId) {
hasBeenCheckpointed = true;
}
@Override
public void notifyCheckpointAborted(long checkpointId) {}
@Override
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(this.numElementsTotal);
}
@Override
public void restoreState(List<Integer> state) throws Exception {
if (state.isEmpty() || state.size() > 1) {
throw new RuntimeException(
"Test failed due to unexpected recovered state size " + state.size());
}
this.numElementsTotal = state.get(0);
}
}
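    /**
     * Base schema that deserializes a {@code Tuple2<Integer, Integer>} from the record value and
     * attaches the record's topic as the third tuple field.
     */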
private abstract static class AbstractTestDeserializer
implements KafkaDeserializationSchema<Tuple3<Integer, Integer, String>> {
protected final TypeSerializer<Tuple2<Integer, Integer>> ts;
public AbstractTestDeserializer(ExecutionConfig ec) {
ts =
TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})
.createSerializer(ec);
}
@Override
public Tuple3<Integer, Integer, String> deserialize(ConsumerRecord<byte[], byte[]> record)
throws Exception {
DataInputView in =
new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value()));
Tuple2<Integer, Integer> t2 = ts.deserialize(in);
return new Tuple3<>(t2.f0, t2.f1, record.topic());
}
@Override
public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
return false;
}
@Override
public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
return TypeInformation.of(new TypeHint<Tuple3<Integer, Integer, String>>() {});
}
}
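    /**
     * Combined schema that extends the test deserializer and implements the legacy {@link
     * KeyedSerializationSchema}: the first two tuple fields form the record value, and the third
     * field selects the target topic.
     */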
private static class Tuple2WithTopicSchema extends AbstractTestDeserializer
implements KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
public Tuple2WithTopicSchema(ExecutionConfig ec) {
super(ec);
}
@Override
public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
return null;
}
@Override
public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
ByteArrayOutputStream by = new ByteArrayOutputStream();
DataOutputView out = new DataOutputViewStreamWrapper(by);
try {
ts.serialize(new Tuple2<>(element.f0, element.f1), out);
} catch (IOException e) {
throw new RuntimeException("Error", e);
}
return by.toByteArray();
}
@Override
public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
return element.f2;
}
}
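    /**
     * Combined schema like {@link Tuple2WithTopicSchema}, but implementing {@link
     * KafkaSerializationSchema} and producing a {@link ProducerRecord} that targets the topic in
     * the third tuple field.
     */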
private static class TestDeserializer extends AbstractTestDeserializer
implements KafkaSerializationSchema<Tuple3<Integer, Integer, String>> {
public TestDeserializer(ExecutionConfig ec) {
super(ec);
}
@Override
public ProducerRecord<byte[], byte[]> serialize(
Tuple3<Integer, Integer, String> element, @Nullable Long timestamp) {
ByteArrayOutputStream by = new ByteArrayOutputStream();
DataOutputView out = new DataOutputViewStreamWrapper(by);
try {
ts.serialize(new Tuple2<>(element.f0, element.f1), out);
} catch (IOException e) {
throw new RuntimeException("Error", e);
}
byte[] serializedValue = by.toByteArray();
return new ProducerRecord<>(element.f2, serializedValue);
}
}
}