| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.streams.integration; |
| |
| import org.apache.kafka.clients.consumer.Consumer; |
| import org.apache.kafka.clients.consumer.ConsumerConfig; |
| import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; |
| import org.apache.kafka.clients.consumer.KafkaConsumer; |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.serialization.ByteArrayDeserializer; |
| import org.apache.kafka.common.serialization.Serde; |
| import org.apache.kafka.common.serialization.Serdes; |
| import org.apache.kafka.common.serialization.StringDeserializer; |
| import org.apache.kafka.common.serialization.StringSerializer; |
| import org.apache.kafka.server.util.MockTime; |
| import org.apache.kafka.streams.KafkaStreams; |
| import org.apache.kafka.streams.KeyValue; |
| import org.apache.kafka.streams.StreamsBuilder; |
| import org.apache.kafka.streams.StreamsConfig; |
| import org.apache.kafka.streams.Topology; |
| import org.apache.kafka.streams.TopologyWrapper; |
| import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; |
| import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; |
| import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; |
| import org.apache.kafka.streams.kstream.KStream; |
| import org.apache.kafka.streams.kstream.Produced; |
| import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; |
| import org.apache.kafka.streams.state.KeyValueStore; |
| import org.apache.kafka.streams.state.StoreBuilder; |
| import org.apache.kafka.test.IntegrationTest; |
| import org.apache.kafka.test.MockApiProcessorSupplier; |
| import org.apache.kafka.test.MockKeyValueStoreBuilder; |
| import org.apache.kafka.test.StreamsTestUtils; |
| import org.apache.kafka.test.TestCondition; |
| import org.apache.kafka.test.TestUtils; |
| import org.junit.experimental.categories.Category; |
| import org.junit.jupiter.api.AfterAll; |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeAll; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.TestInfo; |
| import org.junit.jupiter.api.Timeout; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.regex.Pattern; |
| |
| import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; |
| import static org.hamcrest.CoreMatchers.equalTo; |
| import static org.hamcrest.CoreMatchers.is; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.greaterThan; |
| |
| /** |
| * End-to-end integration test based on using regex and named topics for creating sources, using |
| * an embedded Kafka cluster. |
| */ |
| @Timeout(600) |
| @Category({IntegrationTest.class}) |
| public class RegexSourceIntegrationTest { |
| private static final int NUM_BROKERS = 1; |
| public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); |
| |
| @BeforeAll |
| public static void startCluster() throws IOException, InterruptedException { |
| CLUSTER.start(); |
| CLUSTER.createTopics( |
| TOPIC_1, |
| TOPIC_2, |
| TOPIC_A, |
| TOPIC_C, |
| TOPIC_Y, |
| TOPIC_Z, |
| FA_TOPIC, |
| FOO_TOPIC); |
| CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1); |
| CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1); |
| } |
| |
| @AfterAll |
| public static void closeCluster() { |
| CLUSTER.stop(); |
| } |
| |
| private final MockTime mockTime = CLUSTER.time; |
| |
| private static final String TOPIC_1 = "topic-1"; |
| private static final String TOPIC_2 = "topic-2"; |
| private static final String TOPIC_A = "topic-A"; |
| private static final String TOPIC_C = "topic-C"; |
| private static final String TOPIC_Y = "topic-Y"; |
| private static final String TOPIC_Z = "topic-Z"; |
| private static final String FA_TOPIC = "fa"; |
| private static final String FOO_TOPIC = "foo"; |
| private static final String PARTITIONED_TOPIC_1 = "partitioned-1"; |
| private static final String PARTITIONED_TOPIC_2 = "partitioned-2"; |
| |
| private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName(); |
| private Properties streamsConfiguration; |
| private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated"; |
| private KafkaStreams streams; |
| private static volatile AtomicInteger topicSuffixGenerator = new AtomicInteger(0); |
| private String outputTopic; |
| |
| @BeforeEach |
| public void setUp(final TestInfo testInfo) throws InterruptedException { |
| outputTopic = createTopic(topicSuffixGenerator.incrementAndGet()); |
| final Properties properties = new Properties(); |
| properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); |
| properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); |
| properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); |
| properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); |
| properties.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L); |
| properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); |
| |
| streamsConfiguration = StreamsTestUtils.getStreamsConfig( |
| IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, testInfo), |
| CLUSTER.bootstrapServers(), |
| STRING_SERDE_CLASSNAME, |
| STRING_SERDE_CLASSNAME, |
| properties |
| ); |
| } |
| |
| @AfterEach |
| public void tearDown() throws IOException { |
| if (streams != null) { |
| streams.close(); |
| } |
| // Remove any state from previous test runs |
| IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); |
| } |
| |
| @Test |
| public void testRegexMatchesTopicsAWhenCreated() throws Exception { |
| try { |
| final Serde<String> stringSerde = Serdes.String(); |
| |
| final List<String> expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1"); |
| // we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed |
| // with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always |
| // in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 behavior ever changed it may become a flaky test |
| final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); |
| |
| CLUSTER.createTopic("TEST-TOPIC-1"); |
| |
| final StreamsBuilder builder = new StreamsBuilder(); |
| |
| final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); |
| |
| pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); |
| final List<String> assignedTopics = new CopyOnWriteArrayList<>(); |
| streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { |
| @Override |
| public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) { |
| return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { |
| @Override |
| public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { |
| super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); |
| } |
| }; |
| |
| } |
| }); |
| |
| streams.start(); |
| TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); |
| |
| CLUSTER.createTopic("TEST-TOPIC-2"); |
| |
| TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); |
| |
| streams.close(); |
| } finally { |
| CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2"); |
| } |
| } |
| |
| @Test |
| public void testRegexRecordsAreProcessedAfterNewTopicCreatedWithMultipleSubtopologies() throws Exception { |
| final String topic1 = "TEST-TOPIC-1"; |
| final String topic2 = "TEST-TOPIC-2"; |
| |
| try { |
| CLUSTER.createTopic(topic1); |
| |
| final StreamsBuilder builder = new StreamsBuilder(); |
| final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); |
| final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match")); |
| |
| pattern1Stream |
| .selectKey((k, v) -> k) |
| .groupByKey() |
| .aggregate(() -> "", (k, v, a) -> v) |
| .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); |
| |
| final Topology topology = builder.build(); |
| assertThat(topology.describe().subtopologies().size(), greaterThan(1)); |
| streams = new KafkaStreams(topology, streamsConfiguration); |
| |
| startApplicationAndWaitUntilRunning(streams); |
| |
| CLUSTER.createTopic(topic2); |
| |
| final KeyValue<String, String> record1 = new KeyValue<>("1", "1"); |
| final KeyValue<String, String> record2 = new KeyValue<>("2", "2"); |
| IntegrationTestUtils.produceKeyValuesSynchronously( |
| topic1, |
| Collections.singletonList(record1), |
| TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class), |
| CLUSTER.time |
| ); |
| IntegrationTestUtils.produceKeyValuesSynchronously( |
| topic2, |
| Collections.singletonList(record2), |
| TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class), |
| CLUSTER.time |
| ); |
| IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived( |
| TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class), |
| outputTopic, |
| Arrays.asList(record1, record2) |
| ); |
| |
| streams.close(); |
| } finally { |
| CLUSTER.deleteTopicsAndWait(topic1, topic2); |
| } |
| } |
| |
| private String createTopic(final int suffix) throws InterruptedException { |
| final String outputTopic = "outputTopic_" + suffix; |
| CLUSTER.createTopic(outputTopic); |
| return outputTopic; |
| } |
| |
| @Test |
| public void testRegexMatchesTopicsAWhenDeleted() throws Exception { |
| final Serde<String> stringSerde = Serdes.String(); |
| final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B"); |
| final List<String> expectedSecondAssignment = Collections.singletonList("TEST-TOPIC-B"); |
| final List<String> assignedTopics = new CopyOnWriteArrayList<>(); |
| |
| try { |
| CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B"); |
| |
| final StreamsBuilder builder = new StreamsBuilder(); |
| |
| final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]")); |
| |
| pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); |
| |
| streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { |
| @Override |
| public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) { |
| return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { |
| @Override |
| public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { |
| super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); |
| } |
| }; |
| } |
| }); |
| |
| |
| streams.start(); |
| TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); |
| } finally { |
| CLUSTER.deleteTopic("TEST-TOPIC-A"); |
| } |
| |
| TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); |
| } |
| |
| @Test |
| public void shouldAddStateStoreToRegexDefinedSource() throws Exception { |
| final StoreBuilder<KeyValueStore<Object, Object>> storeBuilder = new MockKeyValueStoreBuilder("testStateStore", false); |
| final long thirtySecondTimeout = 30 * 1000; |
| |
| final TopologyWrapper topology = new TopologyWrapper(); |
| topology.addSource("ingest", Pattern.compile("topic-\\d+")); |
| topology.addProcessor("my-processor", new MockApiProcessorSupplier<>(), "ingest"); |
| topology.addStateStore(storeBuilder, "my-processor"); |
| |
| streams = new KafkaStreams(topology, streamsConfiguration); |
| streams.start(); |
| |
| final TestCondition stateStoreNameBoundToSourceTopic = () -> { |
| final Map<String, List<String>> stateStoreToSourceTopic = topology.getInternalBuilder().stateStoreNameToFullSourceTopicNames(); |
| final List<String> topicNamesList = stateStoreToSourceTopic.get("testStateStore"); |
| return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals("topic-1"); |
| }; |
| |
| TestUtils.waitForCondition(stateStoreNameBoundToSourceTopic, thirtySecondTimeout, "Did not find topic: [topic-1] connected to state store: [testStateStore]"); |
| } |
| |
| @Test |
| public void testShouldReadFromRegexAndNamedTopics() throws Exception { |
| final String topic1TestMessage = "topic-1 test"; |
| final String topic2TestMessage = "topic-2 test"; |
| final String topicATestMessage = "topic-A test"; |
| final String topicCTestMessage = "topic-C test"; |
| final String topicYTestMessage = "topic-Y test"; |
| final String topicZTestMessage = "topic-Z test"; |
| |
| |
| final Serde<String> stringSerde = Serdes.String(); |
| |
| final StreamsBuilder builder = new StreamsBuilder(); |
| |
| final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d")); |
| final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]")); |
| final KStream<String, String> namedTopicsStream = builder.stream(Arrays.asList(TOPIC_Y, TOPIC_Z)); |
| |
| pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); |
| pattern2Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); |
| namedTopicsStream.to(outputTopic, Produced.with(stringSerde, stringSerde)); |
| |
| streams = new KafkaStreams(builder.build(), streamsConfiguration); |
| streams.start(); |
| |
| final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); |
| |
| IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Collections.singleton(topic1TestMessage), producerConfig, mockTime); |
| IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Collections.singleton(topic2TestMessage), producerConfig, mockTime); |
| IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Collections.singleton(topicATestMessage), producerConfig, mockTime); |
| IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Collections.singleton(topicCTestMessage), producerConfig, mockTime); |
| IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Collections.singleton(topicYTestMessage), producerConfig, mockTime); |
| IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Collections.singleton(topicZTestMessage), producerConfig, mockTime); |
| |
| final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); |
| |
| final List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage); |
| final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, 6); |
| final List<String> actualValues = new ArrayList<>(6); |
| |
| for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) { |
| actualValues.add(receivedKeyValue.value); |
| } |
| |
| Collections.sort(actualValues); |
| Collections.sort(expectedReceivedValues); |
| assertThat(actualValues, equalTo(expectedReceivedValues)); |
| } |
| |
| @Test |
| public void testMultipleConsumersCanReadFromPartitionedTopic() throws Exception { |
| KafkaStreams partitionedStreamsLeader = null; |
| KafkaStreams partitionedStreamsFollower = null; |
| try { |
| final Serde<String> stringSerde = Serdes.String(); |
| final StreamsBuilder builderLeader = new StreamsBuilder(); |
| final StreamsBuilder builderFollower = new StreamsBuilder(); |
| final List<String> expectedAssignment = Arrays.asList(PARTITIONED_TOPIC_1, PARTITIONED_TOPIC_2); |
| |
| final KStream<String, String> partitionedStreamLeader = builderLeader.stream(Pattern.compile("partitioned-\\d")); |
| final KStream<String, String> partitionedStreamFollower = builderFollower.stream(Pattern.compile("partitioned-\\d")); |
| |
| |
| partitionedStreamLeader.to(outputTopic, Produced.with(stringSerde, stringSerde)); |
| partitionedStreamFollower.to(outputTopic, Produced.with(stringSerde, stringSerde)); |
| |
| final List<String> leaderAssignment = new CopyOnWriteArrayList<>(); |
| final List<String> followerAssignment = new CopyOnWriteArrayList<>(); |
| |
| partitionedStreamsLeader = new KafkaStreams(builderLeader.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { |
| @Override |
| public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) { |
| return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { |
| @Override |
| public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { |
| super.subscribe(topics, new TheConsumerRebalanceListener(leaderAssignment, listener)); |
| } |
| }; |
| |
| } |
| }); |
| partitionedStreamsFollower = new KafkaStreams(builderFollower.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { |
| @Override |
| public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) { |
| return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { |
| @Override |
| public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { |
| super.subscribe(topics, new TheConsumerRebalanceListener(followerAssignment, listener)); |
| } |
| }; |
| |
| } |
| }); |
| |
| partitionedStreamsLeader.start(); |
| partitionedStreamsFollower.start(); |
| TestUtils.waitForCondition(() -> followerAssignment.equals(expectedAssignment) && leaderAssignment.equals(expectedAssignment), "topic assignment not completed"); |
| } finally { |
| if (partitionedStreamsLeader != null) { |
| partitionedStreamsLeader.close(); |
| } |
| if (partitionedStreamsFollower != null) { |
| partitionedStreamsFollower.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception { |
| final String fMessage = "fMessage"; |
| final String fooMessage = "fooMessage"; |
| final Serde<String> stringSerde = Serdes.String(); |
| final StreamsBuilder builder = new StreamsBuilder(); |
| |
| // overlapping patterns here, no messages should be sent as TopologyException |
| // will be thrown when the processor topology is built. |
| final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*")); |
| final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*")); |
| |
| pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); |
| pattern2Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); |
| |
| final AtomicBoolean expectError = new AtomicBoolean(false); |
| |
| streams = new KafkaStreams(builder.build(), streamsConfiguration); |
| streams.setStateListener((newState, oldState) -> { |
| if (newState == KafkaStreams.State.ERROR) { |
| expectError.set(true); |
| } |
| }); |
| streams.setUncaughtExceptionHandler(e -> { |
| expectError.set(true); |
| return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; |
| }); |
| streams.start(); |
| |
| final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); |
| |
| IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Collections.singleton(fMessage), producerConfig, mockTime); |
| IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Collections.singleton(fooMessage), producerConfig, mockTime); |
| |
| final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); |
| try { |
| IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, 2, 5000); |
| throw new IllegalStateException("This should not happen: an assertion error should have been thrown before this."); |
| } catch (final AssertionError e) { |
| // this is fine |
| } |
| |
| assertThat(expectError.get(), is(true)); |
| } |
| |
| private static class TheConsumerRebalanceListener implements ConsumerRebalanceListener { |
| private final List<String> assignedTopics; |
| private final ConsumerRebalanceListener listener; |
| |
| TheConsumerRebalanceListener(final List<String> assignedTopics, final ConsumerRebalanceListener listener) { |
| this.assignedTopics = assignedTopics; |
| this.listener = listener; |
| } |
| |
| @Override |
| public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { |
| for (final TopicPartition partition : partitions) { |
| assignedTopics.remove(partition.topic()); |
| } |
| listener.onPartitionsRevoked(partitions); |
| } |
| |
| @Override |
| public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { |
| for (final TopicPartition partition : partitions) { |
| assignedTopics.add(partition.topic()); |
| } |
| Collections.sort(assignedTopics); |
| listener.onPartitionsAssigned(partitions); |
| } |
| } |
| |
| } |