/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.TopicConstants;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class ConsumerCoordinatorTest {
private String topicName = "test";
private String groupId = "test-group";
private TopicPartition tp = new TopicPartition(topicName, 0);
private int sessionTimeoutMs = 10000;
private int heartbeatIntervalMs = 5000;
private long retryBackoffMs = 100;
private boolean autoCommitEnabled = false;
private long autoCommitIntervalMs = 2000;
private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
private List<PartitionAssignor> assignors = Arrays.<PartitionAssignor>asList(partitionAssignor);
private MockTime time;
private MockClient client;
private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
private Node node = cluster.nodes().get(0);
private SubscriptionState subscriptions;
private Metadata metadata;
private Metrics metrics;
private ConsumerNetworkClient consumerClient;
private MockRebalanceListener rebalanceListener;
private MockCommitCallback defaultOffsetCommitCallback;
private ConsumerCoordinator coordinator;
@Before
public void setup() {
this.time = new MockTime();
this.client = new MockClient(time);
this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
this.metadata = new Metadata(0, Long.MAX_VALUE);
this.metadata.update(cluster, time.milliseconds());
this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
this.metrics = new Metrics(time);
this.rebalanceListener = new MockRebalanceListener();
this.defaultOffsetCommitCallback = new MockCommitCallback();
this.partitionAssignor.clear();
client.setNode(node);
this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, autoCommitEnabled);
}
@After
public void teardown() {
this.metrics.close();
}
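// a heartbeat sent to a known coordinator should complete successfully when the coordinator replies with no error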
@Test
public void testNormalHeartbeat() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// normal heartbeat
time.sleep(sessionTimeoutMs);
RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
assertEquals(1, consumerClient.pendingRequestCount());
assertFalse(future.isDone());
client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
consumerClient.poll(0);
assertTrue(future.isDone());
assertTrue(future.succeeded());
}
@Test(expected = GroupAuthorizationException.class)
public void testGroupDescribeUnauthorized() {
client.prepareResponse(consumerMetadataResponse(node, Errors.GROUP_AUTHORIZATION_FAILED.code()));
coordinator.ensureCoordinatorKnown();
}
@Test(expected = GroupAuthorizationException.class)
public void testGroupReadUnauthorized() {
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.<String, List<String>>emptyMap(),
Errors.GROUP_AUTHORIZATION_FAILED.code()));
coordinator.ensurePartitionAssignment();
}
@Test
public void testCoordinatorNotAvailable() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// GROUP_COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown
time.sleep(sessionTimeoutMs);
RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
assertEquals(1, consumerClient.pendingRequestCount());
assertFalse(future.isDone());
client.prepareResponse(heartbeatResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()));
time.sleep(sessionTimeoutMs);
consumerClient.poll(0);
assertTrue(future.isDone());
assertTrue(future.failed());
assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), future.exception());
assertTrue(coordinator.coordinatorUnknown());
}
@Test
public void testNotCoordinator() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// not_coordinator will mark coordinator as unknown
time.sleep(sessionTimeoutMs);
RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
assertEquals(1, consumerClient.pendingRequestCount());
assertFalse(future.isDone());
client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_GROUP.code()));
time.sleep(sessionTimeoutMs);
consumerClient.poll(0);
assertTrue(future.isDone());
assertTrue(future.failed());
assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.exception(), future.exception());
assertTrue(coordinator.coordinatorUnknown());
}
@Test
public void testIllegalGeneration() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// illegal_generation will cause re-partition
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.assignFromSubscribed(Collections.singletonList(tp));
time.sleep(sessionTimeoutMs);
RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
assertEquals(1, consumerClient.pendingRequestCount());
assertFalse(future.isDone());
client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION.code()));
time.sleep(sessionTimeoutMs);
consumerClient.poll(0);
assertTrue(future.isDone());
assertTrue(future.failed());
assertEquals(Errors.ILLEGAL_GENERATION.exception(), future.exception());
assertTrue(coordinator.needRejoin());
}
@Test
public void testUnknownConsumerId() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// unknown_member_id will cause re-partition
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.assignFromSubscribed(Collections.singletonList(tp));
time.sleep(sessionTimeoutMs);
RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
assertEquals(1, consumerClient.pendingRequestCount());
assertFalse(future.isDone());
client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID.code()));
time.sleep(sessionTimeoutMs);
consumerClient.poll(0);
assertTrue(future.isDone());
assertTrue(future.failed());
assertEquals(Errors.UNKNOWN_MEMBER_ID.exception(), future.exception());
assertTrue(coordinator.needRejoin());
}
@Test
public void testCoordinatorDisconnect() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// coordinator disconnect will mark coordinator as unknown
time.sleep(sessionTimeoutMs);
RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
assertEquals(1, consumerClient.pendingRequestCount());
assertFalse(future.isDone());
client.prepareResponse(heartbeatResponse(Errors.NONE.code()), true); // return disconnected
time.sleep(sessionTimeoutMs);
consumerClient.poll(0);
assertTrue(future.isDone());
assertTrue(future.failed());
assertTrue(future.exception() instanceof DisconnectException);
assertTrue(coordinator.coordinatorUnknown());
}
@Test(expected = ApiException.class)
public void testJoinGroupInvalidGroupId() {
final String consumerId = "leader";
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
// ensure metadata is up-to-date for leader
metadata.setTopics(Arrays.asList(topicName));
metadata.update(cluster, time.milliseconds());
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.<String, List<String>>emptyMap(),
Errors.INVALID_GROUP_ID.code()));
coordinator.ensurePartitionAssignment();
}
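// as group leader, the consumer should run the assignor and send the computed assignment in its SyncGroup request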
@Test
public void testNormalJoinGroupLeader() {
final String consumerId = "leader";
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
// ensure metadata is up-to-date for leader
metadata.setTopics(Arrays.asList(topicName));
metadata.update(cluster, time.milliseconds());
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// normal join group
Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp)));
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(ClientRequest request) {
SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
return sync.memberId().equals(consumerId) &&
sync.generationId() == 1 &&
sync.groupAssignment().containsKey(consumerId);
}
}, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
coordinator.ensurePartitionAssignment();
assertFalse(subscriptions.partitionAssignmentNeeded());
assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
assertEquals(1, rebalanceListener.revokedCount);
assertEquals(Collections.emptySet(), rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
}
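// a wakeup raised between the JoinGroup and SyncGroup responses should not lose the rebalance; the next attempt completes it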
@Test
public void testWakeupDuringJoin() {
final String consumerId = "leader";
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
// ensure metadata is up-to-date for leader
metadata.setTopics(Arrays.asList(topicName));
metadata.update(cluster, time.milliseconds());
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp)));
// prepare only the first half of the join and then trigger the wakeup
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
consumerClient.wakeup();
try {
coordinator.ensurePartitionAssignment();
} catch (WakeupException e) {
// ignore
}
// now complete the second half
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
coordinator.ensurePartitionAssignment();
assertFalse(subscriptions.partitionAssignmentNeeded());
assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
assertEquals(1, rebalanceListener.revokedCount);
assertEquals(Collections.emptySet(), rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
}
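// as a follower, the consumer sends an empty assignment in SyncGroup and takes whatever the leader assigned to it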
@Test
public void testNormalJoinGroupFollower() {
final String consumerId = "consumer";
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// normal join group
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(ClientRequest request) {
SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
return sync.memberId().equals(consumerId) &&
sync.generationId() == 1 &&
sync.groupAssignment().isEmpty();
}
}, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
coordinator.ensurePartitionAssignment();
assertFalse(subscriptions.partitionAssignmentNeeded());
assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
assertEquals(1, rebalanceListener.revokedCount);
assertEquals(1, rebalanceListener.assignedCount);
assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
}
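// closing the coordinator should send a LeaveGroup request for this member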
@Test
public void testLeaveGroupOnClose() {
final String consumerId = "consumer";
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
coordinator.ensurePartitionAssignment();
final AtomicBoolean received = new AtomicBoolean(false);
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(ClientRequest request) {
received.set(true);
LeaveGroupRequest leaveRequest = new LeaveGroupRequest(request.request().body());
return leaveRequest.memberId().equals(consumerId) &&
leaveRequest.groupId().equals(groupId);
}
}, new LeaveGroupResponse(Errors.NONE.code()).toStruct());
coordinator.close();
assertTrue(received.get());
}
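// maybeLeaveGroup should send a LeaveGroup request and reset the member id and generation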
@Test
public void testMaybeLeaveGroup() {
final String consumerId = "consumer";
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
coordinator.ensurePartitionAssignment();
final AtomicBoolean received = new AtomicBoolean(false);
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(ClientRequest request) {
received.set(true);
LeaveGroupRequest leaveRequest = new LeaveGroupRequest(request.request().body());
return leaveRequest.memberId().equals(consumerId) &&
leaveRequest.groupId().equals(groupId);
}
}, new LeaveGroupResponse(Errors.NONE.code()).toStruct());
coordinator.maybeLeaveGroup();
assertTrue(received.get());
assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, coordinator.memberId);
assertEquals(OffsetCommitRequest.DEFAULT_GENERATION_ID, coordinator.generation);
}
@Test(expected = KafkaException.class)
public void testUnexpectedErrorOnSyncGroup() {
final String consumerId = "consumer";
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// join initially, but let the coordinator fail the sync with an unexpected error
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN.code()));
coordinator.ensurePartitionAssignment();
}
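// an UNKNOWN_MEMBER_ID error on SyncGroup should trigger a rejoin with the member id reset to UNKNOWN_MEMBER_ID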
@Test
public void testUnknownMemberIdOnSyncGroup() {
final String consumerId = "consumer";
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// join initially, but let the coordinator return an unknown member id on the sync
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_MEMBER_ID.code()));
// now we should see a new join with the empty UNKNOWN_MEMBER_ID
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(ClientRequest request) {
JoinGroupRequest joinRequest = new JoinGroupRequest(request.request().body());
return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
}
}, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
coordinator.ensurePartitionAssignment();
assertFalse(subscriptions.partitionAssignmentNeeded());
assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
}
@Test
public void testRebalanceInProgressOnSyncGroup() {
final String consumerId = "consumer";
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// join initially, but let coordinator rebalance on sync
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.REBALANCE_IN_PROGRESS.code()));
// then let the full join/sync finish successfully
client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
coordinator.ensurePartitionAssignment();
assertFalse(subscriptions.partitionAssignmentNeeded());
assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
}
@Test
public void testIllegalGenerationOnSyncGroup() {
final String consumerId = "consumer";
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// join initially, but let the coordinator return an illegal generation error on the sync
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.ILLEGAL_GENERATION.code()));
// then let the full join/sync finish successfully
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(ClientRequest request) {
JoinGroupRequest joinRequest = new JoinGroupRequest(request.request().body());
return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
}
}, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
coordinator.ensurePartitionAssignment();
assertFalse(subscriptions.partitionAssignmentNeeded());
assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
}
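// a metadata change that adds partitions to a subscribed topic should mark the group for reassignment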
@Test
public void testMetadataChangeTriggersRebalance() {
final String consumerId = "consumer";
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
coordinator.ensurePartitionAssignment();
assertFalse(subscriptions.partitionAssignmentNeeded());
// a new partition is added to the topic
metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
// we should detect the change and ask for reassignment
assertTrue(subscriptions.partitionAssignmentNeeded());
}
@Test
public void testExcludeInternalTopicsConfigOption() {
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
metadata.update(TestUtils.singletonCluster(TopicConstants.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
assertFalse(subscriptions.partitionAssignmentNeeded());
}
@Test
public void testIncludeInternalTopicsConfigOption() {
coordinator = buildCoordinator(new Metrics(), assignors, false, false);
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
metadata.update(TestUtils.singletonCluster(TopicConstants.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
assertTrue(subscriptions.partitionAssignmentNeeded());
}
@Test
public void testRejoinGroup() {
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// join the group once
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
coordinator.ensurePartitionAssignment();
assertEquals(1, rebalanceListener.revokedCount);
assertEquals(1, rebalanceListener.assignedCount);
// and join the group again
subscriptions.needReassignment();
client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
coordinator.ensurePartitionAssignment();
assertEquals(2, rebalanceListener.revokedCount);
assertEquals(Collections.singleton(tp), rebalanceListener.revoked);
assertEquals(2, rebalanceListener.assignedCount);
assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
}
@Test
public void testDisconnectInJoin() {
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// a disconnect from the original coordinator should trigger coordinator re-discovery and another join
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()), true);
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
coordinator.ensurePartitionAssignment();
assertFalse(subscriptions.partitionAssignmentNeeded());
assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
assertEquals(1, rebalanceListener.revokedCount);
assertEquals(1, rebalanceListener.assignedCount);
assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
}
@Test(expected = ApiException.class)
public void testInvalidSessionTimeout() {
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// coordinator doesn't like the session timeout
client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT.code()));
coordinator.ensurePartitionAssignment();
}
@Test
public void testCommitOffsetOnly() {
subscriptions.assignFromUser(Arrays.asList(tp));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
AtomicBoolean success = new AtomicBoolean(false);
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
assertTrue(success.get());
assertEquals(100L, subscriptions.committed(tp).offset());
}
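// with auto-commit enabled, the consumed position should be committed once the auto-commit interval has elapsed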
@Test
public void testAutoCommitDynamicAssignment() {
final String consumerId = "consumer";
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
coordinator.ensurePartitionAssignment();
subscriptions.seek(tp, 100);
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
time.sleep(autoCommitIntervalMs);
consumerClient.poll(0);
assertEquals(100L, subscriptions.committed(tp).offset());
}
@Test
public void testAutoCommitDynamicAssignmentRebalance() {
final String consumerId = "consumer";
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// the consumer has not joined the group yet, so the auto-commit should not fire
time.sleep(autoCommitIntervalMs);
consumerClient.poll(0);
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
coordinator.ensurePartitionAssignment();
subscriptions.seek(tp, 100);
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
time.sleep(autoCommitIntervalMs);
consumerClient.poll(0);
assertEquals(100L, subscriptions.committed(tp).offset());
}
@Test
public void testAutoCommitManualAssignment() {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 100);
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
time.sleep(autoCommitIntervalMs);
consumerClient.poll(0);
assertEquals(100L, subscriptions.committed(tp).offset());
}
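// auto-commit should be deferred while the coordinator is unknown and sent once it has been discovered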
@Test
public void testAutoCommitManualAssignmentCoordinatorUnknown() {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 100);
// no commit initially since coordinator is unknown
consumerClient.poll(0);
time.sleep(autoCommitIntervalMs);
consumerClient.poll(0);
assertNull(subscriptions.committed(tp));
// now find the coordinator
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// sleep only for the retry backoff
time.sleep(retryBackoffMs);
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
consumerClient.poll(0);
assertEquals(100L, subscriptions.committed(tp).offset());
}
@Test
public void testCommitOffsetMetadata() {
subscriptions.assignFromUser(Arrays.asList(tp));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
AtomicBoolean success = new AtomicBoolean(false);
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "hello")), callback(success));
assertTrue(success.get());
assertEquals(100L, subscriptions.committed(tp).offset());
assertEquals("hello", subscriptions.committed(tp).metadata());
}
@Test
public void testCommitOffsetAsyncWithDefaultCallback() {
int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
assertNull(defaultOffsetCommitCallback.exception);
}
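// after leaving the group and switching to manual assignment, commits should use the default member id and generation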
@Test
public void testCommitAfterLeaveGroup() {
// enable auto-assignment
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
coordinator.ensurePartitionAssignment();
// now switch to manual assignment
client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()).toStruct());
subscriptions.unsubscribe();
coordinator.maybeLeaveGroup();
subscriptions.assignFromUser(Arrays.asList(tp));
// the client should not reuse the memberId/generation obtained while it was dynamically subscribed
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(ClientRequest request) {
OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body());
return commitRequest.memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
}
}, offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
AtomicBoolean success = new AtomicBoolean(false);
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
assertTrue(success.get());
}
@Test
public void testCommitOffsetAsyncFailedWithDefaultCallback() {
int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), defaultOffsetCommitCallback.exception);
}
@Test
public void testCommitOffsetAsyncCoordinatorNotAvailable() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// async commit with coordinator not available
MockCommitCallback cb = new MockCommitCallback();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
assertTrue(coordinator.coordinatorUnknown());
assertEquals(1, cb.invoked);
assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), cb.exception);
}
@Test
public void testCommitOffsetAsyncNotCoordinator() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// async commit with a not-coordinator error
MockCommitCallback cb = new MockCommitCallback();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
assertTrue(coordinator.coordinatorUnknown());
assertEquals(1, cb.invoked);
assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.exception(), cb.exception);
}
@Test
public void testCommitOffsetAsyncDisconnected() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// async commit with coordinator disconnected
MockCommitCallback cb = new MockCommitCallback();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
assertTrue(coordinator.coordinatorUnknown());
assertEquals(1, cb.invoked);
assertTrue(cb.exception instanceof DisconnectException);
}
@Test
public void testCommitOffsetSyncNotCoordinator() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// sync commit with a not-coordinator error (should re-discover the coordinator and then retry the commit)
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
}
@Test
public void testCommitOffsetSyncCoordinatorNotAvailable() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// sync commit with coordinator not available (should re-discover the coordinator and then retry the commit)
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
}
@Test
public void testCommitOffsetSyncCoordinatorDisconnected() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// sync commit with coordinator disconnected (should re-discover the coordinator and then retry the commit)
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
}
@Test(expected = OffsetMetadataTooLarge.class)
public void testCommitOffsetMetadataTooLarge() {
// since offset metadata is provided by the user, we have to propagate the exception so they can handle it
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.OFFSET_METADATA_TOO_LARGE.code())));
coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
}
@Test(expected = CommitFailedException.class)
public void testCommitOffsetIllegalGeneration() {
// the commit cannot be retried if a rebalance occurs before it completes
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.ILLEGAL_GENERATION.code())));
coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
}
@Test(expected = CommitFailedException.class)
public void testCommitOffsetUnknownMemberId() {
// the commit cannot be retried if a rebalance occurs before it completes
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN_MEMBER_ID.code())));
coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
}
@Test(expected = CommitFailedException.class)
public void testCommitOffsetRebalanceInProgress() {
// the commit cannot be retried if a rebalance occurs before it completes
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.REBALANCE_IN_PROGRESS.code())));
coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
}
@Test(expected = KafkaException.class)
public void testCommitOffsetSyncCallbackWithNonRetriableException() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
// a sync commit failing with a non-retriable error should throw since there is no callback to handle it
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN.code())), false);
coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
}
@Test
public void testRefreshOffset() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
coordinator.refreshCommittedOffsetsIfNeeded();
assertFalse(subscriptions.refreshCommitsNeeded());
assertEquals(100L, subscriptions.committed(tp).offset());
}
@Test
public void testRefreshOffsetLoadInProgress() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.GROUP_LOAD_IN_PROGRESS.code(), "", 100L));
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
coordinator.refreshCommittedOffsetsIfNeeded();
assertFalse(subscriptions.refreshCommitsNeeded());
assertEquals(100L, subscriptions.committed(tp).offset());
}
@Test
public void testRefreshOffsetNotCoordinatorForConsumer() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code(), "", 100L));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
coordinator.refreshCommittedOffsetsIfNeeded();
assertFalse(subscriptions.refreshCommitsNeeded());
assertEquals(100L, subscriptions.committed(tp).offset());
}
@Test
public void testRefreshOffsetWithNoFetchableOffsets() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
coordinator.refreshCommittedOffsetsIfNeeded();
assertFalse(subscriptions.refreshCommitsNeeded());
assertNull(subscriptions.committed(tp));
}
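// the JoinGroup protocol metadata should list the assignors in the order they were configured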
@Test
public void testProtocolMetadataOrder() {
RoundRobinAssignor roundRobin = new RoundRobinAssignor();
RangeAssignor range = new RangeAssignor();
try (Metrics metrics = new Metrics(time)) {
ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range),
ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false);
List<ProtocolMetadata> metadata = coordinator.metadata();
assertEquals(2, metadata.size());
assertEquals(roundRobin.name(), metadata.get(0).name());
assertEquals(range.name(), metadata.get(1).name());
}
try (Metrics metrics = new Metrics(time)) {
ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin),
ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false);
List<ProtocolMetadata> metadata = coordinator.metadata();
assertEquals(2, metadata.size());
assertEquals(range.name(), metadata.get(0).name());
assertEquals(roundRobin.name(), metadata.get(1).name());
}
}
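// builds a ConsumerCoordinator wired to the mock network client and mock time using the test's group settings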
private ConsumerCoordinator buildCoordinator(Metrics metrics,
List<PartitionAssignor> assignors,
boolean excludeInternalTopics,
boolean autoCommitEnabled) {
return new ConsumerCoordinator(
consumerClient,
groupId,
sessionTimeoutMs,
heartbeatIntervalMs,
assignors,
metadata,
subscriptions,
metrics,
"consumer" + groupId,
time,
retryBackoffMs,
defaultOffsetCommitCallback,
autoCommitEnabled,
autoCommitIntervalMs,
null,
excludeInternalTopics);
}
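// the helpers below construct serialized response Structs for the mock client to return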
private Struct consumerMetadataResponse(Node node, short error) {
GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
return response.toStruct();
}
private Struct heartbeatResponse(short error) {
HeartbeatResponse response = new HeartbeatResponse(error);
return response.toStruct();
}
private Struct joinGroupLeaderResponse(int generationId, String memberId,
Map<String, List<String>> subscriptions,
short error) {
Map<String, ByteBuffer> metadata = new HashMap<>();
for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) {
PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(subscriptionEntry.getValue());
ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription);
metadata.put(subscriptionEntry.getKey(), buf);
}
return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, memberId, metadata).toStruct();
}
private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, leaderId,
Collections.<String, ByteBuffer>emptyMap()).toStruct();
}
private Struct syncGroupResponse(List<TopicPartition> partitions, short error) {
ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
return new SyncGroupResponse(error, buf).toStruct();
}
private Struct offsetCommitResponse(Map<TopicPartition, Short> responseData) {
OffsetCommitResponse response = new OffsetCommitResponse(responseData);
return response.toStruct();
}
private Struct offsetFetchResponse(TopicPartition tp, Short error, String metadata, long offset) {
OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, error);
OffsetFetchResponse response = new OffsetFetchResponse(Collections.singletonMap(tp, data));
return response.toStruct();
}
private OffsetCommitCallback callback(final AtomicBoolean success) {
return new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception == null)
success.set(true);
}
};
}
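// simple mocks that record callback invocations and rebalance notifications for assertions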
private static class MockCommitCallback implements OffsetCommitCallback {
public int invoked = 0;
public Exception exception = null;
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
invoked++;
this.exception = exception;
}
}
private static class MockRebalanceListener implements ConsumerRebalanceListener {
public Collection<TopicPartition> revoked;
public Collection<TopicPartition> assigned;
public int revokedCount = 0;
public int assignedCount = 0;
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
this.assigned = partitions;
assignedCount++;
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
this.revoked = partitions;
revokedCount++;
}
}
}