MINOR: Cleanup Raft Module (#20348)
This PR aims at cleaning up the `raft` module further by getting rid of
some extra code which can be replaced by `record`
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java b/raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java
index f95431a..e839cf8 100644
--- a/raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java
+++ b/raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java
@@ -23,28 +23,25 @@
import java.net.InetSocketAddress;
import java.util.Map;
-import java.util.Objects;
/**
* The textual representation of a KIP-853 voter.
- *
+ * <p>
* Since this is used in command-line tools, format changes to the parsing logic require a KIP,
* and should be backwards compatible.
+ *
+ * @param directoryId The directory ID.
+ * @param nodeId The voter ID.
+ * @param host The voter hostname or IP address.
+ * @param port The voter port.
*/
-public final class DynamicVoter {
- private final Uuid directoryId;
- private final int nodeId;
- private final String host;
- private final int port;
-
+public record DynamicVoter(Uuid directoryId, int nodeId, String host, int port) {
/**
* Create a DynamicVoter object by parsing an input string.
*
- * @param input The input string.
- *
- * @return The DynamicVoter object.
- *
- * @throws IllegalArgumentException If parsing fails.
+ * @param input The input string.
+ * @return The DynamicVoter object.
+ * @throws IllegalArgumentException If parsing fails.
*/
public static DynamicVoter parse(String input) {
input = input.trim();
@@ -75,7 +72,7 @@
int endBracketIndex = input.indexOf("]");
if (endBracketIndex < 0) {
throw new IllegalArgumentException("Hostname began with left bracket, but no right " +
- "bracket was found.");
+ "bracket was found.");
}
host = input.substring(1, endBracketIndex);
input = input.substring(endBracketIndex + 1);
@@ -115,71 +112,17 @@
return new DynamicVoter(directoryId, nodeId, host, port);
}
- /**
- * Create a new KIP-853 voter.
- *
- * @param directoryId The directory ID.
- * @param nodeId The voter ID.
- * @param host The voter hostname or IP address.
- * @param port The voter port.
- */
- public DynamicVoter(
- Uuid directoryId,
- int nodeId,
- String host,
- int port
- ) {
- this.directoryId = directoryId;
- this.nodeId = nodeId;
- this.host = host;
- this.port = port;
- }
-
- public Uuid directoryId() {
- return directoryId;
- }
-
- public int nodeId() {
- return nodeId;
- }
-
- public String host() {
- return host;
- }
-
- public int port() {
- return port;
- }
-
public VoterSet.VoterNode toVoterNode(String controllerListenerName) {
ReplicaKey voterKey = ReplicaKey.of(nodeId, directoryId);
Endpoints listeners = Endpoints.fromInetSocketAddresses(Map.of(
- ListenerName.normalised(controllerListenerName),
- new InetSocketAddress(host, port)));
+ ListenerName.normalised(controllerListenerName),
+ new InetSocketAddress(host, port)));
SupportedVersionRange supportedKRaftVersion =
- new SupportedVersionRange((short) 0, (short) 1);
+ new SupportedVersionRange((short) 0, (short) 1);
return VoterSet.VoterNode.of(voterKey, listeners, supportedKRaftVersion);
}
@Override
- public boolean equals(Object o) {
- if (o == null || (!(o.getClass().equals(DynamicVoter.class)))) return false;
- DynamicVoter other = (DynamicVoter) o;
- return directoryId.equals(other.directoryId) &&
- nodeId == other.nodeId &&
- host.equals(other.host) &&
- port == other.port;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(directoryId,
- nodeId,
- host,
- port);
- }
-
- @Override
public String toString() {
if (host.contains(":")) {
return nodeId + "@[" + host + "]:" + port + ":" + directoryId;
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 0ebabe8..384a47b 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -2636,48 +2636,18 @@
private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) {
// The response epoch matches the local epoch, so we can handle the response
ApiKeys apiKey = ApiKeys.forId(response.data().apiKey());
- final boolean handledSuccessfully;
-
- switch (apiKey) {
- case FETCH:
- handledSuccessfully = handleFetchResponse(response, currentTimeMs);
- break;
-
- case VOTE:
- handledSuccessfully = handleVoteResponse(response, currentTimeMs);
- break;
-
- case BEGIN_QUORUM_EPOCH:
- handledSuccessfully = handleBeginQuorumEpochResponse(response, currentTimeMs);
- break;
-
- case END_QUORUM_EPOCH:
- handledSuccessfully = handleEndQuorumEpochResponse(response, currentTimeMs);
- break;
-
- case FETCH_SNAPSHOT:
- handledSuccessfully = handleFetchSnapshotResponse(response, currentTimeMs);
- break;
-
- case API_VERSIONS:
- handledSuccessfully = handleApiVersionsResponse(response, currentTimeMs);
- break;
-
- case UPDATE_RAFT_VOTER:
- handledSuccessfully = handleUpdateVoterResponse(response, currentTimeMs);
- break;
-
- case ADD_RAFT_VOTER:
- handledSuccessfully = handleAddVoterResponse(response, currentTimeMs);
- break;
-
- case REMOVE_RAFT_VOTER:
- handledSuccessfully = handleRemoveVoterResponse(response, currentTimeMs);
- break;
-
- default:
- throw new IllegalArgumentException("Received unexpected response type: " + apiKey);
- }
+ final boolean handledSuccessfully = switch (apiKey) {
+ case FETCH -> handleFetchResponse(response, currentTimeMs);
+ case VOTE -> handleVoteResponse(response, currentTimeMs);
+ case BEGIN_QUORUM_EPOCH -> handleBeginQuorumEpochResponse(response, currentTimeMs);
+ case END_QUORUM_EPOCH -> handleEndQuorumEpochResponse(response, currentTimeMs);
+ case FETCH_SNAPSHOT -> handleFetchSnapshotResponse(response, currentTimeMs);
+ case API_VERSIONS -> handleApiVersionsResponse(response, currentTimeMs);
+ case UPDATE_RAFT_VOTER -> handleUpdateVoterResponse(response, currentTimeMs);
+ case ADD_RAFT_VOTER -> handleAddVoterResponse(response, currentTimeMs);
+ case REMOVE_RAFT_VOTER -> handleRemoveVoterResponse(response, currentTimeMs);
+ default -> throw new IllegalArgumentException("Received unexpected response type: " + apiKey);
+ };
requestManager.onResponseResult(
response.source(),
@@ -2740,48 +2710,18 @@
private void handleRequest(RaftRequest.Inbound request, long currentTimeMs) {
ApiKeys apiKey = ApiKeys.forId(request.data().apiKey());
- final CompletableFuture<? extends ApiMessage> responseFuture;
-
- switch (apiKey) {
- case FETCH:
- responseFuture = handleFetchRequest(request, currentTimeMs);
- break;
-
- case VOTE:
- responseFuture = completedFuture(handleVoteRequest(request));
- break;
-
- case BEGIN_QUORUM_EPOCH:
- responseFuture = completedFuture(handleBeginQuorumEpochRequest(request, currentTimeMs));
- break;
-
- case END_QUORUM_EPOCH:
- responseFuture = completedFuture(handleEndQuorumEpochRequest(request, currentTimeMs));
- break;
-
- case DESCRIBE_QUORUM:
- responseFuture = completedFuture(handleDescribeQuorumRequest(request, currentTimeMs));
- break;
-
- case FETCH_SNAPSHOT:
- responseFuture = completedFuture(handleFetchSnapshotRequest(request, currentTimeMs));
- break;
-
- case ADD_RAFT_VOTER:
- responseFuture = handleAddVoterRequest(request, currentTimeMs);
- break;
-
- case REMOVE_RAFT_VOTER:
- responseFuture = handleRemoveVoterRequest(request, currentTimeMs);
- break;
-
- case UPDATE_RAFT_VOTER:
- responseFuture = handleUpdateVoterRequest(request, currentTimeMs);
- break;
-
- default:
- throw new IllegalArgumentException("Unexpected request type " + apiKey);
- }
+ final CompletableFuture<? extends ApiMessage> responseFuture = switch (apiKey) {
+ case FETCH -> handleFetchRequest(request, currentTimeMs);
+ case VOTE -> completedFuture(handleVoteRequest(request));
+ case BEGIN_QUORUM_EPOCH -> completedFuture(handleBeginQuorumEpochRequest(request, currentTimeMs));
+ case END_QUORUM_EPOCH -> completedFuture(handleEndQuorumEpochRequest(request, currentTimeMs));
+ case DESCRIBE_QUORUM -> completedFuture(handleDescribeQuorumRequest(request, currentTimeMs));
+ case FETCH_SNAPSHOT -> completedFuture(handleFetchSnapshotRequest(request, currentTimeMs));
+ case ADD_RAFT_VOTER -> handleAddVoterRequest(request, currentTimeMs);
+ case REMOVE_RAFT_VOTER -> handleRemoveVoterRequest(request, currentTimeMs);
+ case UPDATE_RAFT_VOTER -> handleUpdateVoterRequest(request, currentTimeMs);
+ default -> throw new IllegalArgumentException("Unexpected request type " + apiKey);
+ };
responseFuture.whenComplete((response, exception) -> {
ApiMessage message = response;
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java b/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
index 459fd30..557ca7b 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
@@ -19,47 +19,14 @@
import java.util.Objects;
import java.util.OptionalInt;
-public class LeaderAndEpoch {
- private final OptionalInt leaderId;
- private final int epoch;
+public record LeaderAndEpoch(OptionalInt leaderId, int epoch) {
public static final LeaderAndEpoch UNKNOWN = new LeaderAndEpoch(OptionalInt.empty(), 0);
- public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
- this.leaderId = Objects.requireNonNull(leaderId);
- this.epoch = epoch;
- }
-
- public OptionalInt leaderId() {
- return leaderId;
- }
-
- public int epoch() {
- return epoch;
+ public LeaderAndEpoch {
+ Objects.requireNonNull(leaderId);
}
public boolean isLeader(int nodeId) {
return leaderId.isPresent() && leaderId.getAsInt() == nodeId;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- LeaderAndEpoch that = (LeaderAndEpoch) o;
- return epoch == that.epoch &&
- leaderId.equals(that.leaderId);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(leaderId, epoch);
- }
-
- @Override
- public String toString() {
- return "LeaderAndEpoch(" +
- "leaderId=" + leaderId +
- ", epoch=" + epoch +
- ')';
- }
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
index 2476ae8..3e72b3a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
@@ -332,16 +332,16 @@
}
public void highWatermarkUpdated(LeaderState<?> leaderState) {
- leaderState.addVoterHandlerState().ifPresent(current -> {
- leaderState.highWatermark().ifPresent(highWatermark -> {
+ leaderState.addVoterHandlerState().ifPresent(current ->
+ leaderState.highWatermark().ifPresent(highWatermark ->
current.lastOffset().ifPresent(lastOffset -> {
if (highWatermark.offset() > lastOffset) {
// VotersRecord with the added voter was committed; complete the RPC
leaderState.resetAddVoterHandlerState(Errors.NONE, null, Optional.empty());
}
- });
- });
- });
+ })
+ )
+ );
}
private ApiVersionsRequestData buildApiVersionsRequest() {
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java b/raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java
index 8cebe1b..9e1bbb2 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java
@@ -27,17 +27,11 @@
/**
* Tracks the votes cast by voters in an election held by a Nominee.
*/
-public class EpochElection {
- private Map<Integer, VoterState> voterStates;
-
+public record EpochElection(Map<Integer, VoterState> voterStates) {
public EpochElection(Set<ReplicaKey> voters) {
- this.voterStates = voters.stream()
- .collect(
- Collectors.toMap(
- ReplicaKey::id,
- VoterState::new
- )
- );
+ this(voters.stream()
+ .collect(Collectors.toMap(ReplicaKey::id, VoterState::new))
+ );
}
/**
@@ -157,14 +151,6 @@
return voterStates.size() / 2 + 1;
}
- @Override
- public String toString() {
- return String.format(
- "EpochElection(voterStates=%s)",
- voterStates
- );
- }
-
private static final class VoterState {
private final ReplicaKey replicaKey;
private State state = State.UNRECORDED;
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
index 2dea86d..7f231b1 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
@@ -156,7 +156,7 @@
}
public void highWatermarkUpdated(LeaderState<?> leaderState) {
- leaderState.removeVoterHandlerState().ifPresent(current -> {
+ leaderState.removeVoterHandlerState().ifPresent(current ->
leaderState.highWatermark().ifPresent(highWatermark -> {
if (highWatermark.offset() > current.lastOffset()) {
// VotersRecord with the removed voter was committed; complete the RPC
@@ -182,7 +182,7 @@
leaderState.requestResign();
}
}
- });
- });
+ })
+ );
}
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java b/raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java
index eec3911..00451ea 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java
@@ -71,14 +71,7 @@
return thresholdMap.size();
}
- private static class ThresholdKey<T extends Comparable<T>> implements Comparable<ThresholdKey<T>> {
- private final long id;
- private final T threshold;
-
- private ThresholdKey(long id, T threshold) {
- this.id = id;
- this.threshold = threshold;
- }
+ private record ThresholdKey<T extends Comparable<T>>(long id, T threshold) implements Comparable<ThresholdKey<T>> {
@Override
public int compareTo(ThresholdKey<T> o) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderAndEpochTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderAndEpochTest.java
new file mode 100644
index 0000000..89bd907
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderAndEpochTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+
+import java.util.OptionalInt;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class LeaderAndEpochTest {
+
+ @Test
+ void testConstructorWithValidLeaderId() {
+ LeaderAndEpoch leaderAndEpoch = new LeaderAndEpoch(OptionalInt.of(1), 5);
+
+ assertEquals(OptionalInt.of(1), leaderAndEpoch.leaderId());
+ assertEquals(5, leaderAndEpoch.epoch());
+ }
+
+ @Test
+ void testConstructorWithEmptyLeaderId() {
+ LeaderAndEpoch leaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), 3);
+
+ assertEquals(OptionalInt.empty(), leaderAndEpoch.leaderId());
+ assertEquals(3, leaderAndEpoch.epoch());
+ }
+
+ @Test
+ void testConstructorThrowsExceptionWhenLeaderIdIsNull() {
+ Executable executable = () -> new LeaderAndEpoch(null, 1);
+
+ assertThrows(NullPointerException.class, executable);
+ }
+}
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index 686daa3..d66fb31 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -34,7 +34,6 @@
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
@@ -670,17 +669,17 @@
KRaftVersion.KRAFT_VERSION_1
);
- IntStream.of(remoteIds).forEach(id -> {
- List.of(true, false).forEach(isPrevote -> {
+ IntStream.of(remoteIds).forEach(id ->
+ List.of(true, false).forEach(isPrevote ->
assertFalse(
state.canGrantVote(
ReplicaKey.of(id, ReplicaKey.NO_DIRECTORY_ID),
isLogUpToDate,
isPrevote
)
- );
- });
- });
+ )
+ )
+ );
}
@ParameterizedTest
@@ -867,25 +866,7 @@
);
}
- private static class MockOffsetMetadata implements OffsetMetadata {
- private final String value;
-
- private MockOffsetMetadata(String value) {
- this.value = value;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- MockOffsetMetadata that = (MockOffsetMetadata) o;
- return Objects.equals(value, that.value);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(value);
- }
+ private record MockOffsetMetadata(String value) implements OffsetMetadata {
}
private ReplicaKey replicaKey(int id, boolean withDirectoryId) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 6f09682..6bca324 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -43,7 +43,6 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
@@ -627,86 +626,20 @@
);
}
- static class MockOffsetMetadata implements OffsetMetadata {
- final long id;
-
- MockOffsetMetadata(long id) {
- this.id = id;
- }
-
- @Override
- public String toString() {
- return "MockOffsetMetadata(" +
- "id=" + id +
- ')';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- MockOffsetMetadata that = (MockOffsetMetadata) o;
- return id == that.id;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id);
- }
+ record MockOffsetMetadata(long id) implements OffsetMetadata {
}
- static class LogEntry {
- final MockOffsetMetadata metadata;
- final long offset;
- final SimpleRecord record;
-
- LogEntry(MockOffsetMetadata metadata, long offset, SimpleRecord record) {
- this.metadata = metadata;
- this.offset = offset;
- this.record = record;
- }
+ record LogEntry(MockOffsetMetadata metadata, long offset, SimpleRecord record) {
LogOffsetMetadata logOffsetMetadata() {
return new LogOffsetMetadata(offset, Optional.of(metadata));
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- LogEntry logEntry = (LogEntry) o;
- return offset == logEntry.offset &&
- Objects.equals(metadata, logEntry.metadata) &&
- Objects.equals(record, logEntry.record);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(metadata, offset, record);
- }
-
- @Override
- public String toString() {
- return String.format(
- "LogEntry(metadata=%s, offset=%s, record=%s)",
- metadata,
- offset,
- record
- );
- }
}
- static class LogBatch {
- final List<LogEntry> entries;
- final int epoch;
- final boolean isControlBatch;
-
- LogBatch(int epoch, boolean isControlBatch, List<LogEntry> entries) {
+ record LogBatch(int epoch, boolean isControlBatch, List<LogEntry> entries) {
+ LogBatch {
if (entries.isEmpty())
throw new IllegalArgumentException("Empty batches are not supported");
- this.entries = entries;
- this.epoch = epoch;
- this.isControlBatch = isControlBatch;
}
long firstOffset() {
@@ -746,25 +679,8 @@
builder.close();
return builder.buffer();
}
-
- @Override
- public String toString() {
- return String.format("LogBatch(entries=%s, epoch=%s, isControlBatch=%s)", entries, epoch, isControlBatch);
- }
}
- private static class EpochStartOffset {
- final int epoch;
- final long startOffset;
-
- private EpochStartOffset(int epoch, long startOffset) {
- this.epoch = epoch;
- this.startOffset = startOffset;
- }
-
- @Override
- public String toString() {
- return String.format("EpochStartOffset(epoch=%s, startOffset=%s)", epoch, startOffset);
- }
+ private record EpochStartOffset(int epoch, long startOffset) {
}
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index 21d253a..604da18 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -51,7 +51,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.IntStream;
@@ -1026,33 +1025,7 @@
}
}
- private static class OffsetRange {
- public final long startOffset;
- public final long endOffset;
-
- private OffsetRange(long startOffset, long endOffset) {
- this.startOffset = startOffset;
- this.endOffset = endOffset;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- OffsetRange that = (OffsetRange) o;
- return startOffset == that.startOffset &&
- endOffset == that.endOffset;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(startOffset, endOffset);
- }
-
- @Override
- public String toString() {
- return String.format("OffsetRange(startOffset=%s, endOffset=%s)", startOffset, endOffset);
- }
+ private record OffsetRange(long startOffset, long endOffset) {
}
private void appendAsLeader(Collection<SimpleRecord> records, int epoch) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 3f8d96b..afb99a0 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -545,12 +545,7 @@
}
}
- private static class SequentialAppendAction implements Runnable {
- final Cluster cluster;
-
- private SequentialAppendAction(Cluster cluster) {
- this.cluster = cluster;
- }
+ private record SequentialAppendAction(Cluster cluster) implements Runnable {
@Override
public void run() {
@@ -1005,14 +1000,7 @@
}
}
- private static class InflightRequest {
- final int sourceId;
- final Node destination;
-
- private InflightRequest(int sourceId, Node destination) {
- this.sourceId = sourceId;
- this.destination = destination;
- }
+ private record InflightRequest(int sourceId, Node destination) {
}
private interface NetworkFilter {
@@ -1046,16 +1034,13 @@
}
}
- private static class DropOutboundRequestsTo implements NetworkFilter {
- private final Set<InetSocketAddress> unreachable;
-
+ private record DropOutboundRequestsTo(Set<InetSocketAddress> unreachable) implements NetworkFilter {
/**
* This network filter drops any outbound message sent to the {@code unreachable} nodes.
*
* @param unreachable the set of destination address which are not reachable
*/
- private DropOutboundRequestsTo(Set<InetSocketAddress> unreachable) {
- this.unreachable = unreachable;
+ private DropOutboundRequestsTo {
}
@Override
@@ -1123,12 +1108,7 @@
}
}
- private static class MajorityReachedHighWatermark implements Invariant {
- final Cluster cluster;
-
- private MajorityReachedHighWatermark(Cluster cluster) {
- this.cluster = cluster;
- }
+ private record MajorityReachedHighWatermark(Cluster cluster) implements Invariant {
@Override
public void verify() {
@@ -1234,12 +1214,7 @@
}
}
- private static class SnapshotAtLogStart implements Invariant {
- final Cluster cluster;
-
- private SnapshotAtLogStart(Cluster cluster) {
- this.cluster = cluster;
- }
+ private record SnapshotAtLogStart(Cluster cluster) implements Invariant {
@Override
public void verify() {
@@ -1280,12 +1255,7 @@
}
}
- private static class LeaderNeverLoadSnapshot implements Invariant {
- final Cluster cluster;
-
- private LeaderNeverLoadSnapshot(Cluster cluster) {
- this.cluster = cluster;
- }
+ private record LeaderNeverLoadSnapshot(Cluster cluster) implements Invariant {
@Override
public void verify() {
@@ -1370,15 +1340,15 @@
});
for (LogBatch batch : log.readBatches(startOffset.get(), highWatermark)) {
- if (batch.isControlBatch) {
+ if (batch.isControlBatch()) {
continue;
}
- for (LogEntry entry : batch.entries) {
- long offset = entry.offset;
+ for (LogEntry entry : batch.entries()) {
+ long offset = entry.offset();
assertTrue(offset < highWatermark.getAsLong());
- int sequence = parseSequenceNumber(entry.record.value().duplicate());
+ int sequence = parseSequenceNumber(entry.record().value().duplicate());
committedSequenceNumbers.putIfAbsent(offset, sequence);
int committedSequence = committedSequenceNumbers.get(offset);
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
index 34b7c9f..cdbd728 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
@@ -660,32 +660,10 @@
}
}
- private static class FetchRequestTestCase {
- private final Uuid replicaDirectoryId;
- private final short version;
- private final short lastFetchedEpoch;
- private final String expectedJson;
-
- private FetchRequestTestCase(Uuid replicaDirectoryId, short version,
- short lastFetchedEpoch, String expectedJson) {
- this.replicaDirectoryId = replicaDirectoryId;
- this.version = version;
- this.lastFetchedEpoch = lastFetchedEpoch;
- this.expectedJson = expectedJson;
- }
+ private record FetchRequestTestCase(Uuid replicaDirectoryId, short version, short lastFetchedEpoch,
+ String expectedJson) {
}
- private static class FetchResponseTestCase {
- private final short version;
- private final int preferredReadReplicaId;
- private final String expectedJson;
-
- private FetchResponseTestCase(short version,
- int preferredReadReplicaId,
- String expectedJson) {
- this.version = version;
- this.preferredReadReplicaId = preferredReadReplicaId;
- this.expectedJson = expectedJson;
- }
+ private record FetchResponseTestCase(short version, int preferredReadReplicaId, String expectedJson) {
}
}