MINOR: Fix format in CoordinatorLoaderImpl (#20538)

The format of the code in `CoordinatorLoaderImpl` in inconsistent with
the rest of the code in the package. This small PR fixes it.

Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
 <kitingiao@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Sean
 Quah <squah@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
index 078dad3..6613ce2 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
@@ -62,11 +62,11 @@
     private final KafkaScheduler scheduler = new KafkaScheduler(1);
 
     public CoordinatorLoaderImpl(
-            Time time,
-            Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier,
-            Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier,
-            Deserializer<T> deserializer,
-            int loadBufferSize
+        Time time,
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier,
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier,
+        Deserializer<T> deserializer,
+        int loadBufferSize
     ) {
         this.time = time;
         this.partitionLogSupplier = partitionLogSupplier;
@@ -89,7 +89,7 @@
         long startTimeMs = time.milliseconds();
         try {
             ScheduledFuture<?> result = scheduler.scheduleOnce(String.format("Load coordinator from %s", tp),
-                    () -> doLoad(tp, coordinator, future, startTimeMs));
+                () -> doLoad(tp, coordinator, future, startTimeMs));
             if (result.isCancelled()) {
                 future.completeExceptionally(new RuntimeException("Coordinator loader is closed."));
             }
@@ -100,17 +100,17 @@
     }
 
     private void doLoad(
-            TopicPartition tp,
-            CoordinatorPlayback<T> coordinator,
-            CompletableFuture<LoadSummary> future,
-            long startTimeMs
+        TopicPartition tp,
+        CoordinatorPlayback<T> coordinator,
+        CompletableFuture<LoadSummary> future,
+        long startTimeMs
     ) {
         long schedulerQueueTimeMs = time.milliseconds() - startTimeMs;
         try {
             Optional<UnifiedLog> logOpt = partitionLogSupplier.apply(tp);
             if (logOpt.isEmpty()) {
                 future.completeExceptionally(new NotLeaderOrFollowerException(
-                        "Could not load records from " + tp + " because the log does not exist."));
+                    "Could not load records from " + tp + " because the log does not exist."));
                 return;
             }
 
@@ -142,8 +142,7 @@
 
             if (logEndOffset(tp) == -1L) {
                 future.completeExceptionally(new NotLeaderOrFollowerException(
-                        String.format("Stopped loading records from %s because the partition is not online or is no longer the leader.", tp)
-                ));
+                    String.format("Stopped loading records from %s because the partition is not online or is no longer the leader.", tp)));
             } else if (isRunning.get()) {
                 future.complete(new LoadSummary(startTimeMs, endTimeMs, schedulerQueueTimeMs, stats.numRecords, stats.numBytes));
             } else {
@@ -186,7 +185,7 @@
             if (buffer.capacity() < bytesNeeded) {
                 if (loadBufferSize < bytesNeeded) {
                     LOG.warn("Loaded metadata from {} with buffer larger ({} bytes) than" +
-                            " configured buffer size ({} bytes).", tp, bytesNeeded, loadBufferSize);
+                        " configured buffer size ({} bytes).", tp, bytesNeeded, loadBufferSize);
                 }
 
                 buffer = ByteBuffer.allocate(bytesNeeded);
@@ -202,15 +201,14 @@
     }
 
     private ReplayResult processMemoryRecords(
-            TopicPartition tp,
-            UnifiedLog log,
-            MemoryRecords memoryRecords,
-            CoordinatorPlayback<T> coordinator,
-            LoadStats loadStats,
-            long currentOffset,
-            long previousHighWatermark
+        TopicPartition tp,
+        UnifiedLog log,
+        MemoryRecords memoryRecords,
+        CoordinatorPlayback<T> coordinator,
+        LoadStats loadStats,
+        long currentOffset,
+        long previousHighWatermark
     ) {
-
         for (MutableRecordBatch batch : memoryRecords.batches()) {
             if (batch.isControlBatch()) {
                 for (Record record : batch) {
@@ -220,8 +218,8 @@
                     if (controlRecord == ControlRecordType.COMMIT) {
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("Replaying end transaction marker from {} at offset {} to commit" +
-                                            " transaction with producer id {} and producer epoch {}.",
-                                    tp, record.offset(), batch.producerId(), batch.producerEpoch());
+                                " transaction with producer id {} and producer epoch {}.",
+                                tp, record.offset(), batch.producerId(), batch.producerEpoch());
                         }
                         coordinator.replayEndTransactionMarker(
                                 batch.producerId(),
@@ -231,8 +229,8 @@
                     } else if (controlRecord == ControlRecordType.ABORT) {
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("Replaying end transaction marker from {} at offset {} to abort" +
-                                            " transaction with producer id {} and producer epoch {}.",
-                                    tp, record.offset(), batch.producerId(), batch.producerEpoch());
+                                " transaction with producer id {} and producer epoch {}.",
+                                tp, record.offset(), batch.producerId(), batch.producerEpoch());
                         }
                         coordinator.replayEndTransactionMarker(
                                 batch.producerId(),
@@ -250,7 +248,7 @@
                         coordinatorRecordOpt = Optional.ofNullable(deserializer.deserialize(record.key(), record.value()));
                     } catch (Deserializer.UnknownRecordTypeException ex) {
                         LOG.warn("Unknown record type {} while loading offsets and group metadata from {}." +
-                                " Ignoring it. It could be a left over from an aborted upgrade.", ex.unknownType(), tp);
+                            " Ignoring it. It could be a left over from an aborted upgrade.", ex.unknownType(), tp);
                     } catch (RuntimeException ex) {
                         String msg = String.format("Deserializing record %s from %s failed.", record, tp);
                         LOG.error(msg, ex);
@@ -261,18 +259,18 @@
                         try {
                             if (LOG.isTraceEnabled()) {
                                 LOG.trace("Replaying record {} from {} at offset {} with producer id {}" +
-                                        " and producer epoch {}.", coordinatorRecord, tp, record.offset(), batch.producerId(), batch.producerEpoch());
+                                    " and producer epoch {}.", coordinatorRecord, tp, record.offset(), batch.producerId(), batch.producerEpoch());
                             }
                             coordinator.replay(
-                                    record.offset(),
-                                    batch.producerId(),
-                                    batch.producerEpoch(),
-                                    coordinatorRecord
+                                record.offset(),
+                                batch.producerId(),
+                                batch.producerEpoch(),
+                                coordinatorRecord
                             );
                         } catch (RuntimeException ex) {
                             String msg = String.format("Replaying record %s from %s at offset %d with producer id %d and" +
-                                            " producer epoch %d failed.", coordinatorRecord, tp, record.offset(),
-                                    batch.producerId(), batch.producerEpoch());
+                                " producer epoch %d failed.", coordinatorRecord, tp, record.offset(),
+                                batch.producerId(), batch.producerEpoch());
                             LOG.error(msg, ex);
                             throw new RuntimeException(msg, ex);
                         }
@@ -320,14 +318,13 @@
 
         @Override
         public String toString() {
-            return "LoadStats{" +
-                    "numRecords=" + numRecords +
-                    ", numBytes=" + numBytes +
-                    ", readAtLeastOneRecord=" + readAtLeastOneRecord +
-                    '}';
+            return "LoadStats(" +
+                "numRecords=" + numRecords +
+                ", numBytes=" + numBytes +
+                ", readAtLeastOneRecord=" + readAtLeastOneRecord +
+                ')';
         }
     }
 
-    private record ReplayResult(long nextOffset, long highWatermark) {
-    }
+    private record ReplayResult(long nextOffset, long highWatermark) { }
 }