MINOR: Fix format in CoordinatorLoaderImplTest (#20548)
Fix indentation in `CoordinatorLoaderImplTest` to be consistent with the
rest of the code in the package.
Reviewers: TengYao Chi <kitingiao@gmail.com>, David Jacot <djacot@confluent.io>
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
index 8760e93..9f8ab68 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
@@ -72,8 +72,8 @@
@Override
public Map.Entry<String, String> deserialize(ByteBuffer key, ByteBuffer value) throws RuntimeException {
return Map.entry(
- StandardCharsets.UTF_8.decode(key).toString(),
- StandardCharsets.UTF_8.decode(value).toString()
+ StandardCharsets.UTF_8.decode(key).toString(),
+ StandardCharsets.UTF_8.decode(value).toString()
);
}
}
@@ -87,11 +87,11 @@
CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
- Time.SYSTEM,
- partitionLogSupplier,
- partitionLogEndOffsetSupplier,
- serde,
- 1000
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
)) {
assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
}
@@ -106,11 +106,11 @@
CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
- Time.SYSTEM,
- partitionLogSupplier,
- partitionLogEndOffsetSupplier,
- serde,
- 1000
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
)) {
loader.close();
assertFutureThrows(RuntimeException.class, loader.load(tp, coordinator));
@@ -127,59 +127,59 @@
CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
- Time.SYSTEM,
- partitionLogSupplier,
- partitionLogEndOffsetSupplier,
- serde,
- 1000
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L);
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
- new SimpleRecord("k1".getBytes(), "v1".getBytes()),
- new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult1);
+ .thenReturn(readResult1);
FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
- new SimpleRecord("k3".getBytes(), "v3".getBytes()),
- new SimpleRecord("k4".getBytes(), "v4".getBytes()),
- new SimpleRecord("k5".getBytes(), "v5".getBytes())
+ new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+ new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+ new SimpleRecord("k5".getBytes(), "v5".getBytes())
));
when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult2);
+ .thenReturn(readResult2);
FetchDataInfo readResult3 = logReadResult(5, 100L, (short) 5, Arrays.asList(
- new SimpleRecord("k6".getBytes(), "v6".getBytes()),
- new SimpleRecord("k7".getBytes(), "v7".getBytes())
+ new SimpleRecord("k6".getBytes(), "v6".getBytes()),
+ new SimpleRecord("k7".getBytes(), "v7".getBytes())
));
when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult3);
+ .thenReturn(readResult3);
FetchDataInfo readResult4 = logReadResult(
- 7,
- 100L,
- (short) 5,
- ControlRecordType.COMMIT
+ 7,
+ 100L,
+ (short) 5,
+ ControlRecordType.COMMIT
);
when(log.read(7L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult4);
+ .thenReturn(readResult4);
FetchDataInfo readResult5 = logReadResult(
- 8,
- 500L,
- (short) 10,
- ControlRecordType.ABORT
+ 8,
+ 500L,
+ (short) 10,
+ ControlRecordType.ABORT
);
when(log.read(8L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult5);
+ .thenReturn(readResult5);
CoordinatorLoader.LoadSummary summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
assertNotNull(summary);
@@ -213,25 +213,25 @@
CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
- Time.SYSTEM,
- partitionLogSupplier,
- partitionLogEndOffsetSupplier,
- serde,
- 1000
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
)) {
when(log.logStartOffset()).thenReturn(0L);
FetchDataInfo readResult = logReadResult(0, Arrays.asList(
- new SimpleRecord("k1".getBytes(), "v1".getBytes()),
- new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
CountDownLatch latch = new CountDownLatch(1);
when(log.read(
- anyLong(),
- eq(1000),
- eq(FetchIsolation.LOG_END),
- eq(true)
+ anyLong(),
+ eq(1000),
+ eq(FetchIsolation.LOG_END),
+ eq(true)
)).thenAnswer((InvocationOnMock invocation) -> {
latch.countDown();
return readResult;
@@ -258,25 +258,25 @@
CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
- Time.SYSTEM,
- partitionLogSupplier,
- partitionLogEndOffsetSupplier,
- serde,
- 1000
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
)) {
when(log.logStartOffset()).thenReturn(0L);
FetchDataInfo readResult = logReadResult(0, Arrays.asList(
- new SimpleRecord("k1".getBytes(), "v1".getBytes()),
- new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult);
+ .thenReturn(readResult);
when(serde.deserialize(any(ByteBuffer.class), any(ByteBuffer.class)))
- .thenThrow(new Deserializer.UnknownRecordTypeException((short) 1))
- .thenReturn(Map.entry("k2", "v2"));
+ .thenThrow(new Deserializer.UnknownRecordTypeException((short) 1))
+ .thenReturn(Map.entry("k2", "v2"));
loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
@@ -294,24 +294,24 @@
CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
- Time.SYSTEM,
- partitionLogSupplier,
- partitionLogEndOffsetSupplier,
- serde,
- 1000
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
)) {
when(log.logStartOffset()).thenReturn(0L);
FetchDataInfo readResult = logReadResult(0, Arrays.asList(
- new SimpleRecord("k1".getBytes(), "v1".getBytes()),
- new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult);
+ .thenReturn(readResult);
when(serde.deserialize(any(ByteBuffer.class), any(ByteBuffer.class)))
- .thenThrow(new RuntimeException("Error!"));
+ .thenThrow(new RuntimeException("Error!"));
RuntimeException ex = assertFutureThrows(RuntimeException.class, loader.load(tp, coordinator));
@@ -333,18 +333,18 @@
CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
- Time.SYSTEM,
- partitionLogSupplier,
- partitionLogEndOffsetSupplier,
- serde,
- 1000
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
)) {
when(log.logStartOffset()).thenReturn(0L);
FetchDataInfo readResult = logReadResult(0, List.of());
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult);
+ .thenReturn(readResult);
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
}
@@ -361,34 +361,34 @@
MockTime time = new MockTime();
try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
- time,
- partitionLogSupplier,
- partitionLogEndOffsetSupplier,
- serde,
- 1000
+ time,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
)) {
long startTimeMs = time.milliseconds();
when(log.logStartOffset()).thenReturn(0L);
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
- new SimpleRecord("k1".getBytes(), "v1".getBytes()),
- new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
- .thenAnswer((InvocationOnMock invocation) -> {
- time.sleep(1000);
- return readResult1;
- });
+ .thenAnswer((InvocationOnMock invocation) -> {
+ time.sleep(1000);
+ return readResult1;
+ });
FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
- new SimpleRecord("k3".getBytes(), "v3".getBytes()),
- new SimpleRecord("k4".getBytes(), "v4".getBytes()),
- new SimpleRecord("k5".getBytes(), "v5".getBytes())
+ new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+ new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+ new SimpleRecord("k5".getBytes(), "v5".getBytes())
));
when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult2);
+ .thenReturn(readResult2);
CoordinatorLoader.LoadSummary summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
assertEquals(startTimeMs, summary.startTimeMs());
@@ -408,39 +408,39 @@
CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
- Time.SYSTEM,
- partitionLogSupplier,
- partitionLogEndOffsetSupplier,
- serde,
- 1000
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L, 0L, 2L);
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
- new SimpleRecord("k1".getBytes(), "v1".getBytes()),
- new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult1);
FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
- new SimpleRecord("k3".getBytes(), "v3".getBytes()),
- new SimpleRecord("k4".getBytes(), "v4".getBytes()),
- new SimpleRecord("k5".getBytes(), "v5".getBytes())
+ new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+ new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+ new SimpleRecord("k5".getBytes(), "v5".getBytes())
));
when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult2);
+ .thenReturn(readResult2);
FetchDataInfo readResult3 = logReadResult(5, Arrays.asList(
- new SimpleRecord("k6".getBytes(), "v6".getBytes()),
- new SimpleRecord("k7".getBytes(), "v7".getBytes())
+ new SimpleRecord("k6".getBytes(), "v6".getBytes()),
+ new SimpleRecord("k7".getBytes(), "v7".getBytes())
));
when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult3);
+ .thenReturn(readResult3);
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
@@ -471,11 +471,11 @@
CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
- Time.SYSTEM,
- partitionLogSupplier,
- partitionLogEndOffsetSupplier,
- serde,
- 1000
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L);
@@ -497,39 +497,39 @@
CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
- Time.SYSTEM,
- partitionLogSupplier,
- partitionLogEndOffsetSupplier,
- serde,
- 1000
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(5L, 7L, 7L);
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
- new SimpleRecord("k1".getBytes(), "v1".getBytes()),
- new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult1);
+ .thenReturn(readResult1);
FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
- new SimpleRecord("k3".getBytes(), "v3".getBytes()),
- new SimpleRecord("k4".getBytes(), "v4".getBytes()),
- new SimpleRecord("k5".getBytes(), "v5".getBytes())
+ new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+ new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+ new SimpleRecord("k5".getBytes(), "v5".getBytes())
));
when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult2);
+ .thenReturn(readResult2);
FetchDataInfo readResult3 = logReadResult(5, Arrays.asList(
- new SimpleRecord("k6".getBytes(), "v6".getBytes()),
- new SimpleRecord("k7".getBytes(), "v7".getBytes())
+ new SimpleRecord("k6".getBytes(), "v6".getBytes()),
+ new SimpleRecord("k7".getBytes(), "v7".getBytes())
));
when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult3);
+ .thenReturn(readResult3);
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
@@ -561,32 +561,32 @@
CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
- Time.SYSTEM,
- partitionLogSupplier,
- partitionLogEndOffsetSupplier,
- serde,
- 1000
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L);
when(partitionLogEndOffsetSupplier.apply(tp)).thenReturn(Optional.of(5L)).thenReturn(Optional.of(-1L));
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
- new SimpleRecord("k1".getBytes(), "v1".getBytes()),
- new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult1);
+ .thenReturn(readResult1);
FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
- new SimpleRecord("k3".getBytes(), "v3".getBytes()),
- new SimpleRecord("k4".getBytes(), "v4".getBytes()),
- new SimpleRecord("k5".getBytes(), "v5".getBytes())
+ new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+ new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+ new SimpleRecord("k5".getBytes(), "v5".getBytes())
));
when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
- .thenReturn(readResult2);
+ .thenReturn(readResult2);
assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
}
@@ -597,28 +597,28 @@
}
private FetchDataInfo logReadResult(
- long startOffset,
- long producerId,
- short producerEpoch,
- List<SimpleRecord> records
+ long startOffset,
+ long producerId,
+ short producerEpoch,
+ List<SimpleRecord> records
) throws IOException {
FileRecords fileRecords = mock(FileRecords.class);
MemoryRecords memoryRecords;
if (producerId == RecordBatch.NO_PRODUCER_ID) {
memoryRecords = MemoryRecords.withRecords(
- startOffset,
- Compression.NONE,
- records.toArray(new SimpleRecord[0])
+ startOffset,
+ Compression.NONE,
+ records.toArray(new SimpleRecord[0])
);
} else {
memoryRecords = MemoryRecords.withTransactionalRecords(
- startOffset,
- Compression.NONE,
- producerId,
- producerEpoch,
- 0,
- RecordBatch.NO_PARTITION_LEADER_EPOCH,
- records.toArray(new SimpleRecord[0])
+ startOffset,
+ Compression.NONE,
+ producerId,
+ producerEpoch,
+ 0,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ records.toArray(new SimpleRecord[0])
);
}
@@ -635,19 +635,19 @@
}
private FetchDataInfo logReadResult(
- long startOffset,
- long producerId,
- short producerEpoch,
- ControlRecordType controlRecordType
+ long startOffset,
+ long producerId,
+ short producerEpoch,
+ ControlRecordType controlRecordType
) throws IOException {
FileRecords fileRecords = mock(FileRecords.class);
MemoryRecords memoryRecords = MemoryRecords.withEndTransactionMarker(
- startOffset,
- 0L,
- RecordBatch.NO_PARTITION_LEADER_EPOCH,
- producerId,
- producerEpoch,
- new EndTransactionMarker(controlRecordType, 0)
+ startOffset,
+ 0L,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ producerId,
+ producerEpoch,
+ new EndTransactionMarker(controlRecordType, 0)
);
when(fileRecords.sizeInBytes()).thenReturn(memoryRecords.sizeInBytes());