| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.streams.integration; |
| |
| import org.apache.kafka.clients.admin.Admin; |
| import org.apache.kafka.clients.admin.AdminClientConfig; |
| import org.apache.kafka.clients.admin.Config; |
| import org.apache.kafka.clients.admin.ConfigEntry; |
| import org.apache.kafka.clients.consumer.ConsumerConfig; |
| import org.apache.kafka.clients.producer.ProducerConfig; |
| import org.apache.kafka.common.config.ConfigResource; |
| import org.apache.kafka.common.config.TopicConfig; |
| import org.apache.kafka.common.serialization.Serdes; |
| import org.apache.kafka.common.serialization.StringSerializer; |
| import org.apache.kafka.common.utils.Bytes; |
| import org.apache.kafka.server.util.MockTime; |
| import org.apache.kafka.streams.KafkaStreams; |
| import org.apache.kafka.streams.StreamsBuilder; |
| import org.apache.kafka.streams.StreamsConfig; |
| import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; |
| import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; |
| import org.apache.kafka.streams.kstream.Grouped; |
| import org.apache.kafka.streams.kstream.KStream; |
| import org.apache.kafka.streams.kstream.KTable; |
| import org.apache.kafka.streams.kstream.Materialized; |
| import org.apache.kafka.streams.kstream.TimeWindows; |
| import org.apache.kafka.streams.processor.internals.ProcessorStateManager; |
| import org.apache.kafka.streams.state.WindowStore; |
| import org.apache.kafka.test.MockMapper; |
| import org.apache.kafka.test.TestUtils; |
| import org.junit.jupiter.api.AfterAll; |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeAll; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Tag; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.Timeout; |
| |
| import java.io.IOException; |
| import java.time.Duration; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Properties; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeUnit; |
| |
| import static java.time.Duration.ofMillis; |
| import static java.time.Duration.ofSeconds; |
| import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; |
| import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| |
| /** |
| * Tests related to internal topics in streams |
| */ |
| @SuppressWarnings("deprecation") |
| @Timeout(600) |
| @Tag("integration") |
| public class InternalTopicIntegrationTest { |
| public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); |
| |
| @BeforeAll |
| public static void startCluster() throws IOException, InterruptedException { |
| CLUSTER.start(); |
| CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_INPUT_TABLE_TOPIC); |
| } |
| |
| @AfterAll |
| public static void closeCluster() { |
| CLUSTER.stop(); |
| } |
| |
| |
| private static final String APP_ID = "internal-topics-integration-test"; |
| private static final String DEFAULT_INPUT_TOPIC = "inputTopic"; |
| private static final String DEFAULT_INPUT_TABLE_TOPIC = "inputTable"; |
| |
| private final MockTime mockTime = CLUSTER.time; |
| |
| private Properties streamsProp; |
| |
| @BeforeEach |
| public void before() { |
| streamsProp = new Properties(); |
| streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); |
| streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); |
| streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); |
| streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); |
| streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); |
| streamsProp.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); |
| streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); |
| } |
| |
| @AfterEach |
| public void after() throws IOException { |
| // Remove any state from previous test runs |
| IntegrationTestUtils.purgeLocalStreamsState(streamsProp); |
| } |
| |
| private void produceData(final List<String> inputValues) { |
| final Properties producerProp = new Properties(); |
| producerProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); |
| producerProp.put(ProducerConfig.ACKS_CONFIG, "all"); |
| producerProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); |
| producerProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); |
| |
| IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerProp, mockTime); |
| } |
| |
| private Properties getTopicProperties(final String changelog) { |
| try (final Admin adminClient = createAdminClient()) { |
| final ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, changelog); |
| try { |
| final Config config = adminClient.describeConfigs(Collections.singletonList(configResource)).values().get(configResource).get(); |
| final Properties properties = new Properties(); |
| for (final ConfigEntry configEntry : config.entries()) { |
| if (configEntry.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) { |
| properties.put(configEntry.name(), configEntry.value()); |
| } |
| } |
| return properties; |
| } catch (final InterruptedException | ExecutionException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| private Admin createAdminClient() { |
| final Properties adminClientConfig = new Properties(); |
| adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); |
| return Admin.create(adminClientConfig); |
| } |
| |
| /* |
| * This test just ensures that the assignor does not get stuck during partition number resolution |
| * for internal repartition topics. See KAFKA-10689 |
| */ |
| @Test |
| public void shouldGetToRunningWithWindowedTableInFKJ() throws Exception { |
| final String appID = APP_ID + "-windowed-FKJ"; |
| streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); |
| |
| final StreamsBuilder streamsBuilder = new StreamsBuilder(); |
| final KStream<String, String> inputTopic = streamsBuilder.stream(DEFAULT_INPUT_TOPIC); |
| final KTable<String, String> inputTable = streamsBuilder.table(DEFAULT_INPUT_TABLE_TOPIC); |
| inputTopic |
| .groupBy( |
| (k, v) -> k, |
| Grouped.with("GroupName", Serdes.String(), Serdes.String()) |
| ) |
| .windowedBy(TimeWindows.of(Duration.ofMinutes(10))) |
| .aggregate( |
| () -> "", |
| (k, v, a) -> a + k) |
| .leftJoin( |
| inputTable, |
| v -> v, |
| (x, y) -> x + y |
| ); |
| |
| try (final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsProp)) { |
| startApplicationAndWaitUntilRunning(streams); |
| } |
| } |
| |
| @Test |
| public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception { |
| final String appID = APP_ID + "-compact"; |
| streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); |
| |
| // |
| // Step 1: Configure and start a simple word count topology |
| // |
| final StreamsBuilder builder = new StreamsBuilder(); |
| final KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC); |
| |
| textLines.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) |
| .groupBy(MockMapper.selectValueMapper()) |
| .count(Materialized.as("Counts")); |
| |
| try (final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp)) { |
| startApplicationAndWaitUntilRunning(streams); |
| |
| // |
| // Step 2: Produce some input data to the input topic. |
| // |
| produceData(Arrays.asList("hello", "world", "world", "hello world")); |
| |
| // |
| // Step 3: Verify the state changelog topics are compact |
| // |
| waitForCompletion(streams, 2, 30000L); |
| } |
| |
| final Properties changelogProps = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "Counts", null)); |
| assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, changelogProps.getProperty(TopicConfig.CLEANUP_POLICY_CONFIG)); |
| |
| final Properties repartitionProps = getTopicProperties(appID + "-Counts-repartition"); |
| assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, repartitionProps.getProperty(TopicConfig.CLEANUP_POLICY_CONFIG)); |
| assertEquals(4, repartitionProps.size()); |
| } |
| |
| @Test |
| public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs() throws Exception { |
| final String appID = APP_ID + "-compact-delete"; |
| streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); |
| |
| // |
| // Step 1: Configure and start a simple word count topology |
| // |
| final StreamsBuilder builder = new StreamsBuilder(); |
| final KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC); |
| |
| final int durationMs = 2000; |
| |
| textLines.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) |
| .groupBy(MockMapper.selectValueMapper()) |
| .windowedBy(TimeWindows.of(ofSeconds(1L)).grace(ofMillis(0L))) |
| .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountWindows").withRetention(ofSeconds(2L))); |
| |
| try (final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp)) { |
| startApplicationAndWaitUntilRunning(streams); |
| |
| // |
| // Step 2: Produce some input data to the input topic. |
| // |
| produceData(Arrays.asList("hello", "world", "world", "hello world")); |
| |
| // |
| // Step 3: Verify the state changelog topics are compact |
| // |
| waitForCompletion(streams, 2, 30000L); |
| } |
| |
| final Properties properties = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "CountWindows", null)); |
| final List<String> policies = Arrays.asList(properties.getProperty(TopicConfig.CLEANUP_POLICY_CONFIG).split(",")); |
| assertEquals(2, policies.size()); |
| assertTrue(policies.contains(TopicConfig.CLEANUP_POLICY_COMPACT)); |
| assertTrue(policies.contains(TopicConfig.CLEANUP_POLICY_DELETE)); |
| // retention should be 1 day + the window duration |
| final long retention = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS) + durationMs; |
| assertEquals(retention, Long.parseLong(properties.getProperty(TopicConfig.RETENTION_MS_CONFIG))); |
| |
| final Properties repartitionProps = getTopicProperties(appID + "-CountWindows-repartition"); |
| assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, repartitionProps.getProperty(TopicConfig.CLEANUP_POLICY_CONFIG)); |
| assertEquals(4, repartitionProps.size()); |
| } |
| } |