KAFKA-16927; Handle expanding leader endpoints (#17363)
When a replica restarts in the follower state, the set of leader endpoints it starts with may not match the leader's latest set of endpoints. Voters discover the latest set of leader endpoints through the BEGIN_QUORUM_EPOCH request. KRaft therefore needs to allow a replica to transition from Follower to Follower within the same epoch when only the set of leader endpoints has changed; the transition is accepted only when the new set contains more endpoints than the current one.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Alyssa Huang <ahuang@confluent.io>
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index 8882737..0598ce0 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -470,15 +470,53 @@
*/
public void transitionToFollower(int epoch, int leaderId, Endpoints endpoints) {
int currentEpoch = state.epoch();
- if (localId.isPresent() && leaderId == localId.getAsInt()) {
- throw new IllegalStateException("Cannot transition to Follower with leader " + leaderId +
- " and epoch " + epoch + " since it matches the local broker.id " + localId);
+ if (endpoints.isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot transition to Follower with leader %s and epoch %s without a leader endpoint",
+ leaderId,
+ epoch
+ )
+ );
+ } else if (localId.isPresent() && leaderId == localId.getAsInt()) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot transition to Follower with leader %s and epoch %s since it matches the local node.id %s",
+ leaderId,
+ epoch,
+ localId
+ )
+ );
} else if (epoch < currentEpoch) {
- throw new IllegalStateException("Cannot transition to Follower with leader " + leaderId +
- " and epoch " + epoch + " since the current epoch " + currentEpoch + " is larger");
- } else if (epoch == currentEpoch && (isFollower() || isLeader())) {
- throw new IllegalStateException("Cannot transition to Follower with leader " + leaderId +
- " and epoch " + epoch + " from state " + state);
+ throw new IllegalStateException(
+ String.format(
+ "Cannot transition to Follower with leader %s and epoch %s since the current epoch %s is larger",
+ leaderId,
+ epoch,
+ currentEpoch
+ )
+ );
+ } else if (epoch == currentEpoch) {
+ if (isFollower() && state.leaderEndpoints().size() >= endpoints.size()) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot transition to Follower with leader %s, epoch %s and endpoints %s from state %s",
+ leaderId,
+ epoch,
+ endpoints,
+ state
+ )
+ );
+ } else if (isLeader()) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot transition to Follower with leader %s and epoch %s from state %s",
+ leaderId,
+ epoch,
+ state
+ )
+ );
+ }
}
durableTransitionTo(
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 63e7f04..04ab47f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -2257,6 +2257,45 @@
assertEquals(-2, fetchRequest.destination().id());
}
+ @Test
+ public void testHandleBeginQuorumRequestMoreEndpoints() throws Exception {
+ ReplicaKey local = replicaKey(randomReplicaId(), true);
+ ReplicaKey leader = replicaKey(local.id() + 1, true);
+ int leaderEpoch = 3;
+
+ VoterSet voters = VoterSetTest.voterSet(Stream.of(local, leader));
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+ .withBootstrapSnapshot(Optional.of(voters))
+ .withElectedLeader(leaderEpoch, leader.id())
+ .withKip853Rpc(true)
+ .build();
+
+ context.client.poll();
+
+ HashMap<ListenerName, InetSocketAddress> leaderListenersMap = new HashMap<>(2);
+ leaderListenersMap.put(
+ VoterSetTest.DEFAULT_LISTENER_NAME,
+ InetSocketAddress.createUnresolved("localhost", 9990 + leader.id())
+ );
+ leaderListenersMap.put(
+ ListenerName.normalised("ANOTHER_LISTENER"),
+ InetSocketAddress.createUnresolved("localhost", 8990 + leader.id())
+ );
+ Endpoints leaderEndpoints = Endpoints.fromInetSocketAddresses(leaderListenersMap);
+
+ context.deliverRequest(context.beginEpochRequest(leaderEpoch, leader.id(), leaderEndpoints));
+ context.pollUntilResponse();
+
+ context.assertElectedLeader(leaderEpoch, leader.id());
+
+ context.assertSentBeginQuorumEpochResponse(
+ Errors.NONE,
+ leaderEpoch,
+ OptionalInt.of(leader.id())
+ );
+ }
+
private static void verifyVotersRecord(
VoterSet expectedVoterSet,
ByteBuffer recordKey,
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 9071bbe..03bf815 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -30,6 +30,7 @@
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
@@ -55,6 +56,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
@@ -960,6 +962,45 @@
);
}
+ @Test
+ public void testHandleBeginQuorumRequestMoreEndpoints() throws Exception {
+ ReplicaKey local = replicaKey(randomReplicaId(), true);
+ ReplicaKey leader = replicaKey(local.id() + 1, true);
+ int leaderEpoch = 3;
+
+ VoterSet voters = VoterSetTest.voterSet(Stream.of(local, leader));
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+ .withStaticVoters(voters)
+ .withElectedLeader(leaderEpoch, leader.id())
+ .withKip853Rpc(true)
+ .build();
+
+ context.client.poll();
+
+ HashMap<ListenerName, InetSocketAddress> leaderListenersMap = new HashMap<>(2);
+ leaderListenersMap.put(
+ VoterSetTest.DEFAULT_LISTENER_NAME,
+ InetSocketAddress.createUnresolved("localhost", 9990 + leader.id())
+ );
+ leaderListenersMap.put(
+ ListenerName.normalised("ANOTHER_LISTENER"),
+ InetSocketAddress.createUnresolved("localhost", 8990 + leader.id())
+ );
+ Endpoints leaderEndpoints = Endpoints.fromInetSocketAddresses(leaderListenersMap);
+
+ context.deliverRequest(context.beginEpochRequest(leaderEpoch, leader.id(), leaderEndpoints));
+ context.pollUntilResponse();
+
+ context.assertElectedLeader(leaderEpoch, leader.id());
+
+ context.assertSentBeginQuorumEpochResponse(
+ Errors.NONE,
+ leaderEpoch,
+ OptionalInt.of(leader.id())
+ );
+ }
+
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testHandleBeginQuorumResponse(boolean withKip853Rpc) throws Exception {
diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
index fc14f4d..7131701 100644
--- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.raft;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
@@ -32,6 +33,7 @@
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
@@ -1213,6 +1215,38 @@
@ParameterizedTest
@EnumSource(value = KRaftVersion.class)
+ public void testFollowerToFollowerSameEpochAndMoreEndpoints(KRaftVersion kraftVersion) {
+ int node1 = 1;
+ int node2 = 2;
+ VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), kraftVersion);
+ QuorumState state = initializeEmptyState(voters, kraftVersion);
+ state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
+ state.transitionToFollower(
+ 8,
+ node2,
+ voters.listeners(node2)
+ );
+
+ HashMap<ListenerName, InetSocketAddress> newNode2ListenersMap = new HashMap<>(2);
+ newNode2ListenersMap.put(
+ VoterSetTest.DEFAULT_LISTENER_NAME,
+ InetSocketAddress.createUnresolved("localhost", 9990 + node2)
+ );
+ newNode2ListenersMap.put(
+ ListenerName.normalised("ANOTHER_LISTENER"),
+ InetSocketAddress.createUnresolved("localhost", 8990 + node2)
+ );
+ Endpoints newNode2Endpoints = Endpoints.fromInetSocketAddresses(newNode2ListenersMap);
+
+ state.transitionToFollower(
+ 8,
+ node2,
+ newNode2Endpoints
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = KRaftVersion.class)
public void testFollowerToFollowerHigherEpoch(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index f9f1c9e..dc8e978 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -1368,6 +1368,14 @@
return beginEpochRequest(clusterId, epoch, leaderId);
}
+ BeginQuorumEpochRequestData beginEpochRequest(int epoch, int leaderId, Endpoints endpoints) {
+ ReplicaKey localReplicaKey = kip853Rpc ?
+ ReplicaKey.of(localIdOrThrow(), localDirectoryId) :
+ ReplicaKey.of(-1, ReplicaKey.NO_DIRECTORY_ID);
+
+ return beginEpochRequest(clusterId, epoch, leaderId, endpoints, localReplicaKey);
+ }
+
BeginQuorumEpochRequestData beginEpochRequest(String clusterId, int epoch, int leaderId) {
ReplicaKey localReplicaKey = kip853Rpc ?
ReplicaKey.of(localIdOrThrow(), localDirectoryId) :
@@ -1382,12 +1390,28 @@
int leaderId,
ReplicaKey voterKey
) {
+ return beginEpochRequest(
+ clusterId,
+ epoch,
+ leaderId,
+ startingVoters.listeners(leaderId),
+ voterKey
+ );
+ }
+
+ BeginQuorumEpochRequestData beginEpochRequest(
+ String clusterId,
+ int epoch,
+ int leaderId,
+ Endpoints endpoints,
+ ReplicaKey voterKey
+ ) {
return RaftUtil.singletonBeginQuorumEpochRequest(
metadataPartition,
clusterId,
epoch,
leaderId,
- startingVoters.listeners(leaderId),
+ endpoints,
voterKey
);
}