| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.streams.integration; |
| |
| import kafka.admin.AdminClient; |
| import kafka.tools.StreamsResetter; |
| import kafka.utils.MockTime; |
| import kafka.utils.ZkUtils; |
| import org.apache.kafka.clients.consumer.ConsumerConfig; |
| import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException; |
| import org.apache.kafka.common.security.JaasUtils; |
| import org.apache.kafka.common.serialization.LongDeserializer; |
| import org.apache.kafka.common.serialization.LongSerializer; |
| import org.apache.kafka.common.serialization.Serdes; |
| import org.apache.kafka.common.serialization.StringSerializer; |
| import org.apache.kafka.common.utils.Utils; |
| import org.apache.kafka.streams.KafkaStreams; |
| import org.apache.kafka.streams.KeyValue; |
| import org.apache.kafka.streams.StreamsConfig; |
| import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; |
| import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; |
| import org.apache.kafka.streams.kstream.KStream; |
| import org.apache.kafka.streams.kstream.KStreamBuilder; |
| import org.apache.kafka.streams.kstream.KTable; |
| import org.apache.kafka.streams.kstream.KeyValueMapper; |
| import org.apache.kafka.streams.kstream.TimeWindows; |
| import org.apache.kafka.streams.kstream.Windowed; |
| import org.apache.kafka.test.TestCondition; |
| import org.apache.kafka.test.TestUtils; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.ClassRule; |
| import org.junit.Test; |
| |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Properties; |
| import java.util.Set; |
| |
| import static org.hamcrest.CoreMatchers.equalTo; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| |
| |
| /** |
| * Tests local state store and global application cleanup. |
| */ |
| public class ResetIntegrationTest { |
| private static final int NUM_BROKERS = 1; |
| @ClassRule |
| public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); |
| private final MockTime mockTime = CLUSTER.time; |
| |
| private static final String APP_ID = "cleanup-integration-test"; |
| private static final String INPUT_TOPIC = "inputTopic"; |
| private static final String OUTPUT_TOPIC = "outputTopic"; |
| private static final String OUTPUT_TOPIC_2 = "outputTopic2"; |
| private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun"; |
| private static final String INTERMEDIATE_USER_TOPIC = "userTopic"; |
| |
| private static final long STREAMS_CONSUMER_TIMEOUT = 2000L; |
| private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L; |
| private static final int TIMEOUT_MULTIPLYER = 5; |
| |
| private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed(); |
| private static int testNo = 0; |
| private static AdminClient adminClient = null; |
| |
| @BeforeClass |
| public static void startKafkaCluster() throws Exception { |
| CLUSTER.createTopic(INPUT_TOPIC); |
| CLUSTER.createTopic(OUTPUT_TOPIC); |
| CLUSTER.createTopic(OUTPUT_TOPIC_2); |
| CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN); |
| } |
| |
| @AfterClass |
| public static void globalCleanup() { |
| if (adminClient != null) { |
| adminClient.close(); |
| adminClient = null; |
| } |
| } |
| |
| @Before |
| public void cleanup() throws Exception { |
| ++testNo; |
| |
| if (adminClient == null) { |
| adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers()); |
| } |
| |
| // busy wait until cluster (ie, ConsumerGroupCoordinator) is available |
| while (true) { |
| Thread.sleep(50); |
| |
| try { |
| TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT, |
| "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); |
| } catch (GroupCoordinatorNotAvailableException e) { |
| continue; |
| } catch (IllegalArgumentException e) { |
| continue; |
| } |
| break; |
| } |
| |
| if (testNo == 1) { |
| prepareInputData(); |
| } |
| } |
| |
| @Test |
| public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception { |
| CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC); |
| |
| final Properties streamsConfiguration = prepareTest(); |
| final Properties resultTopicConsumerConfig = TestUtils.consumerConfig( |
| CLUSTER.bootstrapServers(), |
| APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, |
| LongDeserializer.class, |
| LongDeserializer.class); |
| |
| // RUN |
| KafkaStreams streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfiguration); |
| streams.start(); |
| final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( |
| resultTopicConsumerConfig, |
| OUTPUT_TOPIC, |
| 10, |
| 60000); |
| // receive only first values to make sure intermediate user topic is not consumed completely |
| // => required to test "seekToEnd" for intermediate topics |
| final KeyValue<Object, Object> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( |
| resultTopicConsumerConfig, |
| OUTPUT_TOPIC_2, |
| 1 |
| ).get(0); |
| |
| streams.close(); |
| TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT, |
| "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT) + " ms."); |
| |
| // RESET |
| streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration); |
| streams.cleanUp(); |
| cleanGlobal(INTERMEDIATE_USER_TOPIC); |
| TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT, |
| "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); |
| |
| assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC); |
| |
| // RE-RUN |
| streams.start(); |
| final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( |
| resultTopicConsumerConfig, |
| OUTPUT_TOPIC, |
| 10, |
| 60000); |
| final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( |
| resultTopicConsumerConfig, |
| OUTPUT_TOPIC_2_RERUN, |
| 1 |
| ).get(0); |
| streams.close(); |
| |
| assertThat(resultRerun, equalTo(result)); |
| assertThat(resultRerun2, equalTo(result2)); |
| |
| TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT, |
| "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); |
| cleanGlobal(INTERMEDIATE_USER_TOPIC); |
| |
| CLUSTER.deleteTopic(INTERMEDIATE_USER_TOPIC); |
| Set<String> allTopics; |
| ZkUtils zkUtils = null; |
| try { |
| zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), |
| 30000, |
| 30000, |
| JaasUtils.isZkSecurityEnabled()); |
| |
| do { |
| Utils.sleep(100); |
| allTopics = new HashSet<>(); |
| allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); |
| } while (allTopics.contains(INTERMEDIATE_USER_TOPIC)); |
| } finally { |
| if (zkUtils != null) { |
| zkUtils.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception { |
| final Properties streamsConfiguration = prepareTest(); |
| final Properties resultTopicConsumerConfig = TestUtils.consumerConfig( |
| CLUSTER.bootstrapServers(), |
| APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, |
| LongDeserializer.class, |
| LongDeserializer.class); |
| |
| // RUN |
| KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); |
| streams.start(); |
| final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( |
| resultTopicConsumerConfig, |
| OUTPUT_TOPIC, |
| 10, |
| 60000); |
| |
| streams.close(); |
| TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT, |
| "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT) + " ms."); |
| |
| // RESET |
| streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); |
| streams.cleanUp(); |
| cleanGlobal(null); |
| TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT, |
| "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); |
| |
| assertInternalTopicsGotDeleted(null); |
| |
| // RE-RUN |
| streams.start(); |
| final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( |
| resultTopicConsumerConfig, |
| OUTPUT_TOPIC, |
| 10, |
| 60000); |
| streams.close(); |
| |
| assertThat(resultRerun, equalTo(result)); |
| |
| TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT, |
| "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); |
| cleanGlobal(null); |
| } |
| |
| private Properties prepareTest() throws Exception { |
| final Properties streamsConfiguration = new Properties(); |
| streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo); |
| streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); |
| streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); |
| streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); |
| streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); |
| streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
| streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8); |
| streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1); |
| streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); |
| streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT); |
| streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); |
| |
| IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); |
| |
| return streamsConfiguration; |
| } |
| |
| private void prepareInputData() throws Exception { |
| final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class); |
| |
| mockTime.sleep(10); |
| IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds()); |
| mockTime.sleep(10); |
| IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds()); |
| mockTime.sleep(10); |
| IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds()); |
| mockTime.sleep(10); |
| IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds()); |
| mockTime.sleep(10); |
| IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds()); |
| mockTime.sleep(10); |
| IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds()); |
| mockTime.sleep(1); |
| IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds()); |
| mockTime.sleep(1); |
| IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds()); |
| mockTime.sleep(1); |
| IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds()); |
| mockTime.sleep(1); |
| IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds()); |
| } |
| |
| private KStreamBuilder setupTopologyWithIntermediateUserTopic(final String outputTopic2) { |
| final KStreamBuilder builder = new KStreamBuilder(); |
| |
| final KStream<Long, String> input = builder.stream(INPUT_TOPIC); |
| |
| // use map to trigger internal re-partitioning before groupByKey |
| final KTable<Long, Long> globalCounts = input |
| .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() { |
| @Override |
| public KeyValue<Long, String> apply(final Long key, final String value) { |
| return new KeyValue<>(key, value); |
| } |
| }) |
| .groupByKey() |
| .count("global-count"); |
| globalCounts.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC); |
| |
| final KStream<Long, Long> windowedCounts = input |
| .through(INTERMEDIATE_USER_TOPIC) |
| .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() { |
| private long sleep = 1000; |
| |
| @Override |
| public KeyValue<Long, String> apply(final Long key, final String value) { |
| // must sleep long enough to avoid processing the whole intermediate topic before application gets stopped |
| // => want to test "skip over" unprocessed records |
| // increasing the sleep time only has disadvantage that test run time is increased |
| mockTime.sleep(sleep); |
| sleep *= 2; |
| return new KeyValue<>(key, value); |
| } |
| }) |
| .groupByKey() |
| .count(TimeWindows.of(35).advanceBy(10), "count") |
| .toStream() |
| .map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>() { |
| @Override |
| public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) { |
| return new KeyValue<>(key.window().start() + key.window().end(), value); |
| } |
| }); |
| windowedCounts.to(Serdes.Long(), Serdes.Long(), outputTopic2); |
| |
| return builder; |
| } |
| |
| private KStreamBuilder setupTopologyWithoutIntermediateUserTopic() { |
| final KStreamBuilder builder = new KStreamBuilder(); |
| |
| final KStream<Long, String> input = builder.stream(INPUT_TOPIC); |
| |
| // use map to trigger internal re-partitioning before groupByKey |
| input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() { |
| @Override |
| public KeyValue<Long, Long> apply(final Long key, final String value) { |
| return new KeyValue<>(key, key); |
| } |
| }).to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC); |
| |
| return builder; |
| } |
| |
| private void cleanGlobal(final String intermediateUserTopic) { |
| final String[] parameters; |
| if (intermediateUserTopic != null) { |
| parameters = new String[]{ |
| "--application-id", APP_ID + testNo, |
| "--bootstrap-server", CLUSTER.bootstrapServers(), |
| "--zookeeper", CLUSTER.zKConnectString(), |
| "--input-topics", INPUT_TOPIC, |
| "--intermediate-topics", INTERMEDIATE_USER_TOPIC |
| }; |
| } else { |
| parameters = new String[]{ |
| "--application-id", APP_ID + testNo, |
| "--bootstrap-server", CLUSTER.bootstrapServers(), |
| "--zookeeper", CLUSTER.zKConnectString(), |
| "--input-topics", INPUT_TOPIC |
| }; |
| } |
| final Properties cleanUpConfig = new Properties(); |
| cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); |
| cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); |
| |
| final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); |
| Assert.assertEquals(0, exitCode); |
| } |
| |
| private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) { |
| final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>(); |
| expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC); |
| if (intermediateUserTopic != null) { |
| expectedRemainingTopicsAfterCleanup.add(intermediateUserTopic); |
| } |
| expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC); |
| expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2); |
| expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN); |
| expectedRemainingTopicsAfterCleanup.add("__consumer_offsets"); |
| |
| Set<String> allTopics; |
| ZkUtils zkUtils = null; |
| try { |
| zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), |
| 30000, |
| 30000, |
| JaasUtils.isZkSecurityEnabled()); |
| |
| do { |
| Utils.sleep(100); |
| allTopics = new HashSet<>(); |
| allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); |
| } while (allTopics.size() != expectedRemainingTopicsAfterCleanup.size()); |
| } finally { |
| if (zkUtils != null) { |
| zkUtils.close(); |
| } |
| } |
| assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup)); |
| } |
| |
| private class WaitUntilConsumerGroupGotClosed implements TestCondition { |
| @Override |
| public boolean conditionMet() { |
| return adminClient.describeGroup(APP_ID + testNo).members().isEmpty(); |
| } |
| } |
| |
| } |