/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Test;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
* Abstract test base for all Kafka producer tests.
*/
@SuppressWarnings("serial")
public abstract class KafkaProducerTestBase extends KafkaTestBase {
private static final long KAFKA_READ_TIMEOUT = 60_000L;
/**
* This test verifies that custom partitioning works correctly, with a default topic
* and a dynamic topic. The number of partitions for each topic is deliberately different.
*
* <p>Test topology:
*
* <pre>
*             +------> (sink) --+--> [DEFAULT_TOPIC-1] --> (source) -> (map) -----+
*            /                  |                                                 |
*           |                   |                                          -------+--> (sink)
*           +------> (sink) ----+--> [DEFAULT_TOPIC-2] --> (source) -> (map) -----+
*          /
*         |
* (source) --------> (sink) ---+--> [DYNAMIC_TOPIC-1] --> (source) -> (map) -----+
*         |                     |                                                 |
*          \                    |                                                 |
*           +------> (sink) ----+--> [DYNAMIC_TOPIC-2] --> (source) -> (map) -----+--> (sink)
*            |                  |                                                 |
*             \                 |                                                 |
*              +----> (sink) ---+--> [DYNAMIC_TOPIC-3] --> (source) -> (map) -----+
* </pre>
*
* <p>Each topic has an independent mapper that validates that the values consistently come
* from the correct Kafka partition of the topic it is responsible for.
*
* <p>Each topic also has a final sink that validates that there are no duplicates and that all
* partitions are present.
*/
@Test
public void testCustomPartitioning() {
try {
LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
final String defaultTopic = "defaultTopic";
final int defaultTopicPartitions = 2;
final String dynamicTopic = "dynamicTopic";
final int dynamicTopicPartitions = 3;
createTestTopic(defaultTopic, defaultTopicPartitions, 1);
createTestTopic(dynamicTopic, dynamicTopicPartitions, 1);
Map<String, Integer> expectedTopicsToNumPartitions = new HashMap<>(2);
expectedTopicsToNumPartitions.put(defaultTopic, defaultTopicPartitions);
expectedTopicsToNumPartitions.put(dynamicTopic, dynamicTopicPartitions);
TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInformation.of(new TypeHint<Tuple2<Long, String>>(){});
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();
TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
// ------ producing topology ---------
// source has DOP 1 to make sure it generates no duplicates
DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
private boolean running = true;
@Override
public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
long cnt = 0;
while (running) {
ctx.collect(new Tuple2<Long, String>(cnt, "kafka-" + cnt));
cnt++;
if (cnt % 100 == 0) {
Thread.sleep(1);
}
}
}
@Override
public void cancel() {
running = false;
}
}).setParallelism(1);
Properties props = new Properties();
props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
props.putAll(secureProps);
// producer sink that partitions records into the default and dynamic topics
kafkaServer.produceIntoKafka(
stream,
defaultTopic,
// this serialization schema will route between the default topic and dynamic topic
new CustomKeyedSerializationSchemaWrapper(serSchema, defaultTopic, dynamicTopic),
props,
new CustomPartitioner(expectedTopicsToNumPartitions))
.setParallelism(Math.max(defaultTopicPartitions, dynamicTopicPartitions));
// ------ consuming topology ---------
Properties consumerProps = new Properties();
consumerProps.putAll(standardProps);
consumerProps.putAll(secureProps);
FlinkKafkaConsumerBase<Tuple2<Long, String>> defaultTopicSource =
kafkaServer.getConsumer(defaultTopic, deserSchema, consumerProps);
FlinkKafkaConsumerBase<Tuple2<Long, String>> dynamicTopicSource =
kafkaServer.getConsumer(dynamicTopic, deserSchema, consumerProps);
env.addSource(defaultTopicSource).setParallelism(defaultTopicPartitions)
.map(new PartitionValidatingMapper(defaultTopicPartitions)).setParallelism(defaultTopicPartitions)
.addSink(new PartitionValidatingSink(defaultTopicPartitions)).setParallelism(1);
env.addSource(dynamicTopicSource).setParallelism(dynamicTopicPartitions)
.map(new PartitionValidatingMapper(dynamicTopicPartitions)).setParallelism(dynamicTopicPartitions)
.addSink(new PartitionValidatingSink(dynamicTopicPartitions)).setParallelism(1);
tryExecute(env, "custom partitioning test");
deleteTestTopic(defaultTopic);
deleteTestTopic(dynamicTopic);
LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
/**
* Tests the at-least-once semantic for the simple writes into Kafka.
*/
@Test
public void testOneToOneAtLeastOnceRegularSink() throws Exception {
testOneToOneAtLeastOnce(true);
}
/**
* Tests the at-least-once semantic for the simple writes into Kafka.
*/
@Test
public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
testOneToOneAtLeastOnce(false);
}
/**
* This test configures the KafkaProducer so that it does not automatically flush data, and
* simulates a network failure between Flink and Kafka to check whether FlinkKafkaProducer
* manually flushes pending records on snapshotState.
*
* <p>For legacy reasons there are two different ways of instantiating a Kafka 0.10 sink. The
* {@code regularSink} parameter controls which one is used.
*/
protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
final int partition = 0;
final int numElements = 1000;
final int failAfterElements = 333;
createTestTopic(topic, 1, 1);
TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper<>(schema);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.enableCheckpointing(500);
env.setParallelism(1);
env.setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();
Properties properties = new Properties();
properties.putAll(standardProps);
properties.putAll(secureProps);
// decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try to send pending (not yet flushed) data on close()
properties.setProperty("timeout.ms", "10000");
properties.setProperty("max.block.ms", "10000");
// increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
properties.setProperty("batch.size", "10240000");
properties.setProperty("linger.ms", "10000");
BrokerRestartingMapper.resetState(kafkaServer::blockProxyTraffic);
// process exactly failAfterElements elements, then cut the network connection to the Kafka broker and fail the application
DataStream<Integer> inputStream = env
.fromCollection(getIntegersSequence(numElements))
.map(new BrokerRestartingMapper<>(failAfterElements));
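// pin every record to the same fixed partition so the at-least-once assertion below can read them back from a single known partition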
StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
@Override
public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
return partition;
}
});
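// for legacy reasons the sink can be instantiated either as a regular DataStream sink function or via produceIntoKafka(); exercise the requested variant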
if (regularSink) {
inputStream.addSink(kafkaSink.getUserFunction());
}
else {
kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
@Override
public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
return partition;
}
});
}
FailingIdentityMapper.failedBefore = false;
try {
env.execute("One-to-one at least once test");
fail("Job should fail!");
}
catch (JobExecutionException ex) {
// ignore error, it can be one of many errors so it would be hard to check the exception message/cause
}
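// restore the network connection to the broker so that the written records can be read back for validation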
kafkaServer.unblockProxyTraffic();
// assert that all elements that were snapshotted before the failure were also flushed to Kafka
assertAtLeastOnceForTopic(
properties,
topic,
partition,
Collections.unmodifiableSet(new HashSet<>(getIntegersSequence(BrokerRestartingMapper.lastSnapshotedElementBeforeShutdown))),
KAFKA_READ_TIMEOUT);
deleteTestTopic(topic);
}
/**
* Tests the exactly-once semantic for the simple writes into Kafka.
*/
@Test
public void testExactlyOnceRegularSink() throws Exception {
testExactlyOnce(true);
}
/**
* Tests the exactly-once semantic for the simple writes into Kafka.
*/
@Test
public void testExactlyOnceCustomOperator() throws Exception {
testExactlyOnce(false);
}
/**
* This test lets the KafkaProducer flush data automatically and fails the job mid-stream to
* check that records flushed since the last checkpoint are not duplicated after recovery.
*/
protected void testExactlyOnce(boolean regularSink) throws Exception {
final String topic = regularSink ? "exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator";
final int partition = 0;
final int numElements = 1000;
final int failAfterElements = 333;
createTestTopic(topic, 1, 1);
TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper<>(schema);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointInterval(500);
env.setParallelism(1);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
Properties properties = new Properties();
properties.putAll(standardProps);
properties.putAll(secureProps);
// process exactly failAfterElements elements and then fail the application to force a restart from the last checkpoint
List<Integer> expectedElements = getIntegersSequence(numElements);
DataStream<Integer> inputStream = env
.addSource(new IntegerSource(numElements))
.map(new FailingIdentityMapper<Integer>(failAfterElements));
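// pin every record to the same fixed partition so the exactly-once assertion below can read them all back from a single known partition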
FlinkKafkaPartitioner<Integer> partitioner = new FlinkKafkaPartitioner<Integer>() {
@Override
public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
return partition;
}
};
if (regularSink) {
StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, partitioner);
inputStream.addSink(kafkaSink.getUserFunction());
}
else {
kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, partitioner);
}
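// reset the static failure flag so that the mapper injects its artificial failure during this run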
FailingIdentityMapper.failedBefore = false;
TestUtils.tryExecute(env, "Exactly once test");
// assert that Kafka contains exactly the expected elements, with no duplicates and none missing
assertExactlyOnceForTopic(
properties,
topic,
partition,
expectedElements,
KAFKA_READ_TIMEOUT);
deleteTestTopic(topic);
}
private List<Integer> getIntegersSequence(int size) {
List<Integer> result = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
result.add(i);
}
return result;
}
// ------------------------------------------------------------------------
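/**
* Partitioner that checks that it is always handed the complete partition list of the target topic
* and partitions records by their sequence number.
*/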
private static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
private final Map<String, Integer> expectedTopicsToNumPartitions;
public CustomPartitioner(Map<String, Integer> expectedTopicsToNumPartitions) {
this.expectedTopicsToNumPartitions = expectedTopicsToNumPartitions;
}
@Override
public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
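// the producer must pass in the full partition list of whichever topic this record was routed to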
assertEquals(expectedTopicsToNumPartitions.get(topic).intValue(), partitions.length);
return (int) (next.f0 % partitions.length);
}
}
/**
* A {@link KeyedSerializationSchemaWrapper} that supports routing serialized records to different target topics.
*/
public static class CustomKeyedSerializationSchemaWrapper extends KeyedSerializationSchemaWrapper<Tuple2<Long, String>> {
private final String defaultTopic;
private final String dynamicTopic;
public CustomKeyedSerializationSchemaWrapper(
SerializationSchema<Tuple2<Long, String>> serializationSchema,
String defaultTopic,
String dynamicTopic) {
super(serializationSchema);
this.defaultTopic = Preconditions.checkNotNull(defaultTopic);
this.dynamicTopic = Preconditions.checkNotNull(dynamicTopic);
}
@Override
public String getTargetTopic(Tuple2<Long, String> element) {
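// records with even sequence numbers go to the default topic, odd ones to the dynamic topic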
return (element.f0 % 2 == 0) ? defaultTopic : dynamicTopic;
}
}
/**
* Mapper that validates that all values seen by one subtask belong to the same partition and maps each value to its partition index.
*/
public static class PartitionValidatingMapper extends RichMapFunction<Tuple2<Long, String>, Integer> {
private final int numPartitions;
private int ourPartition = -1;
public PartitionValidatingMapper(int numPartitions) {
this.numPartitions = numPartitions;
}
@Override
public Integer map(Tuple2<Long, String> value) throws Exception {
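// the consumer runs one subtask per partition and forwards to this mapper, so every value seen here must map to the same partition index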
int partition = value.f0.intValue() % numPartitions;
if (ourPartition != -1) {
assertEquals("inconsistent partitioning", ourPartition, partition);
} else {
ourPartition = partition;
}
return partition;
}
}
/**
* Sink that counts the values received per partition and signals success once every partition has delivered enough values.
*/
public static class PartitionValidatingSink implements SinkFunction<Integer> {
private final int[] valuesPerPartition;
public PartitionValidatingSink(int numPartitions) {
this.valuesPerPartition = new int[numPartitions];
}
@Override
public void invoke(Integer value) throws Exception {
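// count values per partition index; once every partition has delivered at least 100 values, signal success to stop the otherwise infinite job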
valuesPerPartition[value]++;
boolean missing = false;
for (int i : valuesPerPartition) {
if (i < 100) {
missing = true;
break;
}
}
if (!missing) {
throw new SuccessException();
}
}
}
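/**
* Mapper that counts incoming elements and, once {@code failCount} elements have passed through the
* failing subtask, runs the configured shutdown action (here: blocking the network traffic between
* Flink and the Kafka broker).
*/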
private static class BrokerRestartingMapper<T> extends RichMapFunction<T, T>
implements CheckpointedFunction, CheckpointListener {
private static final long serialVersionUID = 6334389850158707313L;
public static volatile boolean triggeredShutdown;
public static volatile int lastSnapshotedElementBeforeShutdown;
public static volatile Runnable shutdownAction;
private final int failCount;
private int numElementsTotal;
private boolean failer;
public static void resetState(Runnable shutdownAction) {
triggeredShutdown = false;
lastSnapshotedElementBeforeShutdown = 0;
BrokerRestartingMapper.shutdownAction = shutdownAction;
}
public BrokerRestartingMapper(int failCount) {
this.failCount = failCount;
}
@Override
public void open(Configuration parameters) {
failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
}
@Override
public T map(T value) throws Exception {
numElementsTotal++;
Thread.sleep(10);
if (!triggeredShutdown && failer && numElementsTotal >= failCount) {
// shut down a Kafka broker
triggeredShutdown = true;
shutdownAction.run();
}
return value;
}
@Override
public void notifyCheckpointComplete(long checkpointId) {
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
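// remember how many elements were processed at the last snapshot before the broker connection was cut; at-least-once requires all of them to be readable from Kafka afterwards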
if (!triggeredShutdown) {
lastSnapshotedElementBeforeShutdown = numElementsTotal;
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
}
}
}