/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.internal;

import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* Unit tests for the {@link KafkaConsumerThread}.
*/
public class KafkaConsumerThreadTest {
@Test(timeout = 10000)
public void testCloseWithoutAssignedPartitions() throws Exception {
// no initial assignment
final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
new LinkedHashMap<TopicPartition, Long>(),
Collections.<TopicPartition, Long>emptyMap(),
false,
null,
null);
// set up a latch so that the test waits until testThread is blocked on the getBatchBlocking() method
final MultiShotLatch getBatchBlockingInvoked = new MultiShotLatch();
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>>() {
@Override
public List<KafkaTopicPartitionState<TopicPartition>> getBatchBlocking() throws InterruptedException {
getBatchBlockingInvoked.trigger();
return super.getBatchBlocking();
}
};
final TestKafkaConsumerThread testThread =
new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
testThread.start();
getBatchBlockingInvoked.await();
testThread.shutdown();
testThread.join();
}

/**
 * Tests that reassignment works correctly in the case when:
 * - the consumer initially had no assignments
 * - the new unassigned partitions already have defined offsets
 *
 * <p>A timeout is set because the test will not finish if there is a logic error in
 * the reassignment flow.
 */
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
public void testReassigningPartitionsWithDefinedOffsetsWhenNoInitialAssignment() throws Exception {
final String testTopic = "test-topic";
// -------- new partitions with defined offsets --------
KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
newPartition1.setOffset(23L);
KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
newPartition2.setOffset(31L);
final List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
newPartitions.add(newPartition1);
newPartitions.add(newPartition2);
// -------- setup mock KafkaConsumer --------
// no initial assignment
final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
mockConsumerAssignmentsAndPositions,
Collections.<TopicPartition, Long>emptyMap(),
false,
null,
null);
// -------- setup new partitions to be polled from the unassigned partitions queue --------
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<>();
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
unassignedPartitionsQueue.add(newPartition);
}
// -------- start test --------
final TestKafkaConsumerThread testThread =
new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
testThread.start();
testThread.startPartitionReassignment();
testThread.waitPartitionReassignmentComplete();
// verify that the consumer called assign() with all new partitions, and that positions are correctly advanced
assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size());
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
assertEquals(
newPartition.getOffset() + 1,
mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue());
}
assertEquals(0, unassignedPartitionsQueue.size());
}

/**
 * Tests that reassignment works correctly in the case when:
 * - the consumer initially had no assignments
 * - the new unassigned partitions have undefined offsets (e.g. the EARLIEST_OFFSET sentinel value)
 *
 * <p>A timeout is set because the test will not finish if there is a logic error in
 * the reassignment flow.
 */
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
public void testReassigningPartitionsWithoutDefinedOffsetsWhenNoInitialAssignment() throws Exception {
final String testTopic = "test-topic";
// -------- new partitions with undefined offsets --------
KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
final List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
newPartitions.add(newPartition1);
newPartitions.add(newPartition2);
// -------- setup mock KafkaConsumer --------
// no initial assignment
final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
// mock retrieved values that should replace the EARLIEST_OFFSET sentinels
final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>();
mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L);
mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L);
final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
mockConsumerAssignmentsAndPositions,
mockRetrievedPositions,
false,
null,
null);
// -------- setup new partitions to be polled from the unassigned partitions queue --------
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<>();
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
unassignedPartitionsQueue.add(newPartition);
}
// -------- start test --------
final TestKafkaConsumerThread testThread =
new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
testThread.start();
testThread.startPartitionReassignment();
testThread.waitPartitionReassignmentComplete();
// the sentinel offset states should have been replaced with defined values according to the retrieved values
assertEquals(mockRetrievedPositions.get(newPartition1.getKafkaPartitionHandle()) - 1, newPartition1.getOffset());
assertEquals(mockRetrievedPositions.get(newPartition2.getKafkaPartitionHandle()) - 1, newPartition2.getOffset());
// verify that the consumer called assign() with all new partitions, and that positions are correctly advanced
assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size());
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
assertEquals(
newPartition.getOffset() + 1,
mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue());
}
assertEquals(0, unassignedPartitionsQueue.size());
}

/**
 * Tests that reassignment works correctly in the case when:
 * - the consumer already has some assignments
 * - the new unassigned partitions already have defined offsets
 *
 * <p>A timeout is set because the test will not finish if there is a logic error in
 * the reassignment flow.
 */
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
public void testReassigningPartitionsWithDefinedOffsets() throws Exception {
final String testTopic = "test-topic";
// -------- old partitions --------
KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
oldPartition1.setOffset(23L);
KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
oldPartition2.setOffset(32L);
List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2);
oldPartitions.add(oldPartition1);
oldPartitions.add(oldPartition2);
// -------- new partitions with defined offsets --------
KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2));
newPartition.setOffset(29L);
List<KafkaTopicPartitionState<TopicPartition>> totalPartitions = new ArrayList<>(3);
totalPartitions.add(oldPartition1);
totalPartitions.add(oldPartition2);
totalPartitions.add(newPartition);
// -------- setup mock KafkaConsumer --------
// has initial assignments
final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new HashMap<>();
for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1);
}
final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
mockConsumerAssignmentsAndPositions,
Collections.<TopicPartition, Long>emptyMap(),
false,
null,
null);
// -------- setup new partitions to be polled from the unassigned partitions queue --------
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<>();
unassignedPartitionsQueue.add(newPartition);
// -------- start test --------
final TestKafkaConsumerThread testThread =
new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
testThread.getCurrentPartitions().put(oldPartition.getKafkaPartitionHandle(), oldPartition);
}
testThread.start();
testThread.startPartitionReassignment();
testThread.waitPartitionReassignmentComplete();
// verify that the consumer called assign() with all new partitions, and that positions are correctly advanced
assertEquals(totalPartitions.size(), mockConsumerAssignmentsAndPositions.size());
// old partitions should be re-seeked to their previous positions
for (KafkaTopicPartitionState<TopicPartition> partition : totalPartitions) {
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle()));
// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
assertEquals(
partition.getOffset() + 1,
mockConsumerAssignmentsAndPositions.get(partition.getKafkaPartitionHandle()).longValue());
}
assertEquals(0, unassignedPartitionsQueue.size());
}

/**
 * Tests that reassignment works correctly in the case when:
 * - the consumer already has some assignments
 * - the new unassigned partitions have undefined offsets (e.g. the EARLIEST_OFFSET sentinel value)
 *
 * <p>A timeout is set because the test will not finish if there is a logic error in
 * the reassignment flow.
 */
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
public void testReassigningPartitionsWithoutDefinedOffsets() throws Exception {
final String testTopic = "test-topic";
// -------- old partitions --------
KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
oldPartition1.setOffset(23L);
KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
oldPartition2.setOffset(32L);
List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2);
oldPartitions.add(oldPartition1);
oldPartitions.add(oldPartition2);
// -------- new partitions with undefined offsets --------
KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2));
newPartition.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
List<KafkaTopicPartitionState<TopicPartition>> totalPartitions = new ArrayList<>(3);
totalPartitions.add(oldPartition1);
totalPartitions.add(oldPartition2);
totalPartitions.add(newPartition);
// -------- setup mock KafkaConsumer --------
// has initial assignments
final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new HashMap<>();
for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1);
}
// mock retrieved values that should replace the EARLIEST_OFFSET sentinels
final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>();
mockRetrievedPositions.put(newPartition.getKafkaPartitionHandle(), 30L);
final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
mockConsumerAssignmentsAndPositions,
mockRetrievedPositions,
false,
null,
null);
// -------- setup new partitions to be polled from the unassigned partitions queue --------
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<>();
unassignedPartitionsQueue.add(newPartition);
// -------- start test --------
final TestKafkaConsumerThread testThread =
new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
testThread.getCurrentPartitions().put(oldPartition.getKafkaPartitionHandle(), oldPartition);
}
testThread.start();
testThread.startPartitionReassignment();
testThread.waitPartitionReassignmentComplete();
// the sentinel offset states should have been replaced with defined values according to the retrieved positions
assertEquals(mockRetrievedPositions.get(newPartition.getKafkaPartitionHandle()) - 1, newPartition.getOffset());
// verify that the consumer called assign() with all new partitions, and that positions are correctly advanced
assertEquals(totalPartitions.size(), mockConsumerAssignmentsAndPositions.size());
// old partitions should be re-seeked to their previous positions
for (KafkaTopicPartitionState<TopicPartition> partition : totalPartitions) {
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle()));
// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
assertEquals(
partition.getOffset() + 1,
mockConsumerAssignmentsAndPositions.get(partition.getKafkaPartitionHandle()).longValue());
}
assertEquals(0, unassignedPartitionsQueue.size());
}

/**
 * Tests that reassignment works correctly in the case when:
 * - the consumer already has some assignments
 * - the new unassigned partitions already have defined offsets
 * - the consumer was woken up prior to the reassignment
 *
 * <p>In this case, the reassignment should not occur at all, and the consumer should retain
 * its original assignment.
 *
 * <p>A timeout is set because the test will not finish if there is a logic error in
 * the reassignment flow.
 */
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
public void testReassigningPartitionsWithDefinedOffsetsWhenEarlyWakeup() throws Exception {
final String testTopic = "test-topic";
// -------- old partitions --------
KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
oldPartition1.setOffset(23L);
KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
oldPartition2.setOffset(32L);
List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2);
oldPartitions.add(oldPartition1);
oldPartitions.add(oldPartition2);
// -------- new partitions with defined offsets --------
KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2));
newPartition.setOffset(29L);
// -------- setup mock KafkaConsumer --------
// initial assignments
final Map<TopicPartition, Long> mockConsumerAssignmentsToPositions = new LinkedHashMap<>();
for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
mockConsumerAssignmentsToPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1);
}
final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
mockConsumerAssignmentsToPositions,
Collections.<TopicPartition, Long>emptyMap(),
true,
null,
null);
// -------- setup new partitions to be polled from the unassigned partitions queue --------
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<>();
unassignedPartitionsQueue.add(newPartition);
// -------- start test --------
final TestKafkaConsumerThread testThread =
new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
testThread.start();
// pause just before the reassignment so we can inject the wakeup
testThread.waitPartitionReassignmentInvoked();
testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>(), mock(KafkaCommitCallback.class));
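// make sure the consumer was actually woken up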
verify(mockConsumer, times(1)).wakeup();
testThread.startPartitionReassignment();
testThread.waitPartitionReassignmentComplete();
// the consumer's assignment should have remained untouched
assertEquals(oldPartitions.size(), mockConsumerAssignmentsToPositions.size());
for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
assertTrue(mockConsumerAssignmentsToPositions.containsKey(oldPartition.getKafkaPartitionHandle()));
assertEquals(
oldPartition.getOffset() + 1,
mockConsumerAssignmentsToPositions.get(oldPartition.getKafkaPartitionHandle()).longValue());
}
// the new partitions should have been re-added to the unassigned partitions queue
assertEquals(1, testThread.getCurrentPartitions().size());
}

/**
 * Tests that reassignment works correctly in the case when:
 * - the consumer has no initial assignments
 * - the new unassigned partitions have undefined offsets
 * - the consumer was woken up prior to the reassignment
 *
 * <p>In this case, the reassignment should not occur at all, and the consumer should retain
 * its original assignment.
 *
 * <p>A timeout is set because the test will not finish if there is a logic error in
 * the reassignment flow.
 */
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
public void testReassignPartitionsDefinedOffsetsWithoutInitialAssignmentsWhenEarlyWakeup() throws Exception {
final String testTopic = "test-topic";
// -------- new partitions with undefined offsets --------
KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
newPartitions.add(newPartition1);
newPartitions.add(newPartition2);
// -------- setup mock KafkaConsumer --------
// no initial assignments
final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
// mock retrieved values that should replace the EARLIEST_OFFSET sentinels
final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>();
mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L);
mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L);
final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
mockConsumerAssignmentsAndPositions,
mockRetrievedPositions,
true,
null,
null);
// -------- setup new partitions to be polled from the unassigned partitions queue --------
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<>();
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
unassignedPartitionsQueue.add(newPartition);
}
// -------- start test --------
final TestKafkaConsumerThread testThread =
new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
testThread.start();
// pause just before the reassignment so we can inject the wakeup
testThread.waitPartitionReassignmentInvoked();
testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>(), mock(KafkaCommitCallback.class));
// make sure the consumer was actually woken up
verify(mockConsumer, times(1)).wakeup();
testThread.startPartitionReassignment();
testThread.waitPartitionReassignmentComplete();
// the consumer's assignment should have remained untouched (in this case, empty)
assertEquals(0, mockConsumerAssignmentsAndPositions.size());
// the new partitions should have been re-added to the unassigned partitions queue
assertEquals(2, testThread.getCurrentPartitions().size());
}

/**
 * Tests that reassignment works correctly in the case when:
 * - the consumer has no initial assignments
 * - the new unassigned partitions have undefined offsets
 * - the consumer was woken up during the reassignment
 *
 * <p>In this case, the reassignment should complete, and the buffered wakeup call should be
 * re-issued on the consumer after the reassignment.
 *
 * <p>A timeout is set because the test will not finish if there is a logic error in
 * the reassignment flow.
 */
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
public void testReassignPartitionsDefinedOffsetsWithoutInitialAssignmentsWhenWakeupMidway() throws Exception {
final String testTopic = "test-topic";
// -------- new partitions with undefined offsets --------
KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
newPartitions.add(newPartition1);
newPartitions.add(newPartition2);
// -------- setup mock KafkaConsumer --------
// no initial assignments
final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
// mock retrieved values that should replace the EARLIEST_OFFSET sentinels
final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>();
mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L);
mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L);
// these latches are used to pause the reassignment process midway
final OneShotLatch midAssignmentLatch = new OneShotLatch();
final OneShotLatch continueAssignmentLatch = new OneShotLatch();
final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
mockConsumerAssignmentsAndPositions,
mockRetrievedPositions,
false,
midAssignmentLatch,
continueAssignmentLatch);
// -------- setup new partitions to be polled from the unassigned partitions queue --------
final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
new ClosableBlockingQueue<>();
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
unassignedPartitionsQueue.add(newPartition);
}
// -------- start test --------
final TestKafkaConsumerThread testThread =
new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
testThread.start();
testThread.startPartitionReassignment();
// wait until the reassignment has started
midAssignmentLatch.await();
testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>(), mock(KafkaCommitCallback.class));
// the wakeup triggered by the setOffsetsToCommit() call should have been buffered, and not forwarded to the consumer
verify(mockConsumer, never()).wakeup();
continueAssignmentLatch.trigger();
testThread.waitPartitionReassignmentComplete();
// verify that the consumer called assign() with all new partitions, and that positions are correctly advanced
assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size());
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
assertEquals(
newPartition.getOffset() + 1,
mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue());
}
// after the reassignment, the buffered wakeup call should be re-issued on the consumer
verify(mockConsumer, times(1)).wakeup();
assertEquals(0, unassignedPartitionsQueue.size());
}

/**
 * A testable {@link KafkaConsumerThread} that injects latches exactly before and after the
 * partition reassignment, so that tests can set up various conditions before the reassignment
 * happens and inspect the reassignment results after it completes.
 */
private static class TestKafkaConsumerThread extends KafkaConsumerThread {
private final KafkaConsumer<byte[], byte[]> mockConsumer;
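// latch handshake used by the overridden reassignPartitions():
// it triggers preReassignmentLatch, waits on startReassignmentLatch, runs the reassignment,
// then triggers reassignmentCompleteLatch and waits on postReassignmentLatch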
private final MultiShotLatch preReassignmentLatch = new MultiShotLatch();
private final MultiShotLatch startReassignmentLatch = new MultiShotLatch();
private final MultiShotLatch reassignmentCompleteLatch = new MultiShotLatch();
private final MultiShotLatch postReassignmentLatch = new MultiShotLatch();
public TestKafkaConsumerThread(
KafkaConsumer<byte[], byte[]> mockConsumer,
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue,
Handover handover) {
super(
mock(Logger.class),
handover,
new Properties(),
unassignedPartitionsQueue,
new KafkaConsumerCallBridge(),
"test-kafka-consumer-thread",
0,
false,
new UnregisteredMetricsGroup(),
new UnregisteredMetricsGroup());
this.mockConsumer = mockConsumer;
}
public void waitPartitionReassignmentInvoked() throws InterruptedException {
preReassignmentLatch.await();
}
public void startPartitionReassignment() {
startReassignmentLatch.trigger();
}
public void waitPartitionReassignmentComplete() throws InterruptedException {
reassignmentCompleteLatch.await();
}
public void endPartitionReassignment() {
postReassignmentLatch.trigger();
}
@Override
public KafkaConsumer<byte[], byte[]> getConsumer() {
return mockConsumer;
}
@Override
void reassignPartitions() throws Exception {
// triggers blocking calls on waitPartitionReassignmentInvoked()
preReassignmentLatch.trigger();
// waits for startPartitionReassignment() to be called
startReassignmentLatch.await();
try {
super.reassignPartitions();
} finally {
// triggers blocking calls on waitPartitionReassignmentComplete()
reassignmentCompleteLatch.trigger();
// waits for endPartitionReassignment() to be called
postReassignmentLatch.await();
}
}
}
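
/**
 * Creates a mock {@link KafkaConsumer} whose assignment and positions are mirrored in the given maps.
 *
 * @param mockConsumerAssignmentAndPosition map reflecting the mock consumer's current assignment and positions;
 *        assign() replaces its keys, while seek() / seekToBeginning() / seekToEnd() update its values
 * @param mockRetrievedPositions positions that the mock consumer reports when a partition is seeked to the beginning or end
 * @param earlyWakeup if true, position() throws a {@link WakeupException} to simulate a wakeup before the reassignment
 * @param midAssignmentLatch optional latch triggered inside assignment(), allowing tests to pause the reassignment midway
 * @param continueAssignmentLatch optional latch awaited inside assignment() before the reassignment continues
 */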
@SuppressWarnings("unchecked")
private static KafkaConsumer<byte[], byte[]> createMockConsumer(
final Map<TopicPartition, Long> mockConsumerAssignmentAndPosition,
final Map<TopicPartition, Long> mockRetrievedPositions,
final boolean earlyWakeup,
final OneShotLatch midAssignmentLatch,
final OneShotLatch continueAssignmentLatch) {
final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
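// assignment() reports the current mock assignment, optionally pausing on the provided latches
// so that a test can inject actions (e.g. a wakeup) midway through the reassignment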
when(mockConsumer.assignment()).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
if (midAssignmentLatch != null) {
midAssignmentLatch.trigger();
}
if (continueAssignmentLatch != null) {
continueAssignmentLatch.await();
}
return mockConsumerAssignmentAndPosition.keySet();
}
});
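// poll() returns a mocked (effectively empty) ConsumerRecords batch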
when(mockConsumer.poll(anyLong())).thenReturn(mock(ConsumerRecords.class));
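// position() either reports the tracked position, or throws a WakeupException to simulate an early wakeup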
if (!earlyWakeup) {
when(mockConsumer.position(any(TopicPartition.class))).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
return mockConsumerAssignmentAndPosition.get(invocationOnMock.getArgumentAt(0, TopicPartition.class));
}
});
} else {
when(mockConsumer.position(any(TopicPartition.class))).thenThrow(new WakeupException());
}
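// assign() replaces the tracked assignment; positions are unknown (null) until seeked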
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
mockConsumerAssignmentAndPosition.clear();
List<TopicPartition> assignedPartitions = invocationOnMock.getArgumentAt(0, List.class);
for (TopicPartition assigned : assignedPartitions) {
mockConsumerAssignmentAndPosition.put(assigned, null);
}
return null;
}
}).when(mockConsumer).assign(anyListOf(TopicPartition.class));
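// seek() updates the tracked position for an assigned partition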
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
TopicPartition partition = invocationOnMock.getArgumentAt(0, TopicPartition.class);
long position = invocationOnMock.getArgumentAt(1, long.class);
if (!mockConsumerAssignmentAndPosition.containsKey(partition)) {
throw new Exception("the current mock assignment does not contain partition " + partition);
} else {
mockConsumerAssignmentAndPosition.put(partition, position);
}
return null;
}
}).when(mockConsumer).seek(any(TopicPartition.class), anyLong());
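// seekToBeginning() sets the tracked position to the mocked retrieved position for the partition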
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
TopicPartition partition = invocationOnMock.getArgumentAt(0, TopicPartition.class);
if (!mockConsumerAssignmentAndPosition.containsKey(partition)) {
throw new Exception("the current mock assignment does not contain partition " + partition);
} else {
Long mockRetrievedPosition = mockRetrievedPositions.get(partition);
if (mockRetrievedPosition == null) {
throw new Exception("the mock consumer needed to retrieve a position for partition " + partition + ", but none was provided in the mock retrieved positions");
} else {
mockConsumerAssignmentAndPosition.put(partition, mockRetrievedPosition);
}
}
return null;
}
}).when(mockConsumer).seekToBeginning(any(TopicPartition.class));
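// seekToEnd() sets the tracked position to the mocked retrieved position for the partition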
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
TopicPartition partition = invocationOnMock.getArgumentAt(0, TopicPartition.class);
if (!mockConsumerAssignmentAndPosition.containsKey(partition)) {
throw new Exception("the current mock assignment does not contain partition " + partition);
} else {
Long mockRetrievedPosition = mockRetrievedPositions.get(partition);
if (mockRetrievedPosition == null) {
throw new Exception("the mock consumer needed to retrieve a position for partition " + partition + ", but none was provided in the mock retrieved positions");
} else {
mockConsumerAssignmentAndPosition.put(partition, mockRetrievedPosition);
}
}
return null;
}
}).when(mockConsumer).seekToEnd(any(TopicPartition.class));
return mockConsumer;
}
}