/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.InstantiationUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;
import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
 * Base class for tests that run against a special Kafka broker with a log retention of only 250 ms.
 * This way, we can make sure the consumer properly handles cases where it runs into
 * out-of-range offset errors.
 */
@SuppressWarnings("serial")
public class KafkaShortRetentionTestBase implements Serializable {
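// Typical usage (a sketch; the subclass name below is hypothetical): version-specific
// ITCases extend this base class and delegate to the run* methods from @Test methods, e.g.
//
// public class KafkaXyzShortRetentionITCase extends KafkaShortRetentionTestBase {
//     @Test(timeout = 60000)
//     public void testAutoOffsetReset() throws Exception {
//         runAutoOffsetResetTest();
//     }
// }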
protected static final Logger LOG = LoggerFactory.getLogger(KafkaShortRetentionTestBase.class);
protected static final int NUM_TMS = 1;
protected static final int TM_SLOTS = 8;
protected static final int PARALLELISM = NUM_TMS * TM_SLOTS;
private static KafkaTestEnvironment kafkaServer;
private static Properties standardProps;
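// starts an embedded Flink cluster for the tests: NUM_TMS TaskManagers with TM_SLOTS slots each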
@ClassRule
public static MiniClusterResource flink = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfiguration(),
NUM_TMS,
TM_SLOTS));
@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();
protected static Properties secureProps = new Properties();
private static Configuration getConfiguration() {
Configuration flinkConfig = new Configuration();
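// keep the managed memory footprint small for the test and restart failed tasks without delay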
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
return flinkConfig;
}
@BeforeClass
public static void prepare() throws ClassNotFoundException {
LOG.info("-------------------------------------------------------------------------");
LOG.info(" Starting KafkaShortRetentionTestBase ");
LOG.info("-------------------------------------------------------------------------");
// dynamically load the implementation for the test
Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
if (kafkaServer.isSecureRunSupported()) {
secureProps = kafkaServer.getSecureProperties();
}
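// override the broker-side retention settings so that log segments expire after 250 ms,
// with the retention checker running every 100 ms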
Properties specificProperties = new Properties();
specificProperties.setProperty("log.retention.hours", "0");
specificProperties.setProperty("log.retention.minutes", "0");
specificProperties.setProperty("log.retention.ms", "250");
specificProperties.setProperty("log.retention.check.interval.ms", "100");
kafkaServer.prepare(kafkaServer.createConfig().setKafkaServerProperties(specificProperties));
standardProps = kafkaServer.getStandardProperties();
}
@AfterClass
public static void shutDownServices() throws Exception {
kafkaServer.shutdown();
secureProps.clear();
}
// Flag used by the deserialization schema to signal the producer to stop. A static field works
// here because the producer and the schema instances run within the same JVM on the MiniCluster.
private static boolean stopProducer = false;
/**
 * This test concurrently reads from and writes to a Kafka topic.
 * The job runs for a while. Using a special deserialization schema, we make sure that the
 * offsets seen by the consumer are non-continuous: the data expires faster than it is
 * consumed, so with auto.offset.reset = 'earliest' some offsets never show up.
 */
public void runAutoOffsetResetTest() throws Exception {
final String topic = "auto-offset-reset-test";
final int parallelism = 1;
final int elementsPerPartition = 50000;
Properties tprops = new Properties();
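// topic-level retention: records become eligible for deletion 250 ms after they were written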
tprops.setProperty("retention.ms", "250");
kafkaServer.createTestTopic(topic, parallelism, 1, tprops);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
env.getConfig().disableSysoutLogging();
// ----------- add producer dataflow ----------
DataStream<String> stream = env.addSource(new RichParallelSourceFunction<String>() {
private boolean running = true;
@Override
public void run(SourceContext<String> ctx) throws InterruptedException {
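// each parallel subtask emits its own disjoint range of ids, throttled to roughly
// 100 elements per second so that records keep expiring while the job is running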
int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
int limit = cnt + elementsPerPartition;
while (running && !stopProducer && cnt < limit) {
ctx.collect("element-" + cnt);
cnt++;
Thread.sleep(10);
}
LOG.info("Stopping producer");
}
@Override
public void cancel() {
running = false;
}
});
Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
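// write the generated elements into the test topic; the trailing null means that
// no custom Flink partitioner is supplied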
kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null);
// ----------- add consumer dataflow ----------
NonContinousOffsetsDeserializationSchema deserSchema = new NonContinousOffsetsDeserializationSchema();
FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, deserSchema, props);
DataStreamSource<String> consuming = env.addSource(source);
consuming.addSink(new DiscardingSink<String>());
tryExecute(env, "run auto offset reset test");
kafkaServer.deleteTestTopic(topic);
}
private class NonContinousOffsetsDeserializationSchema implements KeyedDeserializationSchema<String> {
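// number of offset gaps seen so far, and the offset we expect next if nothing expired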
private int numJumps;
long nextExpected = 0;
@Override
public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
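// a gap between the expected and the actual offset means the broker evicted records
// and the consumer had to reset its position via auto.offset.reset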
if (offset != nextExpected) {
numJumps++;
nextExpected = offset;
LOG.info("Registered now jump at offset {}", offset);
}
nextExpected++;
try {
Thread.sleep(10); // slow down data consumption to trigger log eviction
} catch (InterruptedException e) {
throw new RuntimeException("Stopping it");
}
return "";
}
@Override
public boolean isEndOfStream(String nextElement) {
if (numJumps >= 5) {
// we saw 5 jumps and no failures --> consumer can handle auto.offset.reset
stopProducer = true;
return true;
}
return false;
}
@Override
public TypeInformation<String> getProducedType() {
return Types.STRING;
}
}
/**
 * Ensure that the consumer fails properly if "auto.offset.reset" is set to "none".
 */
public void runFailOnAutoOffsetResetNone() throws Exception {
final String topic = "auto-offset-reset-none-test";
final int parallelism = 1;
kafkaServer.createTestTopic(topic, parallelism, 1);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
env.getConfig().disableSysoutLogging();
// ----------- add consumer ----------
Properties customProps = new Properties();
customProps.putAll(standardProps);
customProps.putAll(secureProps);
customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
DataStreamSource<String> consuming = env.addSource(source);
consuming.addSink(new DiscardingSink<String>());
try {
env.execute("Test auto offset reset none");
} catch (Throwable e) {
// check if correct exception has been thrown
if (!e.getCause().getCause().getMessage().contains("Unable to find previous offset") // kafka 0.8
&& !e.getCause().getCause().getMessage().contains("Undefined offset with no reset policy for partition") // kafka 0.9
) {
throw e;
}
}
kafkaServer.deleteTestTopic(topic);
}
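/**
 * Ensure that the consumer rejects "auto.offset.reset" set to "none" eagerly, i.e. already
 * when the consumer is created rather than when the job runs.
 */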
public void runFailOnAutoOffsetResetNoneEager() throws Exception {
final String topic = "auto-offset-reset-none-test";
final int parallelism = 1;
kafkaServer.createTestTopic(topic, parallelism, 1);
// ----------- add consumer ----------
Properties customProps = new Properties();
customProps.putAll(standardProps);
customProps.putAll(secureProps);
customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
try {
kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
fail("should fail with an exception");
}
catch (IllegalArgumentException e) {
// expected
assertTrue(e.getMessage().contains("none"));
}
kafkaServer.deleteTestTopic(topic);
}
}