blob: d74eba4ab22e0d3a0750058ced3c33b6d0cb4272 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ProspectiveStateTest {
// Identity of the replica under test; localReplicaKey.id() and localId are both 0.
private final int localId = 0;
private final ReplicaKey localReplicaKey = ReplicaKey.of(localId, Uuid.randomUuid());

// Replica used as the "already voted for" key, once with and once without a directory id.
private final int votedId = 1;
private final Uuid votedDirectoryId = Uuid.randomUuid();
private final ReplicaKey votedKeyWithDirectoryId = ReplicaKey.of(votedId, votedDirectoryId);
private final ReplicaKey votedKeyWithoutDirectoryId = ReplicaKey.of(votedId, ReplicaKey.NO_DIRECTORY_ID);

// Constructor arguments shared by every ProspectiveState created in this class.
private final int epoch = 5;
private final int electionTimeoutMs = 10000;
private final MockTime time = new MockTime();
private final LogContext logContext = new LogContext();

// Endpoints passed to the state whenever a leader id is supplied.
private final Endpoints leaderEndpoints = Endpoints.fromInetSocketAddresses(
    Collections.singletonMap(
        ListenerName.normalised("CONTROLLER"),
        InetSocketAddress.createUnresolved("mock-host-3", 1234)
    )
);
/**
 * Creates the {@link ProspectiveState} under test for the local replica at {@code epoch}.
 *
 * @param voters   the voter set handed to the state
 * @param leaderId the leader id handed to the state; when present the state also receives
 *                 {@code leaderEndpoints}, otherwise {@code Endpoints.empty()}
 * @param votedKey the voted key handed to the state
 * @return a state built on the shared mock clock, election timeout and log context
 */
private ProspectiveState newProspectiveState(
    VoterSet voters,
    OptionalInt leaderId,
    Optional<ReplicaKey> votedKey
) {
    // Endpoints are only supplied together with a leader id.
    final Endpoints endpoints;
    if (leaderId.isPresent()) {
        endpoints = leaderEndpoints;
    } else {
        endpoints = Endpoints.empty();
    }

    // NOTE(review): the Optional.empty() and 1 arguments are presumably the high watermark
    // and the retry count -- confirm against the ProspectiveState constructor.
    return new ProspectiveState(
        time,
        localReplicaKey.id(),
        epoch,
        leaderId,
        endpoints,
        votedKey,
        voters,
        Optional.empty(),
        1,
        electionTimeoutMs,
        logContext
    );
}
/**
 * Creates a {@link ProspectiveState} with no known leader and no voted key.
 *
 * Delegates to the three-argument overload so that the constructor wiring lives in one
 * place; with an empty leader id that overload passes {@code Endpoints.empty()}, which is
 * exactly what this method used to pass itself.
 *
 * @param voters the voter set handed to the state
 * @return the state under test
 */
private ProspectiveState newProspectiveState(VoterSet voters) {
    return newProspectiveState(voters, OptionalInt.empty(), Optional.empty());
}
/**
 * With the local replica as the only voter, the election is expected to be granted
 * immediately, with no rejected outcome and no voters left unrecorded.
 */
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testSingleNodeQuorum(boolean withDirectoryId) {
    VoterSet onlyLocalVoter = voterSetWithLocal(IntStream.empty(), withDirectoryId);
    ProspectiveState prospective = newProspectiveState(onlyLocalVoter);

    assertTrue(prospective.epochElection().isVoteGranted());
    assertFalse(prospective.epochElection().isVoteRejected());
    assertEquals(Set.of(), prospective.epochElection().unrecordedVoters());
}
/**
 * In a two-voter set the election starts undecided with the remote voter unrecorded;
 * once that voter rejects, the election is expected to be rejected.
 */
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testTwoNodeQuorumVoteRejected(boolean withDirectoryId) {
    ReplicaKey remoteVoter = replicaKey(1, withDirectoryId);
    VoterSet voters = voterSetWithLocal(Stream.of(remoteVoter), withDirectoryId);
    ProspectiveState prospective = newProspectiveState(voters);

    // Before any response: undecided, remote voter still outstanding.
    assertFalse(prospective.epochElection().isVoteGranted());
    assertFalse(prospective.epochElection().isVoteRejected());
    assertEquals(Set.of(remoteVoter), prospective.epochElection().unrecordedVoters());

    // The remote rejection settles the election as rejected.
    assertTrue(prospective.recordRejectedVote(remoteVoter.id()));
    assertFalse(prospective.epochElection().isVoteGranted());
    assertTrue(prospective.epochElection().isVoteRejected());
}
/**
 * In a two-voter set the election starts undecided with the remote voter unrecorded;
 * once that voter grants, the election is expected to be granted and nobody is outstanding.
 */
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testTwoNodeQuorumVoteGranted(boolean withDirectoryId) {
    ReplicaKey remoteVoter = replicaKey(1, withDirectoryId);
    VoterSet voters = voterSetWithLocal(Stream.of(remoteVoter), withDirectoryId);
    ProspectiveState prospective = newProspectiveState(voters);

    // Before any response: undecided, remote voter still outstanding.
    assertFalse(prospective.epochElection().isVoteGranted());
    assertFalse(prospective.epochElection().isVoteRejected());
    assertEquals(Set.of(remoteVoter), prospective.epochElection().unrecordedVoters());

    // The remote grant settles the election as granted.
    assertTrue(prospective.recordGrantedVote(remoteVoter.id()));
    assertEquals(Set.of(), prospective.epochElection().unrecordedVoters());
    assertFalse(prospective.epochElection().isVoteRejected());
    assertTrue(prospective.epochElection().isVoteGranted());
}
/**
 * In a three-voter set a single remote grant is expected to decide the election as
 * granted, and a later rejection from the remaining voter must not change that outcome.
 */
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testThreeNodeQuorumVoteGranted(boolean withDirectoryId) {
    ReplicaKey firstRemote = replicaKey(1, withDirectoryId);
    ReplicaKey secondRemote = replicaKey(2, withDirectoryId);
    VoterSet voters = voterSetWithLocal(Stream.of(firstRemote, secondRemote), withDirectoryId);
    ProspectiveState prospective = newProspectiveState(voters);

    // Before any response: undecided, both remote voters outstanding.
    assertFalse(prospective.epochElection().isVoteGranted());
    assertFalse(prospective.epochElection().isVoteRejected());
    assertEquals(Set.of(firstRemote, secondRemote), prospective.epochElection().unrecordedVoters());

    // First remote grants.
    assertTrue(prospective.recordGrantedVote(firstRemote.id()));
    assertEquals(Set.of(secondRemote), prospective.epochElection().unrecordedVoters());
    assertTrue(prospective.epochElection().isVoteGranted());
    assertFalse(prospective.epochElection().isVoteRejected());

    // Second remote rejects; the outcome stays granted.
    assertTrue(prospective.recordRejectedVote(secondRemote.id()));
    assertEquals(Set.of(), prospective.epochElection().unrecordedVoters());
    assertTrue(prospective.epochElection().isVoteGranted());
    assertFalse(prospective.epochElection().isVoteRejected());
}
/**
 * In a three-voter set a single remote rejection leaves the election undecided; only
 * after both remote voters reject is the election expected to be rejected.
 */
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testThreeNodeQuorumVoteRejected(boolean withDirectoryId) {
    ReplicaKey firstRemote = replicaKey(1, withDirectoryId);
    ReplicaKey secondRemote = replicaKey(2, withDirectoryId);
    VoterSet voters = voterSetWithLocal(Stream.of(firstRemote, secondRemote), withDirectoryId);
    ProspectiveState prospective = newProspectiveState(voters);

    // Before any response: undecided, both remote voters outstanding.
    assertFalse(prospective.epochElection().isVoteGranted());
    assertFalse(prospective.epochElection().isVoteRejected());
    assertEquals(Set.of(firstRemote, secondRemote), prospective.epochElection().unrecordedVoters());

    // First remote rejects; still undecided.
    assertTrue(prospective.recordRejectedVote(firstRemote.id()));
    assertEquals(Set.of(secondRemote), prospective.epochElection().unrecordedVoters());
    assertFalse(prospective.epochElection().isVoteGranted());
    assertFalse(prospective.epochElection().isVoteRejected());

    // Second remote rejects; the election is now rejected.
    assertTrue(prospective.recordRejectedVote(secondRemote.id()));
    assertEquals(Set.of(), prospective.epochElection().unrecordedVoters());
    assertFalse(prospective.epochElection().isVoteGranted());
    assertTrue(prospective.epochElection().isVoteRejected());
}
/**
 * A voter's recorded response may be overwritten. The record call is expected to return
 * true for a voter's first response and false when it replaces an earlier one, and the
 * election outcome is expected to follow the latest responses.
 */
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testCanChangePreVote(boolean withDirectoryId) {
    int firstVoterId = 1;
    int secondVoterId = 2;
    VoterSet voters = voterSetWithLocal(IntStream.of(firstVoterId, secondVoterId), withDirectoryId);
    ProspectiveState prospective = newProspectiveState(voters);

    // First voter grants, then flips to a rejection.
    assertTrue(prospective.recordGrantedVote(firstVoterId));
    assertTrue(prospective.epochElection().isVoteGranted());
    assertFalse(prospective.recordRejectedVote(firstVoterId));
    assertFalse(prospective.epochElection().isVoteGranted());

    // Second voter rejects, then flips to a grant.
    assertTrue(prospective.recordRejectedVote(secondVoterId));
    assertTrue(prospective.epochElection().isVoteRejected());
    assertFalse(prospective.recordGrantedVote(secondVoterId));
    assertFalse(prospective.epochElection().isVoteRejected());
}
/**
 * Recording a granted or rejected vote for an id outside the voter set is expected to
 * throw {@link IllegalArgumentException}.
 */
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testCannotGrantOrRejectNonVoters(boolean withDirectoryId) {
    VoterSet onlyLocalVoter = voterSetWithLocal(IntStream.empty(), withDirectoryId);
    ProspectiveState prospective = newProspectiveState(onlyLocalVoter);
    int outsiderId = 1;

    assertThrows(IllegalArgumentException.class, () -> prospective.recordGrantedVote(outsiderId));
    assertThrows(IllegalArgumentException.class, () -> prospective.recordRejectedVote(outsiderId));
}
/**
 * Recording the same grant twice: the first call is expected to return true and the
 * repeated call false.
 */
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testConsecutiveGrant(boolean withDirectoryId) {
    int remoteVoterId = 1;
    VoterSet voters = voterSetWithLocal(IntStream.of(remoteVoterId), withDirectoryId);
    ProspectiveState prospective = newProspectiveState(voters);

    assertTrue(prospective.recordGrantedVote(remoteVoterId));
    assertFalse(prospective.recordGrantedVote(remoteVoterId));
}
/**
 * Recording the same rejection twice: the first call is expected to return true and the
 * repeated call false.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testConsecutiveReject(boolean withDirectoryId) {
    int remoteVoterId = 1;
    VoterSet voters = voterSetWithLocal(IntStream.of(remoteVoterId), withDirectoryId);
    ProspectiveState prospective = newProspectiveState(voters);

    assertTrue(prospective.recordRejectedVote(remoteVoterId));
    assertFalse(prospective.recordRejectedVote(remoteVoterId));
}
/**
 * With neither a leader nor a voted key, {@code canGrantVote} is expected to equal
 * {@code isLogUpToDate} for every candidate, with the last argument both true and false.
 */
@ParameterizedTest
@CsvSource({ "true,true", "true,false", "false,true", "false,false" })
public void testGrantVote(boolean isLogUpToDate, boolean withDirectoryId) {
    // Candidates with ids 0, 1 and 2; ids 1 and 2 are also the remote voters.
    ReplicaKey[] candidates = {
        replicaKey(0, withDirectoryId),
        replicaKey(1, withDirectoryId),
        replicaKey(2, withDirectoryId)
    };
    ProspectiveState prospective = newProspectiveState(
        voterSetWithLocal(Stream.of(candidates[1], candidates[2]), withDirectoryId)
    );

    // Same call order as before: all candidates with `true`, then all with `false`.
    for (ReplicaKey candidate : candidates) {
        assertEquals(isLogUpToDate, prospective.canGrantVote(candidate, isLogUpToDate, true));
    }
    for (ReplicaKey candidate : candidates) {
        assertEquals(isLogUpToDate, prospective.canGrantVote(candidate, isLogUpToDate, false));
    }
}
/**
 * With a voted key for node 1 and no leader: when the last argument is true,
 * {@code canGrantVote} is expected to equal {@code isLogUpToDate} for every candidate;
 * when it is false, only the voted-for replica is expected to be granted.
 */
@ParameterizedTest
@CsvSource({ "true,true", "true,false", "false,true", "false,false" })
public void testGrantVoteWithVotedKey(boolean isLogUpToDate, boolean withDirectoryId) {
    ReplicaKey localCandidate = replicaKey(0, withDirectoryId);
    ReplicaKey votedCandidate = replicaKey(1, withDirectoryId);
    ReplicaKey otherCandidate = replicaKey(2, withDirectoryId);
    ProspectiveState prospective = newProspectiveState(
        voterSetWithLocal(Stream.of(votedCandidate, otherCandidate), withDirectoryId),
        OptionalInt.empty(),
        Optional.of(votedCandidate)
    );

    ReplicaKey[] candidates = {localCandidate, votedCandidate, otherCandidate};
    for (ReplicaKey candidate : candidates) {
        assertEquals(isLogUpToDate, prospective.canGrantVote(candidate, isLogUpToDate, true));
    }

    // With `false` as the last argument only the replica matching the voted key is accepted.
    assertFalse(prospective.canGrantVote(localCandidate, isLogUpToDate, false));
    assertTrue(prospective.canGrantVote(votedCandidate, isLogUpToDate, false));
    assertFalse(prospective.canGrantVote(otherCandidate, isLogUpToDate, false));
}
/**
 * With a known leader (node 1) and no voted key: when the last argument is true,
 * {@code canGrantVote} is expected to equal {@code isLogUpToDate} for every candidate;
 * when it is false, no candidate is expected to be granted.
 */
@ParameterizedTest
@CsvSource({ "true,true", "true,false", "false,true", "false,false" })
public void testGrantVoteWithLeader(boolean isLogUpToDate, boolean withDirectoryId) {
    ReplicaKey[] candidates = {
        replicaKey(0, withDirectoryId),
        replicaKey(1, withDirectoryId),
        replicaKey(2, withDirectoryId)
    };
    ReplicaKey leader = candidates[1];
    ProspectiveState prospective = newProspectiveState(
        voterSetWithLocal(Stream.of(candidates[1], candidates[2]), withDirectoryId),
        OptionalInt.of(leader.id()),
        Optional.empty()
    );

    // Same call order as before: all candidates with `true`, then all with `false`.
    for (ReplicaKey candidate : candidates) {
        assertEquals(isLogUpToDate, prospective.canGrantVote(candidate, isLogUpToDate, true));
    }
    for (ReplicaKey candidate : candidates) {
        assertFalse(prospective.canGrantVote(candidate, isLogUpToDate, false));
    }
}
/**
 * Checks the {@code ElectionState} reported by {@code election()} for four
 * configurations: no leader and no voted key, leader only, voted key only, and both.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testElectionState(boolean withDirectoryId) {
    VoterSet voters = voterSetWithLocal(IntStream.of(1, 2, 3), withDirectoryId);
    int leaderId = 1;

    // neither leader nor voted key
    ProspectiveState plain = newProspectiveState(voters);
    assertEquals(
        ElectionState.withUnknownLeader(epoch, voters.voterIds()),
        plain.election()
    );

    // with leader
    ProspectiveState withLeader = newProspectiveState(voters, OptionalInt.of(leaderId), Optional.empty());
    assertEquals(
        ElectionState.withElectedLeader(epoch, leaderId, Optional.empty(), voters.voterIds()),
        withLeader.election()
    );

    // with voted key
    ReplicaKey votedKey = replicaKey(1, withDirectoryId);
    ProspectiveState withVotedKey = newProspectiveState(voters, OptionalInt.empty(), Optional.of(votedKey));
    assertEquals(
        ElectionState.withVotedCandidate(epoch, votedKey, voters.voterIds()),
        withVotedKey.election()
    );

    // with both
    ProspectiveState withBoth = newProspectiveState(voters, OptionalInt.of(leaderId), Optional.of(votedKey));
    assertEquals(
        ElectionState.withElectedLeader(epoch, leaderId, Optional.of(votedKey), voters.voterIds()),
        withBoth.election()
    );
}
/**
 * Verifies the election timer of a state created with a voted key: the full timeout
 * remains at creation, it counts down as the mock clock advances, and it is reported
 * as expired once {@code electionTimeoutMs} has elapsed.
 */
@Test
public void testElectionTimeout() {
    ProspectiveState state = newProspectiveState(
        voterSetWithLocal(IntStream.empty(), true),
        OptionalInt.empty(),
        Optional.of(votedKeyWithDirectoryId)
    );
    assertEquals(epoch, state.epoch());
    // Compare the Optional itself so a missing voted key fails as a readable assertion
    // instead of a NoSuchElementException thrown by Optional.get().
    assertEquals(Optional.of(votedKeyWithDirectoryId), state.votedKey());
    assertEquals(
        ElectionState.withVotedCandidate(epoch, votedKeyWithDirectoryId, Collections.singleton(localId)),
        state.election()
    );

    // Derive both sleeps from the configured timeout so the test keeps covering the
    // exact expiry boundary if electionTimeoutMs is ever changed.
    int firstSleepMs = electionTimeoutMs / 2;
    int secondSleepMs = electionTimeoutMs - firstSleepMs;

    assertEquals(electionTimeoutMs, state.remainingElectionTimeMs(time.milliseconds()));
    assertFalse(state.hasElectionTimeoutExpired(time.milliseconds()));

    time.sleep(firstSleepMs);
    assertEquals(electionTimeoutMs - firstSleepMs, state.remainingElectionTimeMs(time.milliseconds()));
    assertFalse(state.hasElectionTimeoutExpired(time.milliseconds()));

    time.sleep(secondSleepMs);
    assertEquals(0, state.remainingElectionTimeMs(time.milliseconds()));
    assertTrue(state.hasElectionTimeoutExpired(time.milliseconds()));
}
/**
 * State created with a voted key that has no directory id. PreVote requests (last
 * argument true) are expected to depend only on {@code isLogUpToDate}; standard votes
 * are expected to be granted to the voted id regardless of the directory id presented,
 * and refused to a different id.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCanGrantVoteWithoutDirectoryId(boolean isLogUpToDate) {
    ProspectiveState prospective = newProspectiveState(
        voterSetWithLocal(IntStream.empty(), true),
        OptionalInt.empty(),
        Optional.of(votedKeyWithoutDirectoryId)
    );

    // Voted id, no directory id.
    ReplicaKey votedIdNoDirectory = ReplicaKey.of(votedId, ReplicaKey.NO_DIRECTORY_ID);
    assertEquals(isLogUpToDate, prospective.canGrantVote(votedIdNoDirectory, isLogUpToDate, true));
    assertTrue(prospective.canGrantVote(votedIdNoDirectory, isLogUpToDate, false));

    // Voted id with a random directory id (a fresh one per call, as before).
    assertEquals(
        isLogUpToDate,
        prospective.canGrantVote(ReplicaKey.of(votedId, Uuid.randomUuid()), isLogUpToDate, true)
    );
    assertTrue(prospective.canGrantVote(ReplicaKey.of(votedId, Uuid.randomUuid()), isLogUpToDate, false));

    // Can grant PreVote to other replicas even if we have granted a standard vote to another replica
    ReplicaKey otherIdNoDirectory = ReplicaKey.of(votedId + 1, ReplicaKey.NO_DIRECTORY_ID);
    assertEquals(isLogUpToDate, prospective.canGrantVote(otherIdNoDirectory, isLogUpToDate, true));
    assertFalse(prospective.canGrantVote(otherIdNoDirectory, isLogUpToDate, false));
}
/**
 * State created with a voted key that carries a directory id. PreVote requests (last
 * argument true) are expected to depend only on {@code isLogUpToDate}; standard votes
 * are expected to be granted only to the exact voted key, i.e. same id and same
 * directory id.
 *
 * Declared {@code public} to match every other test method in this class.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCanGrantVoteWithDirectoryId(boolean isLogUpToDate) {
    ProspectiveState state = newProspectiveState(
        voterSetWithLocal(IntStream.empty(), true),
        OptionalInt.empty(),
        Optional.of(votedKeyWithDirectoryId));

    // Same voterKey
    // We will not grant PreVote for a replica we have already granted a standard vote to if their log is behind
    assertEquals(
        isLogUpToDate,
        state.canGrantVote(votedKeyWithDirectoryId, isLogUpToDate, true)
    );
    assertTrue(state.canGrantVote(votedKeyWithDirectoryId, isLogUpToDate, false));

    // Different directoryId
    // We can grant PreVote for a replica we have already granted a standard vote to if their log is up-to-date,
    // even if the directoryId is different
    assertEquals(
        isLogUpToDate,
        state.canGrantVote(ReplicaKey.of(votedId, Uuid.randomUuid()), isLogUpToDate, true)
    );
    assertFalse(state.canGrantVote(ReplicaKey.of(votedId, Uuid.randomUuid()), isLogUpToDate, false));

    // Missing directoryId
    assertEquals(
        isLogUpToDate,
        state.canGrantVote(ReplicaKey.of(votedId, ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, true)
    );
    assertFalse(state.canGrantVote(ReplicaKey.of(votedId, ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));

    // Different voterId
    assertEquals(
        isLogUpToDate,
        state.canGrantVote(ReplicaKey.of(votedId + 1, votedDirectoryId), isLogUpToDate, true)
    );
    assertEquals(
        isLogUpToDate,
        state.canGrantVote(ReplicaKey.of(votedId + 1, ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, true)
    );
    // The standard vote is checked with an up-to-date log in both parameterized runs:
    // a different voter id must be refused even in that case.
    assertFalse(state.canGrantVote(ReplicaKey.of(votedId + 1, votedDirectoryId), true, false));
    assertFalse(state.canGrantVote(ReplicaKey.of(votedId + 1, ReplicaKey.NO_DIRECTORY_ID), true, false));
}
/**
 * {@code leaderEndpoints()} is expected to be empty when the state is created without a
 * leader id, and to equal the shared {@code leaderEndpoints} fixture when one is given.
 */
@Test
public void testLeaderEndpoints() {
    ProspectiveState withoutLeader = newProspectiveState(
        voterSetWithLocal(IntStream.of(1, 2, 3), true),
        OptionalInt.empty(),
        Optional.of(ReplicaKey.of(1, Uuid.randomUuid()))
    );
    assertEquals(Endpoints.empty(), withoutLeader.leaderEndpoints());

    ProspectiveState withLeader = newProspectiveState(
        voterSetWithLocal(IntStream.of(1, 2, 3), true),
        OptionalInt.of(3),
        Optional.of(ReplicaKey.of(1, Uuid.randomUuid()))
    );
    assertEquals(leaderEndpoints, withLeader.leaderEndpoints());
}
/**
 * Builds a replica key for {@code id}.
 *
 * @param id              the replica id
 * @param withDirectoryId true to attach a freshly generated random directory id,
 *                        false to use {@code ReplicaKey.NO_DIRECTORY_ID}
 * @return the replica key
 */
private ReplicaKey replicaKey(int id, boolean withDirectoryId) {
    if (withDirectoryId) {
        return ReplicaKey.of(id, Uuid.randomUuid());
    }
    return ReplicaKey.of(id, ReplicaKey.NO_DIRECTORY_ID);
}
/**
 * Builds a voter set made of the local replica plus one remote voter per given id.
 *
 * @param remoteVoterIds  ids of the remote voters
 * @param withDirectoryId whether the generated keys carry a directory id
 * @return the voter set produced by the {@code Stream<ReplicaKey>} overload
 */
private VoterSet voterSetWithLocal(IntStream remoteVoterIds, boolean withDirectoryId) {
    return voterSetWithLocal(
        remoteVoterIds.mapToObj(remoteId -> replicaKey(remoteId, withDirectoryId)),
        withDirectoryId
    );
}
/**
 * Builds a voter set with the local replica first, followed by the given remote voters.
 *
 * @param remoteVoterKeys keys of the remote voters
 * @param withDirectoryId true to use {@code localReplicaKey} as is; false to use the local
 *                        id with {@code ReplicaKey.NO_DIRECTORY_ID}
 * @return the voter set created by {@code VoterSetTest.voterSet}
 */
private VoterSet voterSetWithLocal(Stream<ReplicaKey> remoteVoterKeys, boolean withDirectoryId) {
    final ReplicaKey localVoter;
    if (withDirectoryId) {
        localVoter = localReplicaKey;
    } else {
        localVoter = ReplicaKey.of(localReplicaKey.id(), ReplicaKey.NO_DIRECTORY_ID);
    }

    Stream<ReplicaKey> allVoters = Stream.concat(Stream.of(localVoter), remoteVoterKeys);
    return VoterSetTest.voterSet(allVoters);
}
}