MINOR: Fix format in CoordinatorLoaderImplTest (#20548)

Fix indentation in `CoordinatorLoaderImplTest` to be consistent with the
rest of the code in the package.

Reviewers: TengYao Chi <kitingiao@gmail.com>, David Jacot <djacot@confluent.io>
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
index 8760e93..9f8ab68 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
@@ -72,8 +72,8 @@
         @Override
         public Map.Entry<String, String> deserialize(ByteBuffer key, ByteBuffer value) throws RuntimeException {
             return Map.entry(
-                    StandardCharsets.UTF_8.decode(key).toString(),
-                    StandardCharsets.UTF_8.decode(value).toString()
+                StandardCharsets.UTF_8.decode(key).toString(),
+                StandardCharsets.UTF_8.decode(value).toString()
             );
         }
     }
@@ -87,11 +87,11 @@
         CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
-                Time.SYSTEM,
-                partitionLogSupplier,
-                partitionLogEndOffsetSupplier,
-                serde,
-                1000
+            Time.SYSTEM,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            1000
         )) {
             assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
         }
@@ -106,11 +106,11 @@
         CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
-                Time.SYSTEM,
-                partitionLogSupplier,
-                partitionLogEndOffsetSupplier,
-                serde,
-                1000
+            Time.SYSTEM,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            1000
         )) {
             loader.close();
             assertFutureThrows(RuntimeException.class, loader.load(tp, coordinator));
@@ -127,59 +127,59 @@
         CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
-                Time.SYSTEM,
-                partitionLogSupplier,
-                partitionLogEndOffsetSupplier,
-                serde,
-                1000
+            Time.SYSTEM,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            1000
         )) {
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(0L);
 
             FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
-                    new SimpleRecord("k1".getBytes(), "v1".getBytes()),
-                    new SimpleRecord("k2".getBytes(), "v2".getBytes())
+                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+                new SimpleRecord("k2".getBytes(), "v2".getBytes())
             ));
 
             when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult1);
+                .thenReturn(readResult1);
 
             FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
-                    new SimpleRecord("k3".getBytes(), "v3".getBytes()),
-                    new SimpleRecord("k4".getBytes(), "v4".getBytes()),
-                    new SimpleRecord("k5".getBytes(), "v5".getBytes())
+                new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+                new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+                new SimpleRecord("k5".getBytes(), "v5".getBytes())
             ));
 
             when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult2);
+                .thenReturn(readResult2);
 
             FetchDataInfo readResult3 = logReadResult(5, 100L, (short) 5, Arrays.asList(
-                    new SimpleRecord("k6".getBytes(), "v6".getBytes()),
-                    new SimpleRecord("k7".getBytes(), "v7".getBytes())
+                new SimpleRecord("k6".getBytes(), "v6".getBytes()),
+                new SimpleRecord("k7".getBytes(), "v7".getBytes())
             ));
 
             when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult3);
+                .thenReturn(readResult3);
 
             FetchDataInfo readResult4 = logReadResult(
-                    7,
-                    100L,
-                    (short) 5,
-                    ControlRecordType.COMMIT
+                7,
+                100L,
+                (short) 5,
+                ControlRecordType.COMMIT
             );
 
             when(log.read(7L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult4);
+                .thenReturn(readResult4);
 
             FetchDataInfo readResult5 = logReadResult(
-                    8,
-                    500L,
-                    (short) 10,
-                    ControlRecordType.ABORT
+                8,
+                500L,
+                (short) 10,
+                ControlRecordType.ABORT
             );
 
             when(log.read(8L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult5);
+                .thenReturn(readResult5);
 
             CoordinatorLoader.LoadSummary summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
             assertNotNull(summary);
@@ -213,25 +213,25 @@
         CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
-                Time.SYSTEM,
-                partitionLogSupplier,
-                partitionLogEndOffsetSupplier,
-                serde,
-                1000
+            Time.SYSTEM,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            1000
         )) {
             when(log.logStartOffset()).thenReturn(0L);
 
             FetchDataInfo readResult = logReadResult(0, Arrays.asList(
-                    new SimpleRecord("k1".getBytes(), "v1".getBytes()),
-                    new SimpleRecord("k2".getBytes(), "v2".getBytes())
+                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+                new SimpleRecord("k2".getBytes(), "v2".getBytes())
             ));
 
             CountDownLatch latch = new CountDownLatch(1);
             when(log.read(
-                    anyLong(),
-                    eq(1000),
-                    eq(FetchIsolation.LOG_END),
-                    eq(true)
+                anyLong(),
+                eq(1000),
+                eq(FetchIsolation.LOG_END),
+                eq(true)
             )).thenAnswer((InvocationOnMock invocation) -> {
                 latch.countDown();
                 return readResult;
@@ -258,25 +258,25 @@
         CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
-                Time.SYSTEM,
-                partitionLogSupplier,
-                partitionLogEndOffsetSupplier,
-                serde,
-                1000
+            Time.SYSTEM,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            1000
         )) {
             when(log.logStartOffset()).thenReturn(0L);
 
             FetchDataInfo readResult = logReadResult(0, Arrays.asList(
-                    new SimpleRecord("k1".getBytes(), "v1".getBytes()),
-                    new SimpleRecord("k2".getBytes(), "v2".getBytes())
+                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+                new SimpleRecord("k2".getBytes(), "v2".getBytes())
             ));
 
             when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult);
+                .thenReturn(readResult);
 
             when(serde.deserialize(any(ByteBuffer.class), any(ByteBuffer.class)))
-                    .thenThrow(new Deserializer.UnknownRecordTypeException((short) 1))
-                    .thenReturn(Map.entry("k2", "v2"));
+                .thenThrow(new Deserializer.UnknownRecordTypeException((short) 1))
+                .thenReturn(Map.entry("k2", "v2"));
 
             loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
 
@@ -294,24 +294,24 @@
         CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
-                Time.SYSTEM,
-                partitionLogSupplier,
-                partitionLogEndOffsetSupplier,
-                serde,
-                1000
+            Time.SYSTEM,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            1000
         )) {
             when(log.logStartOffset()).thenReturn(0L);
 
             FetchDataInfo readResult = logReadResult(0, Arrays.asList(
-                    new SimpleRecord("k1".getBytes(), "v1".getBytes()),
-                    new SimpleRecord("k2".getBytes(), "v2".getBytes())
+                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+                new SimpleRecord("k2".getBytes(), "v2".getBytes())
             ));
 
             when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult);
+                .thenReturn(readResult);
 
             when(serde.deserialize(any(ByteBuffer.class), any(ByteBuffer.class)))
-                    .thenThrow(new RuntimeException("Error!"));
+                .thenThrow(new RuntimeException("Error!"));
 
             RuntimeException ex = assertFutureThrows(RuntimeException.class, loader.load(tp, coordinator));
 
@@ -333,18 +333,18 @@
         CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
-                Time.SYSTEM,
-                partitionLogSupplier,
-                partitionLogEndOffsetSupplier,
-                serde,
-                1000
+            Time.SYSTEM,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            1000
         )) {
             when(log.logStartOffset()).thenReturn(0L);
 
             FetchDataInfo readResult = logReadResult(0, List.of());
 
             when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult);
+                .thenReturn(readResult);
 
             assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
         }
@@ -361,34 +361,34 @@
         MockTime time = new MockTime();
 
         try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
-                time,
-                partitionLogSupplier,
-                partitionLogEndOffsetSupplier,
-                serde,
-                1000
+            time,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            1000
         )) {
             long startTimeMs = time.milliseconds();
             when(log.logStartOffset()).thenReturn(0L);
 
             FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
-                    new SimpleRecord("k1".getBytes(), "v1".getBytes()),
-                    new SimpleRecord("k2".getBytes(), "v2".getBytes())
+                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+                new SimpleRecord("k2".getBytes(), "v2".getBytes())
             ));
 
             when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
-                    .thenAnswer((InvocationOnMock invocation) -> {
-                        time.sleep(1000);
-                        return readResult1;
-                    });
+                .thenAnswer((InvocationOnMock invocation) -> {
+                    time.sleep(1000);
+                    return readResult1;
+                });
 
             FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
-                    new SimpleRecord("k3".getBytes(), "v3".getBytes()),
-                    new SimpleRecord("k4".getBytes(), "v4".getBytes()),
-                    new SimpleRecord("k5".getBytes(), "v5".getBytes())
+                new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+                new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+                new SimpleRecord("k5".getBytes(), "v5".getBytes())
             ));
 
             when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult2);
+                .thenReturn(readResult2);
 
             CoordinatorLoader.LoadSummary summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
             assertEquals(startTimeMs, summary.startTimeMs());
@@ -408,39 +408,39 @@
         CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
-                Time.SYSTEM,
-                partitionLogSupplier,
-                partitionLogEndOffsetSupplier,
-                serde,
-                1000
+            Time.SYSTEM,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            1000
         )) {
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(0L, 0L, 2L);
 
             FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
-                    new SimpleRecord("k1".getBytes(), "v1".getBytes()),
-                    new SimpleRecord("k2".getBytes(), "v2".getBytes())
+                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+                new SimpleRecord("k2".getBytes(), "v2".getBytes())
             ));
 
             when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
                     .thenReturn(readResult1);
 
             FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
-                    new SimpleRecord("k3".getBytes(), "v3".getBytes()),
-                    new SimpleRecord("k4".getBytes(), "v4".getBytes()),
-                    new SimpleRecord("k5".getBytes(), "v5".getBytes())
+                new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+                new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+                new SimpleRecord("k5".getBytes(), "v5".getBytes())
             ));
 
             when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult2);
+                .thenReturn(readResult2);
 
             FetchDataInfo readResult3 = logReadResult(5, Arrays.asList(
-                    new SimpleRecord("k6".getBytes(), "v6".getBytes()),
-                    new SimpleRecord("k7".getBytes(), "v7".getBytes())
+                new SimpleRecord("k6".getBytes(), "v6".getBytes()),
+                new SimpleRecord("k7".getBytes(), "v7".getBytes())
             ));
 
             when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult3);
+                .thenReturn(readResult3);
 
             assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
 
@@ -471,11 +471,11 @@
         CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
-                Time.SYSTEM,
-                partitionLogSupplier,
-                partitionLogEndOffsetSupplier,
-                serde,
-                1000
+            Time.SYSTEM,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            1000
         )) {
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(0L);
@@ -497,39 +497,39 @@
         CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
-                Time.SYSTEM,
-                partitionLogSupplier,
-                partitionLogEndOffsetSupplier,
-                serde,
-                1000
+            Time.SYSTEM,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            1000
         )) {
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(5L, 7L, 7L);
 
             FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
-                    new SimpleRecord("k1".getBytes(), "v1".getBytes()),
-                    new SimpleRecord("k2".getBytes(), "v2".getBytes())
+                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+                new SimpleRecord("k2".getBytes(), "v2".getBytes())
             ));
 
             when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult1);
+                .thenReturn(readResult1);
 
             FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
-                    new SimpleRecord("k3".getBytes(), "v3".getBytes()),
-                    new SimpleRecord("k4".getBytes(), "v4".getBytes()),
-                    new SimpleRecord("k5".getBytes(), "v5".getBytes())
+                new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+                new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+                new SimpleRecord("k5".getBytes(), "v5".getBytes())
             ));
 
             when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult2);
+                .thenReturn(readResult2);
 
             FetchDataInfo readResult3 = logReadResult(5, Arrays.asList(
-                    new SimpleRecord("k6".getBytes(), "v6".getBytes()),
-                    new SimpleRecord("k7".getBytes(), "v7".getBytes())
+                new SimpleRecord("k6".getBytes(), "v6".getBytes()),
+                new SimpleRecord("k7".getBytes(), "v7".getBytes())
             ));
 
             when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult3);
+                .thenReturn(readResult3);
 
             assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
 
@@ -561,32 +561,32 @@
         CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
-                Time.SYSTEM,
-                partitionLogSupplier,
-                partitionLogEndOffsetSupplier,
-                serde,
-                1000
+            Time.SYSTEM,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            1000
         )) {
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(0L);
             when(partitionLogEndOffsetSupplier.apply(tp)).thenReturn(Optional.of(5L)).thenReturn(Optional.of(-1L));
 
             FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
-                    new SimpleRecord("k1".getBytes(), "v1".getBytes()),
-                    new SimpleRecord("k2".getBytes(), "v2".getBytes())
+                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+                new SimpleRecord("k2".getBytes(), "v2".getBytes())
             ));
 
             when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult1);
+                .thenReturn(readResult1);
 
             FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
-                    new SimpleRecord("k3".getBytes(), "v3".getBytes()),
-                    new SimpleRecord("k4".getBytes(), "v4".getBytes()),
-                    new SimpleRecord("k5".getBytes(), "v5".getBytes())
+                new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+                new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+                new SimpleRecord("k5".getBytes(), "v5".getBytes())
             ));
 
             when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
-                    .thenReturn(readResult2);
+                .thenReturn(readResult2);
 
             assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
         }
@@ -597,28 +597,28 @@
     }
 
     private FetchDataInfo logReadResult(
-            long startOffset,
-            long producerId,
-            short producerEpoch,
-            List<SimpleRecord> records
+        long startOffset,
+        long producerId,
+        short producerEpoch,
+        List<SimpleRecord> records
     ) throws IOException {
         FileRecords fileRecords = mock(FileRecords.class);
         MemoryRecords memoryRecords;
         if (producerId == RecordBatch.NO_PRODUCER_ID) {
             memoryRecords = MemoryRecords.withRecords(
-                    startOffset,
-                    Compression.NONE,
-                    records.toArray(new SimpleRecord[0])
+                startOffset,
+                Compression.NONE,
+                records.toArray(new SimpleRecord[0])
             );
         } else {
             memoryRecords = MemoryRecords.withTransactionalRecords(
-                    startOffset,
-                    Compression.NONE,
-                    producerId,
-                    producerEpoch,
-                    0,
-                    RecordBatch.NO_PARTITION_LEADER_EPOCH,
-                    records.toArray(new SimpleRecord[0])
+                startOffset,
+                Compression.NONE,
+                producerId,
+                producerEpoch,
+                0,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                records.toArray(new SimpleRecord[0])
             );
         }
 
@@ -635,19 +635,19 @@
     }
 
     private FetchDataInfo logReadResult(
-            long startOffset,
-            long producerId,
-            short producerEpoch,
-            ControlRecordType controlRecordType
+        long startOffset,
+        long producerId,
+        short producerEpoch,
+        ControlRecordType controlRecordType
     ) throws IOException {
         FileRecords fileRecords = mock(FileRecords.class);
         MemoryRecords memoryRecords = MemoryRecords.withEndTransactionMarker(
-                startOffset,
-                0L,
-                RecordBatch.NO_PARTITION_LEADER_EPOCH,
-                producerId,
-                producerEpoch,
-                new EndTransactionMarker(controlRecordType, 0)
+            startOffset,
+            0L,
+            RecordBatch.NO_PARTITION_LEADER_EPOCH,
+            producerId,
+            producerEpoch,
+            new EndTransactionMarker(controlRecordType, 0)
         );
 
         when(fileRecords.sizeInBytes()).thenReturn(memoryRecords.sizeInBytes());