MINOR: Cleanup Raft Module (#20348)

This PR aims at cleaning up the `raft` module further by getting rid of
some extra code which can be replaced by `record`

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java b/raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java
index f95431a..e839cf8 100644
--- a/raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java
+++ b/raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java
@@ -23,28 +23,25 @@
 
 import java.net.InetSocketAddress;
 import java.util.Map;
-import java.util.Objects;
 
 /**
  * The textual representation of a KIP-853 voter.
- *
+ * <p>
  * Since this is used in command-line tools, format changes to the parsing logic require a KIP,
  * and should be backwards compatible.
+ *
+ * @param directoryId The directory ID.
+ * @param nodeId      The voter ID.
+ * @param host        The voter hostname or IP address.
+ * @param port        The voter port.
  */
-public final class DynamicVoter {
-    private final Uuid directoryId;
-    private final int nodeId;
-    private final String host;
-    private final int port;
-
+public record DynamicVoter(Uuid directoryId, int nodeId, String host, int port) {
     /**
      * Create a DynamicVoter object by parsing an input string.
      *
-     * @param input                         The input string.
-     *
-     * @return                              The DynamicVoter object.
-     *
-     * @throws IllegalArgumentException     If parsing fails.
+     * @param input The input string.
+     * @return The DynamicVoter object.
+     * @throws IllegalArgumentException If parsing fails.
      */
     public static DynamicVoter parse(String input) {
         input = input.trim();
@@ -75,7 +72,7 @@
             int endBracketIndex = input.indexOf("]");
             if (endBracketIndex < 0) {
                 throw new IllegalArgumentException("Hostname began with left bracket, but no right " +
-                        "bracket was found.");
+                    "bracket was found.");
             }
             host = input.substring(1, endBracketIndex);
             input = input.substring(endBracketIndex + 1);
@@ -115,71 +112,17 @@
         return new DynamicVoter(directoryId, nodeId, host, port);
     }
 
-    /**
-     * Create a new KIP-853 voter.
-     *
-     * @param directoryId   The directory ID.
-     * @param nodeId        The voter ID.
-     * @param host          The voter hostname or IP address.
-     * @param port          The voter port.
-     */
-    public DynamicVoter(
-        Uuid directoryId,
-        int nodeId,
-        String host,
-        int port
-    ) {
-        this.directoryId = directoryId;
-        this.nodeId = nodeId;
-        this.host = host;
-        this.port = port;
-    }
-
-    public Uuid directoryId() {
-        return directoryId;
-    }
-
-    public int nodeId() {
-        return nodeId;
-    }
-
-    public String host() {
-        return host;
-    }
-
-    public int port() {
-        return port;
-    }
-
     public VoterSet.VoterNode toVoterNode(String controllerListenerName) {
         ReplicaKey voterKey = ReplicaKey.of(nodeId, directoryId);
         Endpoints listeners = Endpoints.fromInetSocketAddresses(Map.of(
-                ListenerName.normalised(controllerListenerName),
-                new InetSocketAddress(host, port)));
+            ListenerName.normalised(controllerListenerName),
+            new InetSocketAddress(host, port)));
         SupportedVersionRange supportedKRaftVersion =
-                new SupportedVersionRange((short) 0, (short) 1);
+            new SupportedVersionRange((short) 0, (short) 1);
         return VoterSet.VoterNode.of(voterKey, listeners, supportedKRaftVersion);
     }
 
     @Override
-    public boolean equals(Object o) {
-        if (o == null || (!(o.getClass().equals(DynamicVoter.class)))) return false;
-        DynamicVoter other = (DynamicVoter) o;
-        return directoryId.equals(other.directoryId) &&
-            nodeId == other.nodeId &&
-            host.equals(other.host) &&
-            port == other.port;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(directoryId,
-            nodeId,
-            host,
-            port);
-    }
-
-    @Override
     public String toString() {
         if (host.contains(":")) {
             return nodeId + "@[" + host + "]:" + port + ":" + directoryId;
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 0ebabe8..384a47b 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -2636,48 +2636,18 @@
     private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) {
         // The response epoch matches the local epoch, so we can handle the response
         ApiKeys apiKey = ApiKeys.forId(response.data().apiKey());
-        final boolean handledSuccessfully;
-
-        switch (apiKey) {
-            case FETCH:
-                handledSuccessfully = handleFetchResponse(response, currentTimeMs);
-                break;
-
-            case VOTE:
-                handledSuccessfully = handleVoteResponse(response, currentTimeMs);
-                break;
-
-            case BEGIN_QUORUM_EPOCH:
-                handledSuccessfully = handleBeginQuorumEpochResponse(response, currentTimeMs);
-                break;
-
-            case END_QUORUM_EPOCH:
-                handledSuccessfully = handleEndQuorumEpochResponse(response, currentTimeMs);
-                break;
-
-            case FETCH_SNAPSHOT:
-                handledSuccessfully = handleFetchSnapshotResponse(response, currentTimeMs);
-                break;
-
-            case API_VERSIONS:
-                handledSuccessfully = handleApiVersionsResponse(response, currentTimeMs);
-                break;
-
-            case UPDATE_RAFT_VOTER:
-                handledSuccessfully = handleUpdateVoterResponse(response, currentTimeMs);
-                break;
-
-            case ADD_RAFT_VOTER:
-                handledSuccessfully = handleAddVoterResponse(response, currentTimeMs);
-                break;
-
-            case REMOVE_RAFT_VOTER:
-                handledSuccessfully = handleRemoveVoterResponse(response, currentTimeMs);
-                break;
-
-            default:
-                throw new IllegalArgumentException("Received unexpected response type: " + apiKey);
-        }
+        final boolean handledSuccessfully = switch (apiKey) {
+            case FETCH -> handleFetchResponse(response, currentTimeMs);
+            case VOTE -> handleVoteResponse(response, currentTimeMs);
+            case BEGIN_QUORUM_EPOCH -> handleBeginQuorumEpochResponse(response, currentTimeMs);
+            case END_QUORUM_EPOCH -> handleEndQuorumEpochResponse(response, currentTimeMs);
+            case FETCH_SNAPSHOT -> handleFetchSnapshotResponse(response, currentTimeMs);
+            case API_VERSIONS -> handleApiVersionsResponse(response, currentTimeMs);
+            case UPDATE_RAFT_VOTER -> handleUpdateVoterResponse(response, currentTimeMs);
+            case ADD_RAFT_VOTER -> handleAddVoterResponse(response, currentTimeMs);
+            case REMOVE_RAFT_VOTER -> handleRemoveVoterResponse(response, currentTimeMs);
+            default -> throw new IllegalArgumentException("Received unexpected response type: " + apiKey);
+        };
 
         requestManager.onResponseResult(
             response.source(),
@@ -2740,48 +2710,18 @@
 
     private void handleRequest(RaftRequest.Inbound request, long currentTimeMs) {
         ApiKeys apiKey = ApiKeys.forId(request.data().apiKey());
-        final CompletableFuture<? extends ApiMessage> responseFuture;
-
-        switch (apiKey) {
-            case FETCH:
-                responseFuture = handleFetchRequest(request, currentTimeMs);
-                break;
-
-            case VOTE:
-                responseFuture = completedFuture(handleVoteRequest(request));
-                break;
-
-            case BEGIN_QUORUM_EPOCH:
-                responseFuture = completedFuture(handleBeginQuorumEpochRequest(request, currentTimeMs));
-                break;
-
-            case END_QUORUM_EPOCH:
-                responseFuture = completedFuture(handleEndQuorumEpochRequest(request, currentTimeMs));
-                break;
-
-            case DESCRIBE_QUORUM:
-                responseFuture = completedFuture(handleDescribeQuorumRequest(request, currentTimeMs));
-                break;
-
-            case FETCH_SNAPSHOT:
-                responseFuture = completedFuture(handleFetchSnapshotRequest(request, currentTimeMs));
-                break;
-
-            case ADD_RAFT_VOTER:
-                responseFuture = handleAddVoterRequest(request, currentTimeMs);
-                break;
-
-            case REMOVE_RAFT_VOTER:
-                responseFuture = handleRemoveVoterRequest(request, currentTimeMs);
-                break;
-
-            case UPDATE_RAFT_VOTER:
-                responseFuture = handleUpdateVoterRequest(request, currentTimeMs);
-                break;
-
-            default:
-                throw new IllegalArgumentException("Unexpected request type " + apiKey);
-        }
+        final CompletableFuture<? extends ApiMessage> responseFuture = switch (apiKey) {
+            case FETCH -> handleFetchRequest(request, currentTimeMs);
+            case VOTE -> completedFuture(handleVoteRequest(request));
+            case BEGIN_QUORUM_EPOCH -> completedFuture(handleBeginQuorumEpochRequest(request, currentTimeMs));
+            case END_QUORUM_EPOCH -> completedFuture(handleEndQuorumEpochRequest(request, currentTimeMs));
+            case DESCRIBE_QUORUM -> completedFuture(handleDescribeQuorumRequest(request, currentTimeMs));
+            case FETCH_SNAPSHOT -> completedFuture(handleFetchSnapshotRequest(request, currentTimeMs));
+            case ADD_RAFT_VOTER -> handleAddVoterRequest(request, currentTimeMs);
+            case REMOVE_RAFT_VOTER -> handleRemoveVoterRequest(request, currentTimeMs);
+            case UPDATE_RAFT_VOTER -> handleUpdateVoterRequest(request, currentTimeMs);
+            default -> throw new IllegalArgumentException("Unexpected request type " + apiKey);
+        };
 
         responseFuture.whenComplete((response, exception) -> {
             ApiMessage message = response;
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java b/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
index 459fd30..557ca7b 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
@@ -19,47 +19,14 @@
 import java.util.Objects;
 import java.util.OptionalInt;
 
-public class LeaderAndEpoch {
-    private final OptionalInt leaderId;
-    private final int epoch;
+public record LeaderAndEpoch(OptionalInt leaderId, int epoch) {
     public static final LeaderAndEpoch UNKNOWN = new LeaderAndEpoch(OptionalInt.empty(), 0);
 
-    public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
-        this.leaderId = Objects.requireNonNull(leaderId);
-        this.epoch = epoch;
-    }
-
-    public OptionalInt leaderId() {
-        return leaderId;
-    }
-
-    public int epoch() {
-        return epoch;
+    public LeaderAndEpoch {
+        Objects.requireNonNull(leaderId);
     }
 
     public boolean isLeader(int nodeId) {
         return leaderId.isPresent() && leaderId.getAsInt() == nodeId;
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        LeaderAndEpoch that = (LeaderAndEpoch) o;
-        return epoch == that.epoch &&
-            leaderId.equals(that.leaderId);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(leaderId, epoch);
-    }
-
-    @Override
-    public String toString() {
-        return "LeaderAndEpoch(" +
-            "leaderId=" + leaderId +
-            ", epoch=" + epoch +
-            ')';
-    }
 }
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
index 2476ae8..3e72b3a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
@@ -332,16 +332,16 @@
     }
 
     public void highWatermarkUpdated(LeaderState<?> leaderState) {
-        leaderState.addVoterHandlerState().ifPresent(current -> {
-            leaderState.highWatermark().ifPresent(highWatermark -> {
+        leaderState.addVoterHandlerState().ifPresent(current ->
+            leaderState.highWatermark().ifPresent(highWatermark ->
                 current.lastOffset().ifPresent(lastOffset -> {
                     if (highWatermark.offset() > lastOffset) {
                         // VotersRecord with the added voter was committed; complete the RPC
                         leaderState.resetAddVoterHandlerState(Errors.NONE, null, Optional.empty());
                     }
-                });
-            });
-        });
+                })
+            )
+        );
     }
 
     private ApiVersionsRequestData buildApiVersionsRequest() {
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java b/raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java
index 8cebe1b..9e1bbb2 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java
@@ -27,17 +27,11 @@
 /**
  *  Tracks the votes cast by voters in an election held by a Nominee.
  */
-public class EpochElection {
-    private Map<Integer, VoterState> voterStates;
-
+public record EpochElection(Map<Integer, VoterState> voterStates) {
     public EpochElection(Set<ReplicaKey> voters) {
-        this.voterStates = voters.stream()
-            .collect(
-                Collectors.toMap(
-                    ReplicaKey::id,
-                    VoterState::new
-                )
-            );
+        this(voters.stream()
+            .collect(Collectors.toMap(ReplicaKey::id, VoterState::new))
+        );
     }
 
     /**
@@ -157,14 +151,6 @@
         return voterStates.size() / 2 + 1;
     }
 
-    @Override
-    public String toString() {
-        return String.format(
-            "EpochElection(voterStates=%s)",
-            voterStates
-        );
-    }
-
     private static final class VoterState {
         private final ReplicaKey replicaKey;
         private State state = State.UNRECORDED;
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
index 2dea86d..7f231b1 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
@@ -156,7 +156,7 @@
     }
 
     public void highWatermarkUpdated(LeaderState<?> leaderState) {
-        leaderState.removeVoterHandlerState().ifPresent(current -> {
+        leaderState.removeVoterHandlerState().ifPresent(current ->
             leaderState.highWatermark().ifPresent(highWatermark -> {
                 if (highWatermark.offset() > current.lastOffset()) {
                     // VotersRecord with the removed voter was committed; complete the RPC
@@ -182,7 +182,7 @@
                         leaderState.requestResign();
                     }
                 }
-            });
-        });
+            })
+        );
     }
 }
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java b/raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java
index eec3911..00451ea 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java
@@ -71,14 +71,7 @@
         return thresholdMap.size();
     }
 
-    private static class ThresholdKey<T extends Comparable<T>> implements Comparable<ThresholdKey<T>> {
-        private final long id;
-        private final T threshold;
-
-        private ThresholdKey(long id, T threshold) {
-            this.id = id;
-            this.threshold = threshold;
-        }
+    private record ThresholdKey<T extends Comparable<T>>(long id, T threshold) implements Comparable<ThresholdKey<T>> {
 
         @Override
         public int compareTo(ThresholdKey<T> o) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderAndEpochTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderAndEpochTest.java
new file mode 100644
index 0000000..89bd907
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderAndEpochTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+
+import java.util.OptionalInt;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class LeaderAndEpochTest {
+
+    @Test
+    void testConstructorWithValidLeaderId() {
+        LeaderAndEpoch leaderAndEpoch = new LeaderAndEpoch(OptionalInt.of(1), 5);
+        
+        assertEquals(OptionalInt.of(1), leaderAndEpoch.leaderId());
+        assertEquals(5, leaderAndEpoch.epoch());
+    }
+
+    @Test
+    void testConstructorWithEmptyLeaderId() {
+        LeaderAndEpoch leaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), 3);
+        
+        assertEquals(OptionalInt.empty(), leaderAndEpoch.leaderId());
+        assertEquals(3, leaderAndEpoch.epoch());
+    }
+
+    @Test
+    void testConstructorThrowsExceptionWhenLeaderIdIsNull() {
+        Executable executable = () -> new LeaderAndEpoch(null, 1);
+        
+        assertThrows(NullPointerException.class, executable);
+    }
+}
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index 686daa3..d66fb31 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -34,7 +34,6 @@
 
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
@@ -670,17 +669,17 @@
             KRaftVersion.KRAFT_VERSION_1
         );
 
-        IntStream.of(remoteIds).forEach(id -> {
-            List.of(true, false).forEach(isPrevote -> {
+        IntStream.of(remoteIds).forEach(id ->
+            List.of(true, false).forEach(isPrevote ->
                 assertFalse(
                     state.canGrantVote(
                         ReplicaKey.of(id, ReplicaKey.NO_DIRECTORY_ID),
                         isLogUpToDate,
                         isPrevote
                     )
-                );
-            });
-        });
+                )
+            )
+        );
     }
 
     @ParameterizedTest
@@ -867,25 +866,7 @@
         );
     }
 
-    private static class MockOffsetMetadata implements OffsetMetadata {
-        private final String value;
-
-        private MockOffsetMetadata(String value) {
-            this.value = value;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            MockOffsetMetadata that = (MockOffsetMetadata) o;
-            return Objects.equals(value, that.value);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(value);
-        }
+    private record MockOffsetMetadata(String value) implements OffsetMetadata {
     }
 
     private ReplicaKey replicaKey(int id, boolean withDirectoryId) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 6f09682..6bca324 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -43,7 +43,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
@@ -627,86 +626,20 @@
         );
     }
 
-    static class MockOffsetMetadata implements OffsetMetadata {
-        final long id;
-
-        MockOffsetMetadata(long id) {
-            this.id = id;
-        }
-
-        @Override
-        public String toString() {
-            return "MockOffsetMetadata(" +
-                "id=" + id +
-                ')';
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            MockOffsetMetadata that = (MockOffsetMetadata) o;
-            return id == that.id;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(id);
-        }
+    record MockOffsetMetadata(long id) implements OffsetMetadata {
     }
 
-    static class LogEntry {
-        final MockOffsetMetadata metadata;
-        final long offset;
-        final SimpleRecord record;
-
-        LogEntry(MockOffsetMetadata metadata, long offset, SimpleRecord record) {
-            this.metadata = metadata;
-            this.offset = offset;
-            this.record = record;
-        }
+    record LogEntry(MockOffsetMetadata metadata, long offset, SimpleRecord record) {
 
         LogOffsetMetadata logOffsetMetadata() {
             return new LogOffsetMetadata(offset, Optional.of(metadata));
         }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            LogEntry logEntry = (LogEntry) o;
-            return offset == logEntry.offset &&
-                Objects.equals(metadata, logEntry.metadata) &&
-                Objects.equals(record, logEntry.record);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(metadata, offset, record);
-        }
-
-        @Override
-        public String toString() {
-            return String.format(
-                "LogEntry(metadata=%s, offset=%s, record=%s)",
-                metadata,
-                offset,
-                record
-            );
-        }
     }
 
-    static class LogBatch {
-        final List<LogEntry> entries;
-        final int epoch;
-        final boolean isControlBatch;
-
-        LogBatch(int epoch, boolean isControlBatch, List<LogEntry> entries) {
+    record LogBatch(int epoch, boolean isControlBatch, List<LogEntry> entries) {
+        LogBatch {
             if (entries.isEmpty())
                 throw new IllegalArgumentException("Empty batches are not supported");
-            this.entries = entries;
-            this.epoch = epoch;
-            this.isControlBatch = isControlBatch;
         }
 
         long firstOffset() {
@@ -746,25 +679,8 @@
             builder.close();
             return builder.buffer();
         }
-
-        @Override
-        public String toString() {
-            return String.format("LogBatch(entries=%s, epoch=%s, isControlBatch=%s)", entries, epoch, isControlBatch);
-        }
     }
 
-    private static class EpochStartOffset {
-        final int epoch;
-        final long startOffset;
-
-        private EpochStartOffset(int epoch, long startOffset) {
-            this.epoch = epoch;
-            this.startOffset = startOffset;
-        }
-
-        @Override
-        public String toString() {
-            return String.format("EpochStartOffset(epoch=%s, startOffset=%s)", epoch, startOffset);
-        }
+    private record EpochStartOffset(int epoch, long startOffset) {
     }
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index 21d253a..604da18 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -51,7 +51,6 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.IntStream;
@@ -1026,33 +1025,7 @@
         }
     }
 
-    private static class OffsetRange {
-        public final long startOffset;
-        public final long endOffset;
-
-        private OffsetRange(long startOffset, long endOffset) {
-            this.startOffset = startOffset;
-            this.endOffset = endOffset;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            OffsetRange that = (OffsetRange) o;
-            return startOffset == that.startOffset &&
-                endOffset == that.endOffset;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(startOffset, endOffset);
-        }
-
-        @Override
-        public String toString() {
-            return String.format("OffsetRange(startOffset=%s, endOffset=%s)", startOffset, endOffset);
-        }
+    private record OffsetRange(long startOffset, long endOffset) {
     }
 
     private void appendAsLeader(Collection<SimpleRecord> records, int epoch) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 3f8d96b..afb99a0 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -545,12 +545,7 @@
         }
     }
 
-    private static class SequentialAppendAction implements Runnable {
-        final Cluster cluster;
-
-        private SequentialAppendAction(Cluster cluster) {
-            this.cluster = cluster;
-        }
+    private record SequentialAppendAction(Cluster cluster) implements Runnable {
 
         @Override
         public void run() {
@@ -1005,14 +1000,7 @@
         }
     }
 
-    private static class InflightRequest {
-        final int sourceId;
-        final Node destination;
-
-        private InflightRequest(int sourceId, Node destination) {
-            this.sourceId = sourceId;
-            this.destination = destination;
-        }
+    private record InflightRequest(int sourceId, Node destination) {
     }
 
     private interface NetworkFilter {
@@ -1046,16 +1034,13 @@
         }
     }
 
-    private static class DropOutboundRequestsTo implements NetworkFilter {
-        private final Set<InetSocketAddress> unreachable;
-
+    private record DropOutboundRequestsTo(Set<InetSocketAddress> unreachable) implements NetworkFilter {
         /**
          * This network filter drops any outbound message sent to the {@code unreachable} nodes.
          *
          * @param unreachable the set of destination address which are not reachable
          */
-        private DropOutboundRequestsTo(Set<InetSocketAddress> unreachable) {
-            this.unreachable = unreachable;
+        private DropOutboundRequestsTo {
         }
 
         @Override
@@ -1123,12 +1108,7 @@
         }
     }
 
-    private static class MajorityReachedHighWatermark implements Invariant {
-        final Cluster cluster;
-
-        private MajorityReachedHighWatermark(Cluster cluster) {
-            this.cluster = cluster;
-        }
+    private record MajorityReachedHighWatermark(Cluster cluster) implements Invariant {
 
         @Override
         public void verify() {
@@ -1234,12 +1214,7 @@
         }
     }
 
-    private static class SnapshotAtLogStart implements Invariant {
-        final Cluster cluster;
-
-        private SnapshotAtLogStart(Cluster cluster) {
-            this.cluster = cluster;
-        }
+    private record SnapshotAtLogStart(Cluster cluster) implements Invariant {
 
         @Override
         public void verify() {
@@ -1280,12 +1255,7 @@
         }
     }
 
-    private static class LeaderNeverLoadSnapshot implements Invariant {
-        final Cluster cluster;
-
-        private LeaderNeverLoadSnapshot(Cluster cluster) {
-            this.cluster = cluster;
-        }
+    private record LeaderNeverLoadSnapshot(Cluster cluster) implements Invariant {
 
         @Override
         public void verify() {
@@ -1370,15 +1340,15 @@
             });
 
             for (LogBatch batch : log.readBatches(startOffset.get(), highWatermark)) {
-                if (batch.isControlBatch) {
+                if (batch.isControlBatch()) {
                     continue;
                 }
 
-                for (LogEntry entry : batch.entries) {
-                    long offset = entry.offset;
+                for (LogEntry entry : batch.entries()) {
+                    long offset = entry.offset();
                     assertTrue(offset < highWatermark.getAsLong());
 
-                    int sequence = parseSequenceNumber(entry.record.value().duplicate());
+                    int sequence = parseSequenceNumber(entry.record().value().duplicate());
                     committedSequenceNumbers.putIfAbsent(offset, sequence);
 
                     int committedSequence = committedSequenceNumbers.get(offset);
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
index 34b7c9f..cdbd728 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
@@ -660,32 +660,10 @@
         }
     }
 
-    private static class FetchRequestTestCase {
-        private final Uuid replicaDirectoryId;
-        private final short version;
-        private final short lastFetchedEpoch;
-        private final String expectedJson;
-
-        private FetchRequestTestCase(Uuid replicaDirectoryId, short version,
-                                     short lastFetchedEpoch, String expectedJson) {
-            this.replicaDirectoryId = replicaDirectoryId;
-            this.version = version;
-            this.lastFetchedEpoch = lastFetchedEpoch;
-            this.expectedJson = expectedJson;
-        }
+    private record FetchRequestTestCase(Uuid replicaDirectoryId, short version, short lastFetchedEpoch,
+                                        String expectedJson) {
     }
 
-    private static class FetchResponseTestCase {
-        private final short version;
-        private final int preferredReadReplicaId;
-        private final String expectedJson;
-
-        private FetchResponseTestCase(short version,
-                                      int preferredReadReplicaId,
-                                      String expectedJson) {
-            this.version = version;
-            this.preferredReadReplicaId = preferredReadReplicaId;
-            this.expectedJson = expectedJson;
-        }
+    private record FetchResponseTestCase(short version, int preferredReadReplicaId, String expectedJson) {
     }
 }