| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.streaming.connectors.kafka; |
| |
| import org.apache.flink.api.common.ExecutionConfig; |
| import org.apache.flink.api.common.JobID; |
| import org.apache.flink.api.common.functions.FlatMapFunction; |
| import org.apache.flink.api.common.functions.RichFlatMapFunction; |
| import org.apache.flink.api.common.functions.RichMapFunction; |
| import org.apache.flink.api.common.restartstrategy.RestartStrategies; |
| import org.apache.flink.api.common.serialization.DeserializationSchema; |
| import org.apache.flink.api.common.serialization.SimpleStringSchema; |
| import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; |
| import org.apache.flink.api.common.typeinfo.BasicTypeInfo; |
| import org.apache.flink.api.common.typeinfo.TypeHint; |
| import org.apache.flink.api.common.typeinfo.TypeInformation; |
| import org.apache.flink.api.common.typeutils.TypeSerializer; |
| import org.apache.flink.api.java.tuple.Tuple1; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.api.java.tuple.Tuple3; |
| import org.apache.flink.client.program.ClusterClient; |
| import org.apache.flink.client.program.ProgramInvocationException; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.core.memory.DataInputView; |
| import org.apache.flink.core.memory.DataInputViewStreamWrapper; |
| import org.apache.flink.core.memory.DataOutputView; |
| import org.apache.flink.core.memory.DataOutputViewStreamWrapper; |
| import org.apache.flink.runtime.client.JobCancellationException; |
| import org.apache.flink.runtime.client.JobExecutionException; |
| import org.apache.flink.runtime.jobgraph.JobGraph; |
| import org.apache.flink.runtime.jobgraph.JobStatus; |
| import org.apache.flink.runtime.state.CheckpointListener; |
| import org.apache.flink.streaming.api.CheckpointingMode; |
| import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; |
| import org.apache.flink.streaming.api.datastream.DataStream; |
| import org.apache.flink.streaming.api.datastream.DataStreamSource; |
| import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
| import org.apache.flink.streaming.api.functions.sink.DiscardingSink; |
| import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; |
| import org.apache.flink.streaming.api.functions.sink.SinkFunction; |
| import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; |
| import org.apache.flink.streaming.api.functions.source.RichSourceFunction; |
| import org.apache.flink.streaming.api.functions.source.SourceFunction; |
| import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; |
| import org.apache.flink.streaming.connectors.kafka.config.StartupMode; |
| import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; |
| import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators; |
| import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper; |
| import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper; |
| import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper; |
| import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner; |
| import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink; |
| import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; |
| import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; |
| import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; |
| import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; |
| import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema; |
| import org.apache.flink.test.util.SuccessException; |
| import org.apache.flink.testutils.junit.RetryOnException; |
| import org.apache.flink.testutils.junit.RetryRule; |
| import org.apache.flink.util.Collector; |
| import org.apache.flink.util.ExceptionUtils; |
| |
| import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; |
| |
| import kafka.consumer.Consumer; |
| import kafka.consumer.ConsumerConfig; |
| import kafka.consumer.ConsumerIterator; |
| import kafka.consumer.KafkaStream; |
| import kafka.javaapi.consumer.ConsumerConnector; |
| import kafka.message.MessageAndMetadata; |
| import kafka.server.KafkaServer; |
| import org.apache.commons.io.output.ByteArrayOutputStream; |
| import org.apache.kafka.clients.producer.ProducerConfig; |
| import org.apache.kafka.common.errors.TimeoutException; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| |
| import javax.management.MBeanServer; |
| import javax.management.ObjectName; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.IOException; |
| import java.lang.management.ManagementFactory; |
| import java.util.ArrayList; |
| import java.util.BitSet; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS; |
| import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.getRunningJobs; |
| import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilJobIsRunning; |
| import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilNoJobIsRunning; |
| import static org.apache.flink.test.util.TestUtils.tryExecute; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * Abstract test base for all Kafka consumer tests. |
| */ |
| @SuppressWarnings("serial") |
| public abstract class KafkaConsumerTestBase extends KafkaTestBase { |
| |
	/** Retries test methods annotated with {@link RetryOnException} (e.g. on transient Kafka leader failures). */
	@Rule
	public RetryRule retryRule = new RetryRule();

	// Cluster client for the shared mini cluster; re-fetched before every test in
	// setClientAndEnsureNoJobIsLingering().
	private ClusterClient<?> client;
| |
| // ------------------------------------------------------------------------ |
| // Common Test Preparation |
| // ------------------------------------------------------------------------ |
| |
	/**
	 * Makes sure that no job is on the JobManager any more from any previous tests that use
	 * the same mini cluster. Otherwise, missing slots may happen.
	 *
	 * @throws Exception if waiting for the cluster to become job-free fails
	 */
	@Before
	public void setClientAndEnsureNoJobIsLingering() throws Exception {
		// grab a fresh client, then block until the shared mini cluster has no running jobs
		client = flink.getClusterClient();
		waitUntilNoJobIsRunning(client);
	}
| |
| // ------------------------------------------------------------------------ |
| // Suite of Tests |
| // |
| // The tests here are all not activated (by an @Test tag), but need |
| // to be invoked from the extending classes. That way, the classes can |
| // select which tests to run. |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Test that ensures the KafkaConsumer is properly failing if the topic doesnt exist |
| * and a wrong broker was specified. |
| * |
| * @throws Exception |
| */ |
| public void runFailOnNoBrokerTest() throws Exception { |
| try { |
| Properties properties = new Properties(); |
| |
| StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); |
| see.getConfig().disableSysoutLogging(); |
| see.setRestartStrategy(RestartStrategies.noRestart()); |
| see.setParallelism(1); |
| |
| // use wrong ports for the consumers |
| properties.setProperty("bootstrap.servers", "localhost:80"); |
| properties.setProperty("zookeeper.connect", "localhost:80"); |
| properties.setProperty("group.id", "test"); |
| properties.setProperty("request.timeout.ms", "3000"); // let the test fail fast |
| properties.setProperty("socket.timeout.ms", "3000"); |
| properties.setProperty("session.timeout.ms", "2000"); |
| properties.setProperty("fetch.max.wait.ms", "2000"); |
| properties.setProperty("heartbeat.interval.ms", "1000"); |
| properties.putAll(secureProps); |
| FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("doesntexist", new SimpleStringSchema(), properties); |
| DataStream<String> stream = see.addSource(source); |
| stream.print(); |
| see.execute("No broker test"); |
| } catch (JobExecutionException jee) { |
| if (kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10") || kafkaServer.getVersion().equals("0.11")) { |
| assertTrue(jee.getCause() instanceof TimeoutException); |
| |
| TimeoutException te = (TimeoutException) jee.getCause(); |
| |
| assertEquals("Timeout expired while fetching topic metadata", te.getMessage()); |
| } else { |
| assertTrue(jee.getCause() instanceof RuntimeException); |
| |
| RuntimeException re = (RuntimeException) jee.getCause(); |
| |
| assertTrue(re.getMessage().contains("Unable to retrieve any partitions")); |
| } |
| } |
| } |
| |
| /** |
| * Ensures that the committed offsets to Kafka are the offsets of "the next record to process". |
| */ |
| public void runCommitOffsetsToKafka() throws Exception { |
| // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) |
| final int parallelism = 3; |
| final int recordsInEachPartition = 50; |
| |
| final String topicName = writeSequence("testCommitOffsetsToKafkaTopic", recordsInEachPartition, parallelism, 1); |
| |
| final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.getConfig().disableSysoutLogging(); |
| env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); |
| env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE); |
| |
| Properties props = new Properties(); |
| props.putAll(standardProps); |
| // We need to turn on the partition dynamic discovery so that the source task won't exit after reaching the |
| // log end. |
| props.setProperty(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10000"); |
| DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), props)); |
| stream.addSink(new DiscardingSink<String>()); |
| |
| final AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
| final Thread runner = new Thread("runner") { |
| @Override |
| public void run() { |
| try { |
| env.execute(); |
| } |
| catch (Throwable t) { |
| if (!(t instanceof JobCancellationException)) { |
| errorRef.set(t); |
| } |
| } |
| } |
| }; |
| runner.start(); |
| |
| final Long l50 = 50L; // the final committed offset in Kafka should be 50 |
| final long deadline = 30_000_000_000L + System.nanoTime(); |
| |
| KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); |
| |
| do { |
| Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0); |
| Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1); |
| Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2); |
| |
| if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) { |
| break; |
| } |
| |
| Thread.sleep(100); |
| } |
| while (System.nanoTime() < deadline); |
| |
| // cancel the job & wait for the job to finish |
| client.cancel(Iterables.getOnlyElement(getRunningJobs(client))); |
| runner.join(); |
| |
| final Throwable t = errorRef.get(); |
| if (t != null) { |
| throw new RuntimeException("Job failed with an exception", t); |
| } |
| |
| // final check to see if offsets are correctly in Kafka |
| Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0); |
| Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1); |
| Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2); |
| Assert.assertEquals(Long.valueOf(50L), o1); |
| Assert.assertEquals(Long.valueOf(50L), o2); |
| Assert.assertEquals(Long.valueOf(50L), o3); |
| |
| kafkaOffsetHandler.close(); |
| deleteTestTopic(topicName); |
| } |
| |
| /** |
| * This test ensures that when the consumers retrieve some start offset from kafka (earliest, latest), that this offset |
| * is committed to Kafka, even if some partitions are not read. |
| * |
| * <p>Test: |
| * - Create 3 partitions |
| * - write 50 messages into each. |
| * - Start three consumers with auto.offset.reset='latest' and wait until they committed into Kafka. |
| * - Check if the offsets in Kafka are set to 50 for the three partitions |
| * |
| * <p>See FLINK-3440 as well |
| */ |
| public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception { |
| // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) |
| final int parallelism = 3; |
| final int recordsInEachPartition = 50; |
| |
| final String topicName = writeSequence("testAutoOffsetRetrievalAndCommitToKafkaTopic", recordsInEachPartition, parallelism, 1); |
| |
| final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.getConfig().disableSysoutLogging(); |
| env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); |
| env.setParallelism(parallelism); |
| env.enableCheckpointing(200); |
| |
| Properties readProps = new Properties(); |
| readProps.putAll(standardProps); |
| readProps.setProperty("auto.offset.reset", "latest"); // set to reset to latest, so that partitions are initially not read |
| |
| DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), readProps)); |
| stream.addSink(new DiscardingSink<String>()); |
| |
| final AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
| final Thread runner = new Thread("runner") { |
| @Override |
| public void run() { |
| try { |
| env.execute(); |
| } |
| catch (Throwable t) { |
| if (!(t instanceof JobCancellationException)) { |
| errorRef.set(t); |
| } |
| } |
| } |
| }; |
| runner.start(); |
| |
| KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); |
| |
| final Long l50 = 50L; // the final committed offset in Kafka should be 50 |
| final long deadline = 30_000_000_000L + System.nanoTime(); |
| do { |
| Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0); |
| Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1); |
| Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2); |
| |
| if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) { |
| break; |
| } |
| |
| Thread.sleep(100); |
| } |
| while (System.nanoTime() < deadline); |
| |
| // cancel the job & wait for the job to finish |
| client.cancel(Iterables.getOnlyElement(getRunningJobs(client))); |
| runner.join(); |
| |
| final Throwable t = errorRef.get(); |
| if (t != null) { |
| throw new RuntimeException("Job failed with an exception", t); |
| } |
| |
| // final check to see if offsets are correctly in Kafka |
| Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0); |
| Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1); |
| Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2); |
| Assert.assertEquals(Long.valueOf(50L), o1); |
| Assert.assertEquals(Long.valueOf(50L), o2); |
| Assert.assertEquals(Long.valueOf(50L), o3); |
| |
| kafkaOffsetHandler.close(); |
| deleteTestTopic(topicName); |
| } |
| |
| /** |
| * This test ensures that when explicitly set to start from earliest record, the consumer |
| * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka. |
| */ |
| public void runStartFromEarliestOffsets() throws Exception { |
| // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) |
| final int parallelism = 3; |
| final int recordsInEachPartition = 50; |
| |
| final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1); |
| |
| final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.getConfig().disableSysoutLogging(); |
| env.setParallelism(parallelism); |
| |
| Properties readProps = new Properties(); |
| readProps.putAll(standardProps); |
| readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored |
| |
| // the committed offsets should be ignored |
| KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); |
| kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); |
| kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); |
| kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); |
| |
| readSequence(env, StartupMode.EARLIEST, null, null, readProps, parallelism, topicName, recordsInEachPartition, 0); |
| |
| kafkaOffsetHandler.close(); |
| deleteTestTopic(topicName); |
| } |
| |
	/**
	 * This test ensures that when explicitly set to start from latest record, the consumer
	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
	 *
	 * <p>While the latest-starting consuming job runs, a second job appends extra records to the
	 * topic. The consuming job fails the test if it ever reads a record from the initial sequence
	 * (which would mean the committed offsets or "auto.offset.reset" were incorrectly honored).
	 */
	public void runStartFromLatestOffsets() throws Exception {
		// 50 records written to each of 3 partitions before launching a latest-starting consuming job
		final int parallelism = 3;
		final int recordsInEachPartition = 50;

		// each partition will be written an extra 200 records
		final int extraRecordsInEachPartition = 200;

		// all already existing data in the topic, before the consuming topology has started, should be ignored
		final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);

		// the committed offsets should be ignored
		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);

		// job names for the topologies for writing and consuming the extra records
		// NOTE(review): consumeExtraRecordsJobName is declared but never referenced below — verify
		// whether it was meant to be used for the consuming job submission
		final String consumeExtraRecordsJobName = "Consume Extra Records Job";
		final String writeExtraRecordsJobName = "Write Extra Records Job";

		// serialization / deserialization schemas for writing and consuming the extra records
		final TypeInformation<Tuple2<Integer, Integer>> resultType =
			TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});

		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
			new KeyedSerializationSchemaWrapper<>(
				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));

		final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
			new KeyedDeserializationSchemaWrapper<>(
				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));

		// setup and run the latest-consuming job
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.getConfig().disableSysoutLogging();
		env.setParallelism(parallelism);

		final Properties readProps = new Properties();
		readProps.putAll(standardProps);
		readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored

		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> latestReadingConsumer =
			kafkaServer.getConsumer(topicName, deserSchema, readProps);
		latestReadingConsumer.setStartFromLatest();

		env
			.addSource(latestReadingConsumer).setParallelism(parallelism)
			.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Object>() {
				@Override
				public void flatMap(Tuple2<Integer, Integer> value, Collector<Object> out) throws Exception {
					// extra-record values start at recordsInEachPartition; anything smaller belongs to
					// the initial sequence and must not have been read by a latest-starting consumer
					if (value.f1 - recordsInEachPartition < 0) {
						throw new RuntimeException("test failed; consumed a record that was previously written: " + value);
					}
				}
			}).setParallelism(1)
			.addSink(new DiscardingSink<>());

		// build the job graph up front so the job id is known for the cancel call at the end
		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
		final JobID consumeJobId = jobGraph.getJobID();

		// submit the consuming job on a separate thread (blocking, non-detached submission); failures
		// thrown inside the job surface here and are captured for re-throw at the end of the test
		final AtomicReference<Throwable> error = new AtomicReference<>();
		Thread consumeThread = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					client.setDetached(false);
					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader(), false);
				} catch (Throwable t) {
					// cancellation is the expected way for the consuming job to end
					if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
						error.set(t);
					}
				}
			}
		});
		consumeThread.start();

		// wait until the consuming job has started, to be extra safe
		waitUntilJobIsRunning(client);

		// setup the extra records writing job
		final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();

		env2.setParallelism(parallelism);

		// each parallel subtask writes extraRecordsInEachPartition records for its own subtask index,
		// with values continuing where the initial sequence left off
		DataStream<Tuple2<Integer, Integer>> extraRecordsStream = env2
			.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {

				// volatile not needed here: cancel() is only relevant once the bounded loop ends anyway
				// NOTE(review): flag is read/written from different threads — confirm visibility is acceptable
				private boolean running = true;

				@Override
				public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
					int count = recordsInEachPartition; // the extra records should start from the last written value
					int partition = getRuntimeContext().getIndexOfThisSubtask();

					while (running && count < recordsInEachPartition + extraRecordsInEachPartition) {
						ctx.collect(new Tuple2<>(partition, count));
						count++;
					}
				}

				@Override
				public void cancel() {
					running = false;
				}
			});

		kafkaServer.produceIntoKafka(extraRecordsStream, topicName, serSchema, readProps, null);

		try {
			env2.execute(writeExtraRecordsJobName);
		}
		catch (Exception e) {
			throw new RuntimeException("Writing extra records failed", e);
		}

		// cancel the consume job after all extra records are written
		client.cancel(consumeJobId);
		consumeThread.join();

		kafkaOffsetHandler.close();
		deleteTestTopic(topicName);

		// check whether the consuming thread threw any test errors;
		// test will fail here if the consume job had incorrectly read any records other than the extra records
		final Throwable consumerError = error.get();
		if (consumerError != null) {
			throw new Exception("Exception in the consuming thread", consumerError);
		}
	}
| |
| /** |
| * This test ensures that the consumer correctly uses group offsets in Kafka, and defaults to "auto.offset.reset" |
| * behaviour when necessary, when explicitly configured to start from group offsets. |
| * |
| * <p>The partitions and their committed group offsets are setup as: |
| * partition 0 --> committed offset 23 |
| * partition 1 --> no commit offset |
| * partition 2 --> committed offset 43 |
| * |
| * <p>When configured to start from group offsets, each partition should read: |
| * partition 0 --> start from offset 23, read to offset 49 (27 records) |
| * partition 1 --> default to "auto.offset.reset" (set to earliest), so start from offset 0, read to offset 49 (50 records) |
| * partition 2 --> start from offset 43, read to offset 49 (7 records) |
| */ |
| public void runStartFromGroupOffsets() throws Exception { |
| // 3 partitions with 50 records each (offsets 0-49) |
| final int parallelism = 3; |
| final int recordsInEachPartition = 50; |
| |
| final String topicName = writeSequence("testStartFromGroupOffsetsTopic", recordsInEachPartition, parallelism, 1); |
| |
| final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.getConfig().disableSysoutLogging(); |
| env.setParallelism(parallelism); |
| |
| Properties readProps = new Properties(); |
| readProps.putAll(standardProps); |
| readProps.setProperty("auto.offset.reset", "earliest"); |
| |
| // the committed group offsets should be used as starting points |
| KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); |
| |
| // only partitions 0 and 2 have group offsets committed |
| kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); |
| kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); |
| |
| Map<Integer, Tuple2<Integer, Integer>> partitionsToValueCountAndStartOffsets = new HashMap<>(); |
| partitionsToValueCountAndStartOffsets.put(0, new Tuple2<>(27, 23)); // partition 0 should read offset 23-49 |
| partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(50, 0)); // partition 1 should read offset 0-49 |
| partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(7, 43)); // partition 2 should read offset 43-49 |
| |
| readSequence(env, StartupMode.GROUP_OFFSETS, null, null, readProps, topicName, partitionsToValueCountAndStartOffsets); |
| |
| kafkaOffsetHandler.close(); |
| deleteTestTopic(topicName); |
| } |
| |
| /** |
| * This test ensures that the consumer correctly uses user-supplied specific offsets when explicitly configured to |
| * start from specific offsets. For partitions which a specific offset can not be found for, the starting position |
| * for them should fallback to the group offsets behaviour. |
| * |
| * <p>4 partitions will have 50 records with offsets 0 to 49. The supplied specific offsets map is: |
| * partition 0 --> start from offset 19 |
| * partition 1 --> not set |
| * partition 2 --> start from offset 22 |
| * partition 3 --> not set |
| * partition 4 --> start from offset 26 (this should be ignored because the partition does not exist) |
| * |
| * <p>The partitions and their committed group offsets are setup as: |
| * partition 0 --> committed offset 23 |
| * partition 1 --> committed offset 31 |
| * partition 2 --> committed offset 43 |
| * partition 3 --> no commit offset |
| * |
| * <p>When configured to start from these specific offsets, each partition should read: |
| * partition 0 --> start from offset 19, read to offset 49 (31 records) |
| * partition 1 --> fallback to group offsets, so start from offset 31, read to offset 49 (19 records) |
| * partition 2 --> start from offset 22, read to offset 49 (28 records) |
| * partition 3 --> fallback to group offsets, but since there is no group offset for this partition, |
| * will default to "auto.offset.reset" (set to "earliest"), |
| * so start from offset 0, read to offset 49 (50 records) |
| */ |
| public void runStartFromSpecificOffsets() throws Exception { |
| // 4 partitions with 50 records each (offsets 0-49) |
| final int parallelism = 4; |
| final int recordsInEachPartition = 50; |
| |
| final String topicName = writeSequence("testStartFromSpecificOffsetsTopic", recordsInEachPartition, parallelism, 1); |
| |
| final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.getConfig().disableSysoutLogging(); |
| env.setParallelism(parallelism); |
| |
| Properties readProps = new Properties(); |
| readProps.putAll(standardProps); |
| readProps.setProperty("auto.offset.reset", "earliest"); // partition 3 should default back to this behaviour |
| |
| Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>(); |
| specificStartupOffsets.put(new KafkaTopicPartition(topicName, 0), 19L); |
| specificStartupOffsets.put(new KafkaTopicPartition(topicName, 2), 22L); |
| specificStartupOffsets.put(new KafkaTopicPartition(topicName, 4), 26L); // non-existing partition, should be ignored |
| |
| // only the committed offset for partition 1 should be used, because partition 1 has no entry in specific offset map |
| KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); |
| kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); |
| kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); |
| kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); |
| |
| Map<Integer, Tuple2<Integer, Integer>> partitionsToValueCountAndStartOffsets = new HashMap<>(); |
| partitionsToValueCountAndStartOffsets.put(0, new Tuple2<>(31, 19)); // partition 0 should read offset 19-49 |
| partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(19, 31)); // partition 1 should read offset 31-49 |
| partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(28, 22)); // partition 2 should read offset 22-49 |
| partitionsToValueCountAndStartOffsets.put(3, new Tuple2<>(50, 0)); // partition 3 should read offset 0-49 |
| |
| readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, null, readProps, topicName, partitionsToValueCountAndStartOffsets); |
| |
| kafkaOffsetHandler.close(); |
| deleteTestTopic(topicName); |
| } |
| |
| /** |
| * This test ensures that the consumer correctly uses user-supplied timestamp when explicitly configured to |
| * start from timestamp. |
| * |
| * <p>The validated Kafka data is written in 2 steps: first, an initial 50 records is written to each partition. |
| * After that, another 30 records is appended to each partition. Before each step, a timestamp is recorded. |
| * For the validation, when the read job is configured to start from the first timestamp, each partition should start |
| * from offset 0 and read a total of 80 records. When configured to start from the second timestamp, |
| * each partition should start from offset 50 and read on the remaining 30 appended records. |
| */ |
| public void runStartFromTimestamp() throws Exception { |
| // 4 partitions with 50 records each |
| final int parallelism = 4; |
| final int initialRecordsInEachPartition = 50; |
| final int appendRecordsInEachPartition = 30; |
| |
| // attempt to create an appended test sequence, where the timestamp of writing the appended sequence |
| // is assured to be larger than the timestamp of the original sequence. |
| long firstTimestamp = System.currentTimeMillis(); |
| String topic = writeSequence("runStartFromTimestamp", initialRecordsInEachPartition, parallelism, 1); |
| |
| long secondTimestamp = 0; |
| while (secondTimestamp <= firstTimestamp) { |
| Thread.sleep(1000); |
| secondTimestamp = System.currentTimeMillis(); |
| } |
| writeAppendSequence(topic, initialRecordsInEachPartition, appendRecordsInEachPartition, parallelism); |
| |
| final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.getConfig().disableSysoutLogging(); |
| env.setParallelism(parallelism); |
| |
| Properties readProps = new Properties(); |
| readProps.putAll(standardProps); |
| |
| readSequence(env, StartupMode.TIMESTAMP, null, firstTimestamp, |
| readProps, parallelism, topic, initialRecordsInEachPartition + appendRecordsInEachPartition, 0); |
| readSequence(env, StartupMode.TIMESTAMP, null, secondTimestamp, |
| readProps, parallelism, topic, appendRecordsInEachPartition, initialRecordsInEachPartition); |
| |
| deleteTestTopic(topic); |
| } |
| |
| /** |
| * Ensure Kafka is working on both producer and consumer side. |
| * This executes a job that contains two Flink pipelines. |
| * |
| * <pre> |
| * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink) |
| * </pre> |
| * |
| * <p>We need to externally retry this test. We cannot let Flink's retry mechanism do it, because the Kafka producer |
| * does not guarantee exactly-once output. Hence a recovery would introduce duplicates that |
| * cause the test to fail. |
| * |
| * <p>This test also ensures that FLINK-3156 doesn't happen again: |
| * |
| * <p>The following situation caused a NPE in the FlinkKafkaConsumer |
| * |
| * <p>topic-1 <-- elements are only produced into topic1. |
| * topic-2 |
| * |
| * <p>Therefore, this test is consuming as well from an empty topic. |
| * |
| */ |
| @RetryOnException(times = 2, exception = kafka.common.NotLeaderForPartitionException.class) |
| public void runSimpleConcurrentProducerConsumerTopology() throws Exception { |
| final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString(); |
| final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID().toString(); |
| |
| final int parallelism = 3; |
| final int elementsPerPartition = 100; |
| final int totalElements = parallelism * elementsPerPartition; |
| |
| createTestTopic(topic, parallelism, 2); |
| createTestTopic(additionalEmptyTopic, parallelism, 1); // create an empty topic which will remain empty all the time |
| |
| final StreamExecutionEnvironment env = |
| StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.setParallelism(parallelism); |
| // env.enableCheckpointing(500); |
| env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately |
| env.getConfig().disableSysoutLogging(); |
| |
| TypeInformation<Tuple2<Long, String>> longStringType = TypeInformation.of(new TypeHint<Tuple2<Long, String>>(){}); |
| |
| TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema = |
| new TypeInformationSerializationSchema<>(longStringType, env.getConfig()); |
| |
| TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema = |
| new TypeInformationSerializationSchema<>(longStringType, env.getConfig()); |
| |
| // ----------- add producer dataflow ---------- |
| |
| DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long, String>>() { |
| |
| private boolean running = true; |
| |
| @Override |
| public void run(SourceContext<Tuple2<Long, String>> ctx) throws InterruptedException { |
| int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition; |
| int limit = cnt + elementsPerPartition; |
| |
| while (running && cnt < limit) { |
| ctx.collect(new Tuple2<>(1000L + cnt, "kafka-" + cnt)); |
| cnt++; |
| // we delay data generation a bit so that we are sure that some checkpoints are |
| // triggered (for FLINK-3156) |
| Thread.sleep(50); |
| } |
| } |
| |
| @Override |
| public void cancel() { |
| running = false; |
| } |
| }); |
| Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); |
| producerProperties.setProperty("retries", "3"); |
| producerProperties.putAll(secureProps); |
| kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null); |
| |
| // ----------- add consumer dataflow ---------- |
| |
| List<String> topics = new ArrayList<>(); |
| topics.add(topic); |
| topics.add(additionalEmptyTopic); |
| |
| Properties props = new Properties(); |
| props.putAll(standardProps); |
| props.putAll(secureProps); |
| FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, props); |
| |
| DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism); |
| |
| consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() { |
| |
| private int elCnt = 0; |
| private BitSet validator = new BitSet(totalElements); |
| |
| @Override |
| public void invoke(Tuple2<Long, String> value) throws Exception { |
| String[] sp = value.f1.split("-"); |
| int v = Integer.parseInt(sp[1]); |
| |
| assertEquals(value.f0 - 1000, (long) v); |
| |
| assertFalse("Received tuple twice", validator.get(v)); |
| validator.set(v); |
| elCnt++; |
| |
| if (elCnt == totalElements) { |
| // check if everything in the bitset is set to true |
| int nc; |
| if ((nc = validator.nextClearBit(0)) != totalElements) { |
| fail("The bitset was not set to 1 on all elements. Next clear:" |
| + nc + " Set: " + validator); |
| } |
| throw new SuccessException(); |
| } |
| } |
| |
| @Override |
| public void close() throws Exception { |
| super.close(); |
| } |
| }).setParallelism(1); |
| |
| try { |
| tryExecutePropagateExceptions(env, "runSimpleConcurrentProducerConsumerTopology"); |
| } |
| catch (ProgramInvocationException | JobExecutionException e) { |
| // look for NotLeaderForPartitionException |
| Throwable cause = e.getCause(); |
| |
| // search for nested SuccessExceptions |
| int depth = 0; |
| while (cause != null && depth++ < 20) { |
| if (cause instanceof kafka.common.NotLeaderForPartitionException) { |
| throw (Exception) cause; |
| } |
| cause = cause.getCause(); |
| } |
| throw e; |
| } |
| |
| deleteTestTopic(topic); |
| } |
| |
| /** |
| * Tests the proper consumption when having a 1:1 correspondence between kafka partitions and |
| * Flink sources. |
| */ |
| public void runOneToOneExactlyOnceTest() throws Exception { |
| |
| final String topic = "oneToOneTopic"; |
| final int parallelism = 5; |
| final int numElementsPerPartition = 1000; |
| final int totalElements = parallelism * numElementsPerPartition; |
| final int failAfterElements = numElementsPerPartition / 3; |
| |
| createTestTopic(topic, parallelism, 1); |
| |
| DataGenerators.generateRandomizedIntegerSequence( |
| StreamExecutionEnvironment.getExecutionEnvironment(), |
| kafkaServer, |
| topic, |
| parallelism, |
| numElementsPerPartition, |
| true); |
| |
| // run the topology that fails and recovers |
| |
| DeserializationSchema<Integer> schema = |
| new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); |
| |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.enableCheckpointing(500); |
| env.setParallelism(parallelism); |
| env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); |
| env.getConfig().disableSysoutLogging(); |
| |
| Properties props = new Properties(); |
| props.putAll(standardProps); |
| props.putAll(secureProps); |
| |
| FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); |
| |
| env |
| .addSource(kafkaSource) |
| .map(new PartitionValidatingMapper(parallelism, 1)) |
| .map(new FailingIdentityMapper<Integer>(failAfterElements)) |
| .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1); |
| |
| FailingIdentityMapper.failedBefore = false; |
| tryExecute(env, "One-to-one exactly once test"); |
| |
| deleteTestTopic(topic); |
| } |
| |
| /** |
| * Tests the proper consumption when having a 1:1 correspondence between kafka partitions and |
| * Flink sources. |
| */ |
| public void runOneToOneTest(boolean isFinite) throws Exception { |
| |
| final String topic = "oneToOneTopic"; |
| final int parallelism = 5; |
| final int numElementsPerPartition = 1000; |
| final int totalElements = parallelism * numElementsPerPartition; |
| |
| createTestTopic(topic, parallelism, 1); |
| |
| DataGenerators.generateRandomizedIntegerSequence( |
| StreamExecutionEnvironment.getExecutionEnvironment(), |
| kafkaServer, |
| topic, |
| parallelism, |
| numElementsPerPartition, |
| true); |
| |
| // run the topology that fails and recovers |
| |
| DeserializationSchema<Integer> schema = |
| new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); |
| |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.enableCheckpointing(5000); |
| env.setParallelism(parallelism); |
| env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); |
| env.getConfig().disableSysoutLogging(); |
| |
| Properties props = new Properties(); |
| props.putAll(standardProps); |
| props.putAll(secureProps); |
| |
| FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); |
| |
| kafkaSource.setStopAtLatest(isFinite); |
| |
| env |
| .addSource(kafkaSource) |
| .addSink(new ValidatingExactlyOnceSink(totalElements, false)).setParallelism(1); |
| |
| env.execute(); |
| |
| deleteTestTopic(topic); |
| } |
| |
| /** |
| * Tests the proper consumption when having fewer Flink sources than Kafka partitions, so |
| * one Flink source will read multiple Kafka partitions. |
| */ |
| public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception { |
| final String topic = "oneToManyTopic"; |
| final int numPartitions = 5; |
| final int numElementsPerPartition = 1000; |
| final int totalElements = numPartitions * numElementsPerPartition; |
| final int failAfterElements = numElementsPerPartition / 3; |
| |
| final int parallelism = 2; |
| |
| createTestTopic(topic, numPartitions, 1); |
| |
| DataGenerators.generateRandomizedIntegerSequence( |
| StreamExecutionEnvironment.getExecutionEnvironment(), |
| kafkaServer, |
| topic, |
| numPartitions, |
| numElementsPerPartition, |
| true); |
| |
| // run the topology that fails and recovers |
| |
| DeserializationSchema<Integer> schema = |
| new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); |
| |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.enableCheckpointing(500); |
| env.setParallelism(parallelism); |
| env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); |
| env.getConfig().disableSysoutLogging(); |
| |
| Properties props = new Properties(); |
| props.putAll(standardProps); |
| props.putAll(secureProps); |
| FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); |
| |
| env |
| .addSource(kafkaSource) |
| .map(new PartitionValidatingMapper(numPartitions, 3)) |
| .map(new FailingIdentityMapper<Integer>(failAfterElements)) |
| .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1); |
| |
| FailingIdentityMapper.failedBefore = false; |
| tryExecute(env, "One-source-multi-partitions exactly once test"); |
| |
| deleteTestTopic(topic); |
| } |
| |
| /** |
| * Tests the proper consumption when having fewer Flink sources than Kafka partitions, so |
| * one Flink source will read multiple Kafka partitions. |
| */ |
| public void runOneSourceMultiplePartitionsFiniteTest() throws Exception { |
| final String topic = "oneToManyTopic"; |
| final int numPartitions = 5; |
| final int numElementsPerPartition = 1000; |
| final int totalElements = numPartitions * numElementsPerPartition; |
| |
| final int parallelism = 2; |
| |
| createTestTopic(topic, numPartitions, 1); |
| |
| DataGenerators.generateRandomizedIntegerSequence( |
| StreamExecutionEnvironment.getExecutionEnvironment(), |
| kafkaServer, |
| topic, |
| numPartitions, |
| numElementsPerPartition, |
| true); |
| |
| // run the topology that fails and recovers |
| |
| DeserializationSchema<Integer> schema = |
| new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); |
| |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.enableCheckpointing(500); |
| env.setParallelism(parallelism); |
| env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); |
| env.getConfig().disableSysoutLogging(); |
| |
| Properties props = new Properties(); |
| props.putAll(standardProps); |
| props.putAll(secureProps); |
| FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); |
| kafkaSource.setStopAtLatest(true); |
| env |
| .addSource(kafkaSource) |
| .addSink(new ValidatingExactlyOnceSink(totalElements, false)).setParallelism(1); |
| |
| env.execute("One-source-multi-partitions exactly once test"); |
| |
| deleteTestTopic(topic); |
| } |
| |
| /** |
| * Tests the proper consumption when having more Flink sources than Kafka partitions, which means |
| * that some Flink sources will read no partitions. |
| */ |
| public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception { |
| final String topic = "manyToOneTopic"; |
| final int numPartitions = 5; |
| final int numElementsPerPartition = 1000; |
| final int totalElements = numPartitions * numElementsPerPartition; |
| final int failAfterElements = numElementsPerPartition / 3; |
| |
| final int parallelism = 8; |
| |
| createTestTopic(topic, numPartitions, 1); |
| |
| DataGenerators.generateRandomizedIntegerSequence( |
| StreamExecutionEnvironment.getExecutionEnvironment(), |
| kafkaServer, |
| topic, |
| numPartitions, |
| numElementsPerPartition, |
| true); |
| |
| // run the topology that fails and recovers |
| |
| DeserializationSchema<Integer> schema = |
| new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); |
| |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.enableCheckpointing(500); |
| env.setParallelism(parallelism); |
| // set the number of restarts to one. The failing mapper will fail once, then it's only success exceptions. |
| env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); |
| env.getConfig().disableSysoutLogging(); |
| env.setBufferTimeout(0); |
| |
| Properties props = new Properties(); |
| props.putAll(standardProps); |
| props.putAll(secureProps); |
| FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); |
| |
| env |
| .addSource(kafkaSource) |
| .map(new PartitionValidatingMapper(numPartitions, 1)) |
| .map(new FailingIdentityMapper<Integer>(failAfterElements)) |
| .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1); |
| |
| FailingIdentityMapper.failedBefore = false; |
| tryExecute(env, "multi-source-one-partitions exactly once test"); |
| |
| deleteTestTopic(topic); |
| } |
| |
| /** |
| * Tests the proper consumption when having more Flink sources than Kafka partitions, which means |
| * that some Flink sources will read no partitions. |
| */ |
| public void runMultipleSourcesOnePartitionFiniteTest() throws Exception { |
| final String topic = "manyToOneTopic"; |
| final int numPartitions = 5; |
| final int numElementsPerPartition = 1000; |
| final int totalElements = numPartitions * numElementsPerPartition; |
| |
| final int parallelism = 8; |
| |
| createTestTopic(topic, numPartitions, 1); |
| |
| DataGenerators.generateRandomizedIntegerSequence( |
| StreamExecutionEnvironment.getExecutionEnvironment(), |
| kafkaServer, |
| topic, |
| numPartitions, |
| numElementsPerPartition, |
| true); |
| |
| // run the topology that fails and recovers |
| |
| DeserializationSchema<Integer> schema = |
| new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); |
| |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.enableCheckpointing(500); |
| env.setParallelism(parallelism); |
| // set the number of restarts to one. The failing mapper will fail once, then it's only success exceptions. |
| env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); |
| env.getConfig().disableSysoutLogging(); |
| env.setBufferTimeout(0); |
| |
| Properties props = new Properties(); |
| props.putAll(standardProps); |
| props.putAll(secureProps); |
| FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); |
| kafkaSource.setStopAtLatest(true); |
| env |
| .addSource(kafkaSource) |
| .addSink(new ValidatingExactlyOnceSink(totalElements, false)).setParallelism(1); |
| |
| env.execute(); |
| |
| deleteTestTopic(topic); |
| } |
| |
| /** |
| * Tests that the source can be properly canceled when reading full partitions. |
| */ |
| public void runCancelingOnFullInputTest() throws Exception { |
| final String topic = "cancelingOnFullTopic"; |
| |
| final int parallelism = 3; |
| createTestTopic(topic, parallelism, 1); |
| |
| // launch a producer thread |
| DataGenerators.InfiniteStringsGenerator generator = |
| new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic); |
| generator.start(); |
| |
| // launch a consumer asynchronously |
| |
| final AtomicReference<Throwable> jobError = new AtomicReference<>(); |
| |
| final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.setParallelism(parallelism); |
| env.enableCheckpointing(100); |
| env.getConfig().disableSysoutLogging(); |
| |
| Properties props = new Properties(); |
| props.putAll(standardProps); |
| props.putAll(secureProps); |
| FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props); |
| |
| env.addSource(source).addSink(new DiscardingSink<String>()); |
| |
| JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); |
| final JobID jobId = jobGraph.getJobID(); |
| |
| final Runnable jobRunner = new Runnable() { |
| @Override |
| public void run() { |
| try { |
| client.setDetached(false); |
| client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader(), false); |
| } |
| catch (Throwable t) { |
| jobError.set(t); |
| } |
| } |
| }; |
| |
| Thread runnerThread = new Thread(jobRunner, "program runner thread"); |
| runnerThread.start(); |
| |
| // wait a bit before canceling |
| Thread.sleep(2000); |
| |
| Throwable failueCause = jobError.get(); |
| if (failueCause != null) { |
| failueCause.printStackTrace(); |
| Assert.fail("Test failed prematurely with: " + failueCause.getMessage()); |
| } |
| |
| // cancel |
| client.cancel(jobId); |
| |
| // wait for the program to be done and validate that we failed with the right exception |
| runnerThread.join(); |
| |
| assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get()); |
| |
| if (generator.isAlive()) { |
| generator.shutdown(); |
| generator.join(); |
| } |
| else { |
| Throwable t = generator.getError(); |
| if (t != null) { |
| t.printStackTrace(); |
| fail("Generator failed: " + t.getMessage()); |
| } else { |
| fail("Generator failed with no exception"); |
| } |
| } |
| |
| deleteTestTopic(topic); |
| } |
| |
| /** |
| * Tests that the source can be properly canceled when reading empty partitions. |
| */ |
| public void runCancelingOnEmptyInputTest() throws Exception { |
| final String topic = "cancelingOnEmptyInputTopic"; |
| |
| final int parallelism = 3; |
| createTestTopic(topic, parallelism, 1); |
| |
| final AtomicReference<Throwable> error = new AtomicReference<>(); |
| |
| final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.setParallelism(parallelism); |
| env.enableCheckpointing(100); |
| env.getConfig().disableSysoutLogging(); |
| |
| Properties props = new Properties(); |
| props.putAll(standardProps); |
| props.putAll(secureProps); |
| FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props); |
| |
| env.addSource(source).addSink(new DiscardingSink<String>()); |
| |
| JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); |
| final JobID jobId = jobGraph.getJobID(); |
| |
| final Runnable jobRunner = new Runnable() { |
| @Override |
| public void run() { |
| try { |
| client.setDetached(false); |
| client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader(), false); |
| } |
| catch (Throwable t) { |
| LOG.error("Job Runner failed with exception", t); |
| error.set(t); |
| } |
| } |
| }; |
| |
| Thread runnerThread = new Thread(jobRunner, "program runner thread"); |
| runnerThread.start(); |
| |
| // wait a bit before canceling |
| Thread.sleep(2000); |
| |
| Throwable failueCause = error.get(); |
| if (failueCause != null) { |
| failueCause.printStackTrace(); |
| Assert.fail("Test failed prematurely with: " + failueCause.getMessage()); |
| } |
| // cancel |
| client.cancel(jobId); |
| |
| // wait for the program to be done and validate that we failed with the right exception |
| runnerThread.join(); |
| |
| assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get()); |
| |
| deleteTestTopic(topic); |
| } |
| |
| /** |
| * Test producing and consuming into multiple topics. |
| * @throws Exception |
| */ |
| public void runProduceConsumeMultipleTopics() throws Exception { |
| final int numTopics = 5; |
| final int numElements = 20; |
| |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.getConfig().disableSysoutLogging(); |
| |
| // create topics with content |
| final List<String> topics = new ArrayList<>(); |
| for (int i = 0; i < numTopics; i++) { |
| final String topic = "topic-" + i; |
| topics.add(topic); |
| // create topic |
| createTestTopic(topic, i + 1 /*partitions*/, 1); |
| } |
| |
| // before FLINK-6078 the RemoteExecutionEnvironment set the parallelism to 1 as well |
| env.setParallelism(1); |
| |
| // run first job, producing into all topics |
| DataStream<Tuple3<Integer, Integer, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple3<Integer, Integer, String>>() { |
| |
| @Override |
| public void run(SourceContext<Tuple3<Integer, Integer, String>> ctx) throws Exception { |
| int partition = getRuntimeContext().getIndexOfThisSubtask(); |
| |
| for (int topicId = 0; topicId < numTopics; topicId++) { |
| for (int i = 0; i < numElements; i++) { |
| ctx.collect(new Tuple3<>(partition, i, "topic-" + topicId)); |
| } |
| } |
| } |
| |
| @Override |
| public void cancel() { |
| } |
| }); |
| |
| Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig()); |
| |
| Properties props = new Properties(); |
| props.putAll(standardProps); |
| props.putAll(secureProps); |
| kafkaServer.produceIntoKafka(stream, "dummy", schema, props, null); |
| |
| env.execute("Write to topics"); |
| |
| // run second job consuming from multiple topics |
| env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.getConfig().disableSysoutLogging(); |
| |
| stream = env.addSource(kafkaServer.getConsumer(topics, schema, props)); |
| |
| stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() { |
| Map<String, Integer> countPerTopic = new HashMap<>(numTopics); |
| @Override |
| public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> out) throws Exception { |
| Integer count = countPerTopic.get(value.f2); |
| if (count == null) { |
| count = 1; |
| } else { |
| count++; |
| } |
| countPerTopic.put(value.f2, count); |
| |
| // check map: |
| for (Map.Entry<String, Integer> el: countPerTopic.entrySet()) { |
| if (el.getValue() < numElements) { |
| break; // not enough yet |
| } |
| if (el.getValue() > numElements) { |
| throw new RuntimeException("There is a failure in the test. I've read " + |
| el.getValue() + " from topic " + el.getKey()); |
| } |
| } |
| // we've seen messages from all topics |
| throw new SuccessException(); |
| } |
| }).setParallelism(1); |
| |
| tryExecute(env, "Count elements from the topics"); |
| |
| // delete all topics again |
| for (int i = 0; i < numTopics; i++) { |
| final String topic = "topic-" + i; |
| deleteTestTopic(topic); |
| } |
| } |
| |
| /** |
| * Test Flink's Kafka integration also with very big records (30MB). |
| * |
| * <p>see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message |
| * |
| */ |
| public void runBigRecordTestTopology() throws Exception { |
| |
| final String topic = "bigRecordTestTopic"; |
| final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space |
| |
| createTestTopic(topic, parallelism, 1); |
| |
| final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = |
| TypeInformation.of(new TypeHint<Tuple2<Long, byte[]>>(){}); |
| |
| final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema = |
| new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig()); |
| |
| final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.setRestartStrategy(RestartStrategies.noRestart()); |
| env.getConfig().disableSysoutLogging(); |
| env.enableCheckpointing(100); |
| env.setParallelism(parallelism); |
| |
| // add consuming topology: |
| Properties consumerProps = new Properties(); |
| consumerProps.putAll(standardProps); |
| consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 14)); |
| consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 14)); // for the new fetcher |
| consumerProps.setProperty("queued.max.message.chunks", "1"); |
| consumerProps.putAll(secureProps); |
| |
| FlinkKafkaConsumerBase<Tuple2<Long, byte[]>> source = kafkaServer.getConsumer(topic, serSchema, consumerProps); |
| DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source); |
| |
| consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() { |
| |
| private int elCnt = 0; |
| |
| @Override |
| public void invoke(Tuple2<Long, byte[]> value) throws Exception { |
| elCnt++; |
| if (value.f0 == -1) { |
| // we should have seen 11 elements now. |
| if (elCnt == 11) { |
| throw new SuccessException(); |
| } else { |
| throw new RuntimeException("There have been " + elCnt + " elements"); |
| } |
| } |
| if (elCnt > 10) { |
| throw new RuntimeException("More than 10 elements seen: " + elCnt); |
| } |
| } |
| }); |
| |
| // add producing topology |
| Properties producerProps = new Properties(); |
| producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 15)); |
| producerProps.setProperty("retries", "3"); |
| producerProps.putAll(secureProps); |
| producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings); |
| |
| DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() { |
| |
| private boolean running; |
| |
| @Override |
| public void open(Configuration parameters) throws Exception { |
| super.open(parameters); |
| running = true; |
| } |
| |
| @Override |
| public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception { |
| Random rnd = new Random(); |
| long cnt = 0; |
| int sevenMb = 1024 * 1024 * 7; |
| |
| while (running) { |
| byte[] wl = new byte[sevenMb + rnd.nextInt(sevenMb)]; |
| ctx.collect(new Tuple2<>(cnt++, wl)); |
| |
| Thread.sleep(100); |
| |
| if (cnt == 10) { |
| // signal end |
| ctx.collect(new Tuple2<>(-1L, new byte[]{1})); |
| break; |
| } |
| } |
| } |
| |
| @Override |
| public void cancel() { |
| running = false; |
| } |
| }); |
| |
| kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(serSchema), producerProps, null); |
| |
| tryExecute(env, "big topology test"); |
| deleteTestTopic(topic); |
| } |
| |
| public void runBrokerFailureTest() throws Exception { |
| final String topic = "brokerFailureTestTopic"; |
| |
| final int parallelism = 2; |
| final int numElementsPerPartition = 1000; |
| final int totalElements = parallelism * numElementsPerPartition; |
| final int failAfterElements = numElementsPerPartition / 3; |
| |
| createTestTopic(topic, parallelism, 2); |
| |
| DataGenerators.generateRandomizedIntegerSequence( |
| StreamExecutionEnvironment.getExecutionEnvironment(), |
| kafkaServer, |
| topic, parallelism, numElementsPerPartition, true); |
| |
| // find leader to shut down |
| int leaderId = kafkaServer.getLeaderToShutDown(topic); |
| |
| LOG.info("Leader to shutdown {}", leaderId); |
| |
| // run the topology (the consumers must handle the failures) |
| |
| DeserializationSchema<Integer> schema = |
| new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); |
| |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.setParallelism(parallelism); |
| env.enableCheckpointing(500); |
| env.setRestartStrategy(RestartStrategies.noRestart()); |
| env.getConfig().disableSysoutLogging(); |
| |
| Properties props = new Properties(); |
| props.putAll(standardProps); |
| props.putAll(secureProps); |
| FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); |
| |
| env |
| .addSource(kafkaSource) |
| .map(new PartitionValidatingMapper(parallelism, 1)) |
| .map(new BrokerKillingMapper<Integer>(leaderId, failAfterElements)) |
| .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1); |
| |
| BrokerKillingMapper.killedLeaderBefore = false; |
| tryExecute(env, "Broker failure once test"); |
| |
| // start a new broker: |
| kafkaServer.restartBroker(leaderId); |
| } |
| |
| public void runKeyValueTest() throws Exception { |
| final String topic = "keyvaluetest"; |
| createTestTopic(topic, 1, 1); |
| final int elementCount = 5000; |
| |
| // ----------- Write some data into Kafka ------------------- |
| |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.setParallelism(1); |
| env.setRestartStrategy(RestartStrategies.noRestart()); |
| env.getConfig().disableSysoutLogging(); |
| |
| DataStream<Tuple2<Long, PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<Long, PojoValue>>() { |
| @Override |
| public void run(SourceContext<Tuple2<Long, PojoValue>> ctx) throws Exception { |
| Random rnd = new Random(1337); |
| for (long i = 0; i < elementCount; i++) { |
| PojoValue pojo = new PojoValue(); |
| pojo.when = new Date(rnd.nextLong()); |
| pojo.lon = rnd.nextLong(); |
| pojo.lat = i; |
| // make every second key null to ensure proper "null" serialization |
| Long key = (i % 2 == 0) ? null : i; |
| ctx.collect(new Tuple2<>(key, pojo)); |
| } |
| } |
| |
| @Override |
| public void cancel() { |
| } |
| }); |
| |
| KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig()); |
| Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); |
| producerProperties.setProperty("retries", "3"); |
| kafkaServer.produceIntoKafka(kvStream, topic, schema, producerProperties, null); |
| env.execute("Write KV to Kafka"); |
| |
| // ----------- Read the data again ------------------- |
| |
| env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.setParallelism(1); |
| env.setRestartStrategy(RestartStrategies.noRestart()); |
| env.getConfig().disableSysoutLogging(); |
| |
| KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig()); |
| |
| Properties props = new Properties(); |
| props.putAll(standardProps); |
| props.putAll(secureProps); |
| DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, props)); |
| fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long, PojoValue>, Object>() { |
| long counter = 0; |
| @Override |
| public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out) throws Exception { |
| // the elements should be in order. |
| Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter); |
| if (value.f1.lat % 2 == 0) { |
| assertNull("key was not null", value.f0); |
| } else { |
| Assert.assertTrue("Wrong value " + value.f0, value.f0 == counter); |
| } |
| counter++; |
| if (counter == elementCount) { |
| // we got the right number of elements |
| throw new SuccessException(); |
| } |
| } |
| }); |
| |
| tryExecute(env, "Read KV from Kafka"); |
| |
| deleteTestTopic(topic); |
| } |
| |
	/**
	 * Simple value holder used as the Kafka record value in the key/value and delete tests.
	 * Public mutable fields and the public no-arg constructor follow Flink's POJO-type
	 * conventions so the class can be (de)serialized by the key/value schemas above.
	 */
	private static class PojoValue {
		public Date when;
		public long lon;
		public long lat;
		public PojoValue() {}
	}
| |
| /** |
| * Test delete behavior and metrics for producer. |
| * @throws Exception |
| */ |
| public void runAllDeletesTest() throws Exception { |
| final String topic = "alldeletestest"; |
| createTestTopic(topic, 1, 1); |
| final int elementCount = 300; |
| |
| // ----------- Write some data into Kafka ------------------- |
| |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.setParallelism(1); |
| env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); |
| env.getConfig().disableSysoutLogging(); |
| |
| DataStream<Tuple2<byte[], PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<byte[], PojoValue>>() { |
| @Override |
| public void run(SourceContext<Tuple2<byte[], PojoValue>> ctx) throws Exception { |
| Random rnd = new Random(1337); |
| for (long i = 0; i < elementCount; i++) { |
| final byte[] key = new byte[200]; |
| rnd.nextBytes(key); |
| ctx.collect(new Tuple2<>(key, (PojoValue) null)); |
| } |
| } |
| |
| @Override |
| public void cancel() { |
| } |
| }); |
| |
| TypeInformationKeyValueSerializationSchema<byte[], PojoValue> schema = new TypeInformationKeyValueSerializationSchema<>(byte[].class, PojoValue.class, env.getConfig()); |
| |
| Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); |
| producerProperties.setProperty("retries", "3"); |
| producerProperties.putAll(secureProps); |
| kafkaServer.produceIntoKafka(kvStream, topic, schema, producerProperties, null); |
| |
| env.execute("Write deletes to Kafka"); |
| |
| // ----------- Read the data again ------------------- |
| |
| env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.setParallelism(1); |
| env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); |
| env.getConfig().disableSysoutLogging(); |
| |
| Properties props = new Properties(); |
| props.putAll(standardProps); |
| props.putAll(secureProps); |
| DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, schema, props)); |
| |
| fromKafka.flatMap(new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>() { |
| long counter = 0; |
| @Override |
| public void flatMap(Tuple2<byte[], PojoValue> value, Collector<Object> out) throws Exception { |
| // ensure that deleted messages are passed as nulls |
| assertNull(value.f1); |
| counter++; |
| if (counter == elementCount) { |
| // we got the right number of elements |
| throw new SuccessException(); |
| } |
| } |
| }); |
| |
| tryExecute(env, "Read deletes from Kafka"); |
| |
| deleteTestTopic(topic); |
| } |
| |
| /** |
| * Test that ensures that DeserializationSchema.isEndOfStream() is properly evaluated. |
| * |
| * @throws Exception |
| */ |
| public void runEndOfStreamTest() throws Exception { |
| |
| final int elementCount = 300; |
| final String topic = writeSequence("testEndOfStream", elementCount, 1, 1); |
| |
| // read using custom schema |
| final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env1.setParallelism(1); |
| env1.getConfig().setRestartStrategy(RestartStrategies.noRestart()); |
| env1.getConfig().disableSysoutLogging(); |
| |
| Properties props = new Properties(); |
| props.putAll(standardProps); |
| props.putAll(secureProps); |
| |
| DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(elementCount), props)); |
| fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() { |
| @Override |
| public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception { |
| // noop ;) |
| } |
| }); |
| |
| tryExecute(env1, "Consume " + elementCount + " elements from Kafka"); |
| |
| deleteTestTopic(topic); |
| } |
| |
| /** |
| * Test metrics reporting for consumer. |
| * |
| * @throws Exception |
| */ |
| public void runMetricsTest() throws Throwable { |
| |
| // create a stream with 5 topics |
| final String topic = "metricsStream"; |
| createTestTopic(topic, 5, 1); |
| |
| final Tuple1<Throwable> error = new Tuple1<>(null); |
| |
| // start job writing & reading data. |
| final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env1.setParallelism(1); |
| env1.getConfig().setRestartStrategy(RestartStrategies.noRestart()); |
| env1.getConfig().disableSysoutLogging(); |
| env1.disableOperatorChaining(); // let the source read everything into the network buffers |
| |
| TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>( |
| TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}), env1.getConfig()); |
| |
| DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps)); |
| fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() { |
| @Override |
| public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op |
| } |
| }); |
| |
| DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() { |
| boolean running = true; |
| |
| @Override |
| public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { |
| int i = 0; |
| while (running) { |
| ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask())); |
| Thread.sleep(1); |
| } |
| } |
| |
| @Override |
| public void cancel() { |
| running = false; |
| } |
| }); |
| |
| kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null); |
| |
| JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env1.getStreamGraph()); |
| final JobID jobId = jobGraph.getJobID(); |
| |
| Runnable job = new Runnable() { |
| @Override |
| public void run() { |
| try { |
| client.setDetached(false); |
| client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader(), false); |
| } catch (Throwable t) { |
| if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) { |
| LOG.warn("Got exception during execution", t); |
| error.f0 = t; |
| } |
| } |
| } |
| }; |
| Thread jobThread = new Thread(job); |
| jobThread.start(); |
| |
| try { |
| // connect to JMX |
| MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); |
| // wait until we've found all 5 offset metrics |
| Set<ObjectName> offsetMetrics = mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null); |
| while (offsetMetrics.size() < 5) { // test will time out if metrics are not properly working |
| if (error.f0 != null) { |
| // fail test early |
| throw error.f0; |
| } |
| offsetMetrics = mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null); |
| Thread.sleep(50); |
| } |
| Assert.assertEquals(5, offsetMetrics.size()); |
| // we can't rely on the consumer to have touched all the partitions already |
| // that's why we'll wait until all five partitions have a positive offset. |
| // The test will fail if we never meet the condition |
| while (true) { |
| int numPosOffsets = 0; |
| // check that offsets are correctly reported |
| for (ObjectName object : offsetMetrics) { |
| Object offset = mBeanServer.getAttribute(object, "Value"); |
| if ((long) offset >= 0) { |
| numPosOffsets++; |
| } |
| } |
| if (numPosOffsets == 5) { |
| break; |
| } |
| // wait for the consumer to consume on all partitions |
| Thread.sleep(50); |
| } |
| |
| // check if producer metrics are also available. |
| Set<ObjectName> producerMetrics = mBeanServer.queryNames(new ObjectName("*KafkaProducer*:*"), null); |
| Assert.assertTrue("No producer metrics found", producerMetrics.size() > 30); |
| |
| LOG.info("Found all JMX metrics. Cancelling job."); |
| } finally { |
| // cancel |
| client.cancel(jobId); |
| // wait for the job to finish (it should due to the cancel command above) |
| jobThread.join(); |
| } |
| |
| if (error.f0 != null) { |
| throw error.f0; |
| } |
| |
| deleteTestTopic(topic); |
| } |
| |
	/**
	 * Deserialization schema for {@code Tuple2<Integer, Integer>} records that reports
	 * end-of-stream after a fixed number of deserialized elements.
	 *
	 * <p>NOTE(review): {@code count} is per schema instance, i.e. per consumer subtask.
	 * With a source parallelism greater than one, every subtask would stop after
	 * {@code finalCount} elements on its own — confirm callers use parallelism 1
	 * (runEndOfStreamTest in this file does).
	 */
	private static class FixedNumberDeserializationSchema implements DeserializationSchema<Tuple2<Integer, Integer>> {

		// number of elements after which isEndOfStream() returns true
		final int finalCount;
		int count = 0;

		TypeInformation<Tuple2<Integer, Integer>> ti = TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){});
		TypeSerializer<Tuple2<Integer, Integer>> ser = ti.createSerializer(new ExecutionConfig());

		public FixedNumberDeserializationSchema(int finalCount) {
			this.finalCount = finalCount;
		}

		@Override
		public Tuple2<Integer, Integer> deserialize(byte[] message) throws IOException {
			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
			return ser.deserialize(in);
		}

		@Override
		public boolean isEndOfStream(Tuple2<Integer, Integer> nextElement) {
			// signals end-of-stream once finalCount elements have been seen by this instance
			return ++count >= finalCount;
		}

		@Override
		public TypeInformation<Tuple2<Integer, Integer>> getProducedType() {
			return ti;
		}
	}
| |
| // ------------------------------------------------------------------------ |
| // Reading writing test data sets |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Runs a job using the provided environment to read a sequence of records from a single Kafka topic. |
| * The method allows to individually specify the expected starting offset and total read value count of each partition. |
| * The job will be considered successful only if all partition read results match the start offset and value count criteria. |
| */ |
| protected void readSequence(final StreamExecutionEnvironment env, |
| final StartupMode startupMode, |
| final Map<KafkaTopicPartition, Long> specificStartupOffsets, |
| final Long startupTimestamp, |
| final Properties cc, |
| final String topicName, |
| final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) throws Exception { |
| final int sourceParallelism = partitionsToValuesCountAndStartOffset.keySet().size(); |
| |
| int finalCountTmp = 0; |
| for (Map.Entry<Integer, Tuple2<Integer, Integer>> valuesCountAndStartOffset : partitionsToValuesCountAndStartOffset.entrySet()) { |
| finalCountTmp += valuesCountAndStartOffset.getValue().f0; |
| } |
| final int finalCount = finalCountTmp; |
| |
| final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = |
| TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}); |
| |
| final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser = |
| new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig()); |
| |
| // create the consumer |
| cc.putAll(secureProps); |
| FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc); |
| setKafkaConsumerOffset(startupMode, consumer, specificStartupOffsets, startupTimestamp); |
| |
| DataStream<Tuple2<Integer, Integer>> source = env |
| .addSource(consumer).setParallelism(sourceParallelism) |
| .map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism); |
| |
| // verify data |
| source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() { |
| |
| private HashMap<Integer, BitSet> partitionsToValueCheck; |
| private int count = 0; |
| |
| @Override |
| public void open(Configuration parameters) throws Exception { |
| partitionsToValueCheck = new HashMap<>(); |
| for (Integer partition : partitionsToValuesCountAndStartOffset.keySet()) { |
| partitionsToValueCheck.put(partition, new BitSet()); |
| } |
| } |
| |
| @Override |
| public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception { |
| int partition = value.f0; |
| int val = value.f1; |
| |
| BitSet bitSet = partitionsToValueCheck.get(partition); |
| if (bitSet == null) { |
| throw new RuntimeException("Got a record from an unknown partition"); |
| } else { |
| bitSet.set(val - partitionsToValuesCountAndStartOffset.get(partition).f1); |
| } |
| |
| count++; |
| |
| LOG.info("Received message {}, total {} messages", value, count); |
| |
| // verify if we've seen everything |
| if (count == finalCount) { |
| for (Map.Entry<Integer, BitSet> partitionsToValueCheck : this.partitionsToValueCheck.entrySet()) { |
| BitSet check = partitionsToValueCheck.getValue(); |
| int expectedValueCount = partitionsToValuesCountAndStartOffset.get(partitionsToValueCheck.getKey()).f0; |
| |
| if (check.cardinality() != expectedValueCount) { |
| throw new RuntimeException("Expected cardinality to be " + expectedValueCount + |
| ", but was " + check.cardinality()); |
| } else if (check.nextClearBit(0) != expectedValueCount) { |
| throw new RuntimeException("Expected next clear bit to be " + expectedValueCount + |
| ", but was " + check.cardinality()); |
| } |
| } |
| |
| // test has passed |
| throw new SuccessException(); |
| } |
| } |
| |
| }).setParallelism(1); |
| |
| tryExecute(env, "Read data from Kafka"); |
| |
| LOG.info("Successfully read sequence for verification"); |
| } |
| |
| /** |
| * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, Map, Long, Properties, String, Map)} to |
| * expect reading from the same start offset and the same value count for all partitions of a single Kafka topic. |
| */ |
| protected void readSequence(final StreamExecutionEnvironment env, |
| final StartupMode startupMode, |
| final Map<KafkaTopicPartition, Long> specificStartupOffsets, |
| final Long startupTimestamp, |
| final Properties cc, |
| final int sourceParallelism, |
| final String topicName, |
| final int valuesCount, |
| final int startFrom) throws Exception { |
| HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>(); |
| for (int i = 0; i < sourceParallelism; i++) { |
| partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom)); |
| } |
| readSequence(env, startupMode, specificStartupOffsets, startupTimestamp, cc, topicName, partitionsToValuesCountAndStartOffset); |
| } |
| |
| protected void setKafkaConsumerOffset(final StartupMode startupMode, |
| final FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer, |
| final Map<KafkaTopicPartition, Long> specificStartupOffsets, |
| final Long startupTimestamp) { |
| switch (startupMode) { |
| case EARLIEST: |
| consumer.setStartFromEarliest(); |
| break; |
| case LATEST: |
| consumer.setStartFromLatest(); |
| break; |
| case SPECIFIC_OFFSETS: |
| consumer.setStartFromSpecificOffsets(specificStartupOffsets); |
| break; |
| case GROUP_OFFSETS: |
| consumer.setStartFromGroupOffsets(); |
| break; |
| case TIMESTAMP: |
| consumer.setStartFromTimestamp(startupTimestamp); |
| break; |
| } |
| } |
| |
	/**
	 * Writes a sequence of (partition, counter) tuples into a freshly created topic and then
	 * validates that the topic contains exactly the expected records. Because Kafka producers
	 * are not exactly-once, a failed or duplicated write attempt is retried on a new topic
	 * (suffixed with the attempt number), up to 10 times.
	 *
	 * @return the name of the topic that holds the validated sequence
	 * @throws Exception if no valid sequence could be written within the maximum number of attempts
	 */
	protected String writeSequence(
			String baseTopicName,
			final int numElements,
			final int parallelism,
			final int replicationFactor) throws Exception {
		LOG.info("\n===================================\n" +
				"== Writing sequence of " + numElements + " into " + baseTopicName + " with p=" + parallelism + "\n" +
				"===================================");

		final TypeInformation<Tuple2<Integer, Integer>> resultType =
				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});

		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
				new KeyedSerializationSchemaWrapper<>(
						new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));

		final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
				new KeyedDeserializationSchemaWrapper<>(
						new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));

		final int maxNumAttempts = 10;

		for (int attempt = 1; attempt <= maxNumAttempts; attempt++) {

			// use a fresh topic per attempt so a partially written sequence cannot leak into the next try
			final String topicName = baseTopicName + '-' + attempt;

			LOG.info("Writing attempt #" + attempt);

			// -------- Write the Sequence --------

			createTestTopic(topicName, parallelism, replicationFactor);

			StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
			writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
			writeEnv.getConfig().disableSysoutLogging();

			// each parallel subtask emits (subtaskIndex, 0..numElements-1)
			DataStream<Tuple2<Integer, Integer>> stream = writeEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {

				private boolean running = true;

				@Override
				public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
					int cnt = 0;
					int partition = getRuntimeContext().getIndexOfThisSubtask();

					while (running && cnt < numElements) {
						ctx.collect(new Tuple2<>(partition, cnt));
						cnt++;
					}
				}

				@Override
				public void cancel() {
					running = false;
				}
			}).setParallelism(parallelism);

			// the producer must not produce duplicates
			Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
			producerProperties.setProperty("retries", "0");
			producerProperties.putAll(secureProps);

			kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2FlinkPartitioner(parallelism))
					.setParallelism(parallelism);

			try {
				writeEnv.execute("Write sequence");
			}
			catch (Exception e) {
				// write attempt failed: drop the topic and retry with the next one
				LOG.error("Write attempt failed, trying again", e);
				deleteTestTopic(topicName);
				waitUntilNoJobIsRunning(client);
				continue;
			}

			LOG.info("Finished writing sequence");

			// -------- Validate the Sequence --------

			// we need to validate the sequence, because kafka's producers are not exactly once
			LOG.info("Validating sequence");

			waitUntilNoJobIsRunning(client);

			if (validateSequence(topicName, parallelism, deserSchema, numElements)) {
				// everything is good!
				return topicName;
			}
			else {
				deleteTestTopic(topicName);
				// fall through the loop
			}
		}

		throw new Exception("Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts");
	}
| |
| protected void writeAppendSequence( |
| String topicName, |
| final int originalNumElements, |
| final int numElementsToAppend, |
| final int parallelism) throws Exception { |
| |
| LOG.info("\n===================================\n" + |
| "== Appending sequence of " + numElementsToAppend + " into " + topicName + |
| "==================================="); |
| |
| final TypeInformation<Tuple2<Integer, Integer>> resultType = |
| TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}); |
| |
| final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema = |
| new KeyedSerializationSchemaWrapper<>( |
| new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); |
| |
| final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema = |
| new KeyedDeserializationSchemaWrapper<>( |
| new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); |
| |
| // -------- Write the append sequence -------- |
| |
| StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.getExecutionEnvironment(); |
| writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart()); |
| writeEnv.getConfig().disableSysoutLogging(); |
| |
| DataStream<Tuple2<Integer, Integer>> stream = writeEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() { |
| |
| private boolean running = true; |
| |
| @Override |
| public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { |
| int cnt = originalNumElements; |
| int partition = getRuntimeContext().getIndexOfThisSubtask(); |
| |
| while (running && cnt < numElementsToAppend + originalNumElements) { |
| ctx.collect(new Tuple2<>(partition, cnt)); |
| cnt++; |
| } |
| } |
| |
| @Override |
| public void cancel() { |
| running = false; |
| } |
| }).setParallelism(parallelism); |
| |
| // the producer must not produce duplicates |
| Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); |
| producerProperties.setProperty("retries", "0"); |
| producerProperties.putAll(secureProps); |
| |
| kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2FlinkPartitioner(parallelism)) |
| .setParallelism(parallelism); |
| |
| try { |
| writeEnv.execute("Write sequence"); |
| } |
| catch (Exception e) { |
| throw new Exception("Failed to append sequence to Kafka; append job failed.", e); |
| } |
| |
| LOG.info("Finished writing append sequence"); |
| |
| // we need to validate the sequence, because kafka's producers are not exactly once |
| LOG.info("Validating sequence"); |
| while (!getRunningJobs(client).isEmpty()){ |
| Thread.sleep(50); |
| } |
| |
| if (!validateSequence(topicName, parallelism, deserSchema, originalNumElements + numElementsToAppend)) { |
| throw new Exception("Could not append a valid sequence to Kafka."); |
| } |
| } |
| |
	/**
	 * Runs a dedicated read job against the given topic and checks that exactly
	 * {@code parallelism * totalNumElements} records can be consumed within a
	 * 10 second deadline.
	 *
	 * @return true if the expected number of records was read in time, false otherwise
	 */
	private boolean validateSequence(
			final String topic,
			final int parallelism,
			KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema,
			final int totalNumElements) throws Exception {

		final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment();
		readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
		readEnv.getConfig().disableSysoutLogging();
		readEnv.setParallelism(parallelism);

		// use a dedicated group id so this validator does not disturb the tests' consumer groups
		Properties readProps = (Properties) standardProps.clone();
		readProps.setProperty("group.id", "flink-tests-validator");
		readProps.putAll(secureProps);
		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topic, deserSchema, readProps);
		consumer.setStartFromEarliest();

		// single counting mapper that throws SuccessException once all records were seen
		readEnv
			.addSource(consumer)
			.map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {

				private final int totalCount = parallelism * totalNumElements;
				private int count = 0;

				@Override
				public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
					if (++count == totalCount) {
						throw new SuccessException();
					} else {
						return value;
					}
				}
			}).setParallelism(1)
			.addSink(new DiscardingSink<>()).setParallelism(1);

		final AtomicReference<Throwable> errorRef = new AtomicReference<>();

		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(readEnv.getStreamGraph());
		final JobID jobId = jobGraph.getJobID();

		Thread runner = new Thread() {
			@Override
			public void run() {
				try {
					client.setDetached(false);
					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader(), false);
					// NOTE(review): submitJob in attached mode blocks until the job finishes,
					// and tryExecute then executes readEnv again — confirm this double
					// execution is intended
					tryExecute(readEnv, "sequence validation");
				} catch (Throwable t) {
					// SuccessException is the expected termination signal; anything else is an error
					if (!ExceptionUtils.findThrowable(t, SuccessException.class).isPresent()) {
						errorRef.set(t);
					}
				}
			}
		};
		runner.start();

		// wait at most 10 seconds for the validation job to finish
		final long deadline = System.nanoTime() + 10_000_000_000L;
		long delay;
		while (runner.isAlive() && (delay = deadline - System.nanoTime()) > 0) {
			runner.join(delay / 1_000_000L);
		}

		boolean success;

		if (runner.isAlive()) {
			// did not finish in time, maybe the producer dropped one or more records and
			// the validation did not reach the exit point
			success = false;
			client.cancel(jobId);
		}
		else {
			Throwable error = errorRef.get();
			if (error != null) {
				success = false;
				LOG.info("Sequence validation job failed with exception", error);
			}
			else {
				success = true;
			}
		}

		waitUntilNoJobIsRunning(client);

		return success;
	}
| |
| // ------------------------------------------------------------------------ |
| // Debugging utilities |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Read topic to list, only using Kafka code. |
| */ |
| private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) { |
| ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config); |
| // we request only one stream per consumer instance. Kafka will make sure that each consumer group |
| // will see each message only once. |
| Map<String, Integer> topicCountMap = Collections.singletonMap(topicName, 1); |
| Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap); |
| if (streams.size() != 1) { |
| throw new RuntimeException("Expected only one message stream but got " + streams.size()); |
| } |
| List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName); |
| if (kafkaStreams == null) { |
| throw new RuntimeException("Requested stream not available. Available streams: " + streams.toString()); |
| } |
| if (kafkaStreams.size() != 1) { |
| throw new RuntimeException("Requested 1 stream from Kafka, bot got " + kafkaStreams.size() + " streams"); |
| } |
| LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId()); |
| ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator(); |
| |
| List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>(); |
| int read = 0; |
| while (iteratorToRead.hasNext()) { |
| read++; |
| result.add(iteratorToRead.next()); |
| if (read == stopAfter) { |
| LOG.info("Read " + read + " elements"); |
| return result; |
| } |
| } |
| return result; |
| } |
| |
	/**
	 * Identity mapper that, on the subtask with index 0, shuts down the Kafka broker with
	 * the configured id once the configured number of elements has passed through.
	 * Used by failure-recovery tests to inject a broker failure mid-job.
	 */
	private static class BrokerKillingMapper<T> extends RichMapFunction<T, T>
			implements ListCheckpointed<Integer>, CheckpointListener {

		private static final long serialVersionUID = 6334389850158707313L;

		// static + volatile so that the flags survive task restarts and are visible
		// to the test code running in the same JVM
		public static volatile boolean killedLeaderBefore;
		public static volatile boolean hasBeenCheckpointedBeforeFailure;

		private final int shutdownBrokerId;
		private final int failCount; // number of elements to pass before killing the broker

		// restored from checkpointed state on recovery
		private int numElementsTotal;

		// true only on the subtask that performs the broker shutdown (index 0)
		private boolean failer;
		private boolean hasBeenCheckpointed;

		public BrokerKillingMapper(int shutdownBrokerId, int failCount) {
			this.shutdownBrokerId = shutdownBrokerId;
			this.failCount = failCount;
		}

		@Override
		public void open(Configuration parameters) {
			failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
		}

		@Override
		public T map(T value) throws Exception {
			numElementsTotal++;

			if (!killedLeaderBefore) {
				// slow the stream down a bit to give checkpoints a chance to complete first
				Thread.sleep(10);

				if (failer && numElementsTotal >= failCount) {
					// shut down a Kafka broker
					KafkaServer toShutDown = null;
					for (KafkaServer server : kafkaServer.getBrokers()) {

						if (kafkaServer.getBrokerId(server) == shutdownBrokerId) {
							toShutDown = server;
							break;
						}
					}

					if (toShutDown == null) {
						StringBuilder listOfBrokers = new StringBuilder();
						for (KafkaServer server : kafkaServer.getBrokers()) {
							listOfBrokers.append(kafkaServer.getBrokerId(server));
							listOfBrokers.append(" ; ");
						}

						throw new Exception("Cannot find broker to shut down: " + shutdownBrokerId
								+ " ; available brokers: " + listOfBrokers.toString());
					}
					else {
						// record whether a checkpoint completed before the injected failure,
						// then kill the broker exactly once
						hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
						killedLeaderBefore = true;
						toShutDown.shutdown();
					}
				}
			}
			return value;
		}

		@Override
		public void notifyCheckpointComplete(long checkpointId) {
			hasBeenCheckpointed = true;
		}

		@Override
		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
			// only the element counter is checkpointed
			return Collections.singletonList(this.numElementsTotal);
		}

		@Override
		public void restoreState(List<Integer> state) throws Exception {
			if (state.isEmpty() || state.size() > 1) {
				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
			}
			this.numElementsTotal = state.get(0);
		}
	}
| |
| private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>, |
| KeyedSerializationSchema<Tuple3<Integer, Integer, String>> { |
| |
| private final TypeSerializer<Tuple2<Integer, Integer>> ts; |
| |
| public Tuple2WithTopicSchema(ExecutionConfig ec) { |
| ts = TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}).createSerializer(ec); |
| } |
| |
| @Override |
| public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { |
| DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); |
| Tuple2<Integer, Integer> t2 = ts.deserialize(in); |
| return new Tuple3<>(t2.f0, t2.f1, topic); |
| } |
| |
| @Override |
| public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) { |
| return false; |
| } |
| |
| @Override |
| public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() { |
| return TypeInformation.of(new TypeHint<Tuple3<Integer, Integer, String>>(){}); |
| } |
| |
| @Override |
| public byte[] serializeKey(Tuple3<Integer, Integer, String> element) { |
| return null; |
| } |
| |
| @Override |
| public byte[] serializeValue(Tuple3<Integer, Integer, String> element) { |
| ByteArrayOutputStream by = new ByteArrayOutputStream(); |
| DataOutputView out = new DataOutputViewStreamWrapper(by); |
| try { |
| ts.serialize(new Tuple2<>(element.f0, element.f1), out); |
| } catch (IOException e) { |
| throw new RuntimeException("Error" , e); |
| } |
| return by.toByteArray(); |
| } |
| |
| @Override |
| public String getTargetTopic(Tuple3<Integer, Integer, String> element) { |
| return element.f2; |
| } |
| } |
| } |