MINOR: Fix format in CoordinatorLoaderImpl (#20538)
The formatting of the code in `CoordinatorLoaderImpl` is inconsistent with
the rest of the code in the package. This small PR fixes it.
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Sean
Quah <squah@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
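For context, the inconsistency this PR cleans up is the continuation indent
on multi-line parameter lists. A minimal compilable sketch of the before and
after styles (IndentExample and its parameters are hypothetical stand-ins for
the real constructor, and the indent widths are assumptions, since the
rendered diff strips leading whitespace):

    public class IndentExample {

        // Old style (assumed): continuation lines double-indented.
        IndentExample(
                String partitionName,
                int loadBufferSize
        ) {
        }

        // New style (assumed): continuation lines single-indented,
        // matching the rest of the package.
        IndentExample(
            String partitionName,
            long loadBufferSize
        ) {
        }
    }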
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
index 078dad3..6613ce2 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
@@ -62,11 +62,11 @@
private final KafkaScheduler scheduler = new KafkaScheduler(1);
public CoordinatorLoaderImpl(
- Time time,
- Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier,
- Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier,
- Deserializer<T> deserializer,
- int loadBufferSize
+ Time time,
+ Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier,
+ Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier,
+ Deserializer<T> deserializer,
+ int loadBufferSize
) {
this.time = time;
this.partitionLogSupplier = partitionLogSupplier;
@@ -89,7 +89,7 @@
long startTimeMs = time.milliseconds();
try {
ScheduledFuture<?> result = scheduler.scheduleOnce(String.format("Load coordinator from %s", tp),
- () -> doLoad(tp, coordinator, future, startTimeMs));
+ () -> doLoad(tp, coordinator, future, startTimeMs));
if (result.isCancelled()) {
future.completeExceptionally(new RuntimeException("Coordinator loader is closed."));
}
@@ -100,17 +100,17 @@
}
private void doLoad(
- TopicPartition tp,
- CoordinatorPlayback<T> coordinator,
- CompletableFuture<LoadSummary> future,
- long startTimeMs
+ TopicPartition tp,
+ CoordinatorPlayback<T> coordinator,
+ CompletableFuture<LoadSummary> future,
+ long startTimeMs
) {
long schedulerQueueTimeMs = time.milliseconds() - startTimeMs;
try {
Optional<UnifiedLog> logOpt = partitionLogSupplier.apply(tp);
if (logOpt.isEmpty()) {
future.completeExceptionally(new NotLeaderOrFollowerException(
- "Could not load records from " + tp + " because the log does not exist."));
+ "Could not load records from " + tp + " because the log does not exist."));
return;
}
@@ -142,8 +142,7 @@
if (logEndOffset(tp) == -1L) {
future.completeExceptionally(new NotLeaderOrFollowerException(
- String.format("Stopped loading records from %s because the partition is not online or is no longer the leader.", tp)
- ));
+ String.format("Stopped loading records from %s because the partition is not online or is no longer the leader.", tp)));
} else if (isRunning.get()) {
future.complete(new LoadSummary(startTimeMs, endTimeMs, schedulerQueueTimeMs, stats.numRecords, stats.numBytes));
} else {
@@ -186,7 +185,7 @@
if (buffer.capacity() < bytesNeeded) {
if (loadBufferSize < bytesNeeded) {
LOG.warn("Loaded metadata from {} with buffer larger ({} bytes) than" +
- " configured buffer size ({} bytes).", tp, bytesNeeded, loadBufferSize);
+ " configured buffer size ({} bytes).", tp, bytesNeeded, loadBufferSize);
}
buffer = ByteBuffer.allocate(bytesNeeded);
@@ -202,15 +201,14 @@
}
private ReplayResult processMemoryRecords(
- TopicPartition tp,
- UnifiedLog log,
- MemoryRecords memoryRecords,
- CoordinatorPlayback<T> coordinator,
- LoadStats loadStats,
- long currentOffset,
- long previousHighWatermark
+ TopicPartition tp,
+ UnifiedLog log,
+ MemoryRecords memoryRecords,
+ CoordinatorPlayback<T> coordinator,
+ LoadStats loadStats,
+ long currentOffset,
+ long previousHighWatermark
) {
-
for (MutableRecordBatch batch : memoryRecords.batches()) {
if (batch.isControlBatch()) {
for (Record record : batch) {
@@ -220,8 +218,8 @@
if (controlRecord == ControlRecordType.COMMIT) {
if (LOG.isTraceEnabled()) {
LOG.trace("Replaying end transaction marker from {} at offset {} to commit" +
- " transaction with producer id {} and producer epoch {}.",
- tp, record.offset(), batch.producerId(), batch.producerEpoch());
+ " transaction with producer id {} and producer epoch {}.",
+ tp, record.offset(), batch.producerId(), batch.producerEpoch());
}
coordinator.replayEndTransactionMarker(
batch.producerId(),
@@ -231,8 +229,8 @@
} else if (controlRecord == ControlRecordType.ABORT) {
if (LOG.isTraceEnabled()) {
LOG.trace("Replaying end transaction marker from {} at offset {} to abort" +
- " transaction with producer id {} and producer epoch {}.",
- tp, record.offset(), batch.producerId(), batch.producerEpoch());
+ " transaction with producer id {} and producer epoch {}.",
+ tp, record.offset(), batch.producerId(), batch.producerEpoch());
}
coordinator.replayEndTransactionMarker(
batch.producerId(),
@@ -250,7 +248,7 @@
coordinatorRecordOpt = Optional.ofNullable(deserializer.deserialize(record.key(), record.value()));
} catch (Deserializer.UnknownRecordTypeException ex) {
LOG.warn("Unknown record type {} while loading offsets and group metadata from {}." +
- " Ignoring it. It could be a left over from an aborted upgrade.", ex.unknownType(), tp);
+ " Ignoring it. It could be a left over from an aborted upgrade.", ex.unknownType(), tp);
} catch (RuntimeException ex) {
String msg = String.format("Deserializing record %s from %s failed.", record, tp);
LOG.error(msg, ex);
@@ -261,18 +259,18 @@
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Replaying record {} from {} at offset {} with producer id {}" +
- " and producer epoch {}.", coordinatorRecord, tp, record.offset(), batch.producerId(), batch.producerEpoch());
+ " and producer epoch {}.", coordinatorRecord, tp, record.offset(), batch.producerId(), batch.producerEpoch());
}
coordinator.replay(
- record.offset(),
- batch.producerId(),
- batch.producerEpoch(),
- coordinatorRecord
+ record.offset(),
+ batch.producerId(),
+ batch.producerEpoch(),
+ coordinatorRecord
);
} catch (RuntimeException ex) {
String msg = String.format("Replaying record %s from %s at offset %d with producer id %d and" +
- " producer epoch %d failed.", coordinatorRecord, tp, record.offset(),
- batch.producerId(), batch.producerEpoch());
+ " producer epoch %d failed.", coordinatorRecord, tp, record.offset(),
+ batch.producerId(), batch.producerEpoch());
LOG.error(msg, ex);
throw new RuntimeException(msg, ex);
}
@@ -320,14 +318,13 @@
@Override
public String toString() {
- return "LoadStats{" +
- "numRecords=" + numRecords +
- ", numBytes=" + numBytes +
- ", readAtLeastOneRecord=" + readAtLeastOneRecord +
- '}';
+ return "LoadStats(" +
+ "numRecords=" + numRecords +
+ ", numBytes=" + numBytes +
+ ", readAtLeastOneRecord=" + readAtLeastOneRecord +
+ ')';
}
}
- private record ReplayResult(long nextOffset, long highWatermark) {
- }
+ private record ReplayResult(long nextOffset, long highWatermark) { }
}
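For completeness, a runnable sketch of the parenthesized toString style the
diff adopts for LoadStats (LoadStatsExample and its field values are
hypothetical; only the output format mirrors the change):

    public class LoadStatsExample {
        private final long numRecords = 42;                // made-up value
        private final long numBytes = 8192;                // made-up value
        private final boolean readAtLeastOneRecord = true; // made-up value

        @Override
        public String toString() {
            // Parenthesized form, as in the diff, replacing the earlier
            // brace-delimited "LoadStats{...}" form.
            return "LoadStats(" +
                "numRecords=" + numRecords +
                ", numBytes=" + numBytes +
                ", readAtLeastOneRecord=" + readAtLeastOneRecord +
                ')';
        }

        public static void main(String[] args) {
            // Prints: LoadStats(numRecords=42, numBytes=8192, readAtLeastOneRecord=true)
            System.out.println(new LoadStatsExample());
        }
    }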