blob: db974195493a0623f0bafabce80131b45c05c0b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metalog;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.errors.BufferAllocationException;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.snapshot.MockRawSnapshotReader;
import org.apache.kafka.snapshot.MockRawSnapshotWriter;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* The LocalLogManager is a test implementation that relies on the contents of memory.
*/
public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, AutoCloseable {
interface LocalBatch {
int epoch();
int size();
}
public static class LeaderChangeBatch implements LocalBatch {
private final LeaderAndEpoch newLeader;
public LeaderChangeBatch(LeaderAndEpoch newLeader) {
this.newLeader = newLeader;
}
@Override
public int epoch() {
return newLeader.epoch();
}
@Override
public int size() {
return 1;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof LeaderChangeBatch other)) return false;
return other.newLeader.equals(newLeader);
}
@Override
public int hashCode() {
return Objects.hash(newLeader);
}
@Override
public String toString() {
return "LeaderChangeBatch(newLeader=" + newLeader + ")";
}
}
public static class LocalRecordBatch implements LocalBatch {
private final int leaderEpoch;
private final long appendTimestamp;
private final List<ApiMessageAndVersion> records;
public LocalRecordBatch(int leaderEpoch, long appendTimestamp, List<ApiMessageAndVersion> records) {
this.leaderEpoch = leaderEpoch;
this.appendTimestamp = appendTimestamp;
this.records = records;
}
@Override
public int epoch() {
return leaderEpoch;
}
@Override
public int size() {
return records.size();
}
@Override
public boolean equals(Object o) {
if (!(o instanceof LocalRecordBatch other)) return false;
return leaderEpoch == other.leaderEpoch &&
appendTimestamp == other.appendTimestamp &&
Objects.equals(records, other.records);
}
@Override
public int hashCode() {
return Objects.hash(leaderEpoch, appendTimestamp, records);
}
@Override
public String toString() {
return String.format(
"LocalRecordBatch(leaderEpoch=%s, appendTimestamp=%s, records=%s)",
leaderEpoch,
appendTimestamp,
records
);
}
}
public static class SharedLogData {
private static final Logger log = LoggerFactory.getLogger(SharedLogData.class);
/**
* Maps node IDs to the matching log managers.
*/
private final HashMap<Integer, LocalLogManager> logManagers = new HashMap<>();
/**
* Maps offsets to record batches.
*/
private final TreeMap<Long, LocalBatch> batches = new TreeMap<>();
/**
* Maps committed offset to snapshot reader.
*/
private final NavigableMap<Long, RawSnapshotReader> snapshots = new TreeMap<>();
/**
* The current leader.
*/
private LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.empty(), 0);
/**
* The start offset of the last batch that was created, or -1 if no batches have
* been created.
*/
private long prevOffset;
/**
* The initial max read offset which LocalLog instances will be configured with.
*/
private long initialMaxReadOffset = Long.MAX_VALUE;
public SharedLogData(Optional<RawSnapshotReader> snapshot) {
if (snapshot.isPresent()) {
RawSnapshotReader initialSnapshot = snapshot.get();
prevOffset = initialSnapshot.snapshotId().offset() - 1;
snapshots.put(prevOffset, initialSnapshot);
} else {
prevOffset = -1;
}
}
synchronized void registerLogManager(LocalLogManager logManager) {
if (logManagers.put(logManager.nodeId, logManager) != null) {
throw new RuntimeException("Can't have multiple LocalLogManagers " +
"with id " + logManager.nodeId());
}
electLeaderIfNeeded();
}
synchronized void unregisterLogManager(LocalLogManager logManager) {
if (!logManagers.remove(logManager.nodeId, logManager)) {
throw new RuntimeException("Log manager " + logManager.nodeId() +
" was not found.");
}
}
synchronized long tryAppend(
int nodeId,
int epoch,
List<ApiMessageAndVersion> batch
) {
// No easy access to the concept of time. Use the base offset as the append timestamp
long appendTimestamp = (prevOffset + 1) * 10;
return tryAppend(nodeId,
epoch,
new LocalRecordBatch(epoch, appendTimestamp, batch));
}
synchronized long tryAppend(
int nodeId,
int epoch,
LocalBatch batch
) {
if (!leader.isLeader(nodeId)) {
log.debug("tryAppend(nodeId={}, epoch={}): the given node id does not " +
"match the current leader id of {}.", nodeId, epoch, leader.leaderId());
throw new NotLeaderException("Append failed because the replication is not the current leader");
}
if (epoch < leader.epoch()) {
throw new NotLeaderException("Append failed because the given epoch " + epoch + " is stale. " +
"Current leader epoch = " + leader.epoch());
} else if (epoch > leader.epoch()) {
throw new IllegalArgumentException("Attempt to append from epoch " + epoch +
" which is larger than the current epoch " + leader.epoch());
}
log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
long offset = append(batch);
electLeaderIfNeeded();
return offset;
}
public synchronized long append(
LocalBatch batch
) {
long nextEndOffset = prevOffset + batch.size();
log.debug("append(batch={}, nextEndOffset={})", batch, nextEndOffset);
batches.put(nextEndOffset, batch);
if (batch instanceof LeaderChangeBatch leaderChangeBatch) {
leader = leaderChangeBatch.newLeader;
}
for (LocalLogManager logManager : logManagers.values()) {
logManager.scheduleLogCheck();
}
prevOffset = nextEndOffset;
return nextEndOffset;
}
synchronized void electLeaderIfNeeded() {
if (leader.leaderId().isPresent() || logManagers.isEmpty()) {
return;
}
int nextLeaderIndex = ThreadLocalRandom.current().nextInt(logManagers.size());
Iterator<Integer> iter = logManagers.keySet().iterator();
Integer nextLeaderNode = null;
for (int i = 0; i <= nextLeaderIndex; i++) {
nextLeaderNode = iter.next();
}
LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(nextLeaderNode), leader.epoch() + 1);
log.info("Elected new leader: {}.", newLeader);
append(new LeaderChangeBatch(newLeader));
}
synchronized LeaderAndEpoch leaderAndEpoch() {
return leader;
}
synchronized Entry<Long, LocalBatch> nextBatch(long offset) {
Entry<Long, LocalBatch> entry = batches.higherEntry(offset);
if (entry == null) {
return null;
}
return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue());
}
/**
* Optionally return a snapshot reader if the offset if less than the first batch.
*/
synchronized Optional<RawSnapshotReader> nextSnapshot(long offset) {
return Optional.ofNullable(snapshots.lastEntry()).flatMap(entry -> {
if (offset <= entry.getKey()) {
return Optional.of(entry.getValue());
}
return Optional.empty();
});
}
/**
* Stores a new snapshot and notifies all threads waiting for a snapshot.
*/
synchronized void addSnapshot(RawSnapshotReader newSnapshot) {
if (newSnapshot.snapshotId().offset() - 1 > prevOffset) {
log.error(
"Ignored attempt to add a snapshot {} that is greater than the latest offset {}",
newSnapshot,
prevOffset
);
} else {
snapshots.put(newSnapshot.snapshotId().offset() - 1, newSnapshot);
this.notifyAll();
}
}
/**
* Returns the snapshot whose last offset is the committed offset.
*
* If such snapshot doesn't exist, it waits until it does.
*/
synchronized RawSnapshotReader waitForSnapshot(long committedOffset) throws InterruptedException {
while (true) {
RawSnapshotReader reader = snapshots.get(committedOffset);
if (reader != null) {
return reader;
} else {
this.wait();
}
}
}
/**
* Returns the latest snapshot.
*
* If a snapshot doesn't exists, it waits until it does.
*/
synchronized RawSnapshotReader waitForLatestSnapshot() throws InterruptedException {
while (snapshots.isEmpty()) {
this.wait();
}
return Objects.requireNonNull(snapshots.lastEntry()).getValue();
}
/**
* Returns the snapshot id of the latest snapshot if there is one.
*
* If a snapshot doesn't exists, it return an empty Optional.
*/
synchronized Optional<OffsetAndEpoch> latestSnapshotId() {
return Optional.ofNullable(snapshots.lastEntry()).map(entry -> entry.getValue().snapshotId());
}
synchronized long appendedBytes() {
ObjectSerializationCache objectCache = new ObjectSerializationCache();
return batches
.values()
.stream()
.flatMapToInt(batch -> {
if (batch instanceof LocalRecordBatch localBatch) {
return localBatch.records.stream().mapToInt(record -> messageSize(record, objectCache));
} else {
return IntStream.empty();
}
})
.sum();
}
public SharedLogData setInitialMaxReadOffset(long initialMaxReadOffset) {
this.initialMaxReadOffset = initialMaxReadOffset;
return this;
}
public long initialMaxReadOffset() {
return initialMaxReadOffset;
}
/**
* Return all records in the log as a list.
*/
public synchronized List<ApiMessageAndVersion> allRecords() {
List<ApiMessageAndVersion> allRecords = new ArrayList<>();
for (LocalBatch batch : batches.values()) {
if (batch instanceof LocalRecordBatch recordBatch) {
allRecords.addAll(recordBatch.records);
}
}
return allRecords;
}
}
private static class MetaLogListenerData {
private long offset = -1;
private LeaderAndEpoch notifiedLeader = new LeaderAndEpoch(OptionalInt.empty(), 0);
private final RaftClient.Listener<ApiMessageAndVersion> listener;
MetaLogListenerData(RaftClient.Listener<ApiMessageAndVersion> listener) {
this.listener = listener;
}
long offset() {
return offset;
}
void setOffset(long offset) {
this.offset = offset;
}
LeaderAndEpoch notifiedLeader() {
return notifiedLeader;
}
void handleCommit(MemoryBatchReader<ApiMessageAndVersion> reader) {
listener.handleCommit(reader);
offset = reader.lastOffset().getAsLong();
}
void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
listener.handleLoadSnapshot(reader);
offset = reader.lastContainedLogOffset();
}
void handleLeaderChange(long offset, LeaderAndEpoch leader) {
// Simulate KRaft implementation by first bumping the epoch before assigning a leader
listener.handleLeaderChange(new LeaderAndEpoch(OptionalInt.empty(), leader.epoch()));
listener.handleLeaderChange(leader);
notifiedLeader = leader;
this.offset = offset;
}
void beginShutdown() {
listener.beginShutdown();
}
}
private final Logger log;
/**
* The node ID of this local log manager. Each log manager must have a unique ID.
*/
private final int nodeId;
/**
* A reference to the in-memory state that unites all the log managers in use.
*/
private final SharedLogData shared;
/**
* The event queue used by this local log manager.
*/
private final EventQueue eventQueue;
/**
* The latest kraft version used by this local log manager.
*/
private final KRaftVersion lastKRaftVersion;
/**
* Whether this LocalLogManager has been shut down.
*/
private boolean shutdown = false;
/**
* An offset that the log manager will not read beyond. This exists only for testing
* purposes.
*/
private long maxReadOffset;
/**
* The listener objects attached to this local log manager.
*/
private final Map<Listener<ApiMessageAndVersion>, MetaLogListenerData> listeners = new IdentityHashMap<>();
/**
* The current leader, as seen by this log manager.
*/
private volatile LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.empty(), 0);
/*
* If this variable is true the next scheduleAppend will fail
*/
private final AtomicBoolean throwOnNextAppend = new AtomicBoolean(false);
public LocalLogManager(LogContext logContext,
int nodeId,
SharedLogData shared,
String threadNamePrefix,
KRaftVersion lastKRaftVersion) {
this.log = logContext.logger(LocalLogManager.class);
this.nodeId = nodeId;
this.shared = shared;
this.maxReadOffset = shared.initialMaxReadOffset();
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext,
threadNamePrefix, new ShutdownEvent());
this.lastKRaftVersion = lastKRaftVersion;
shared.registerLogManager(this);
}
private void scheduleLogCheck() {
eventQueue.append(() -> {
try {
log.debug("Node {}: running log check.", nodeId);
int numEntriesFound = 0;
for (MetaLogListenerData listenerData : listeners.values()) {
while (true) {
// Load the snapshot if needed and we are not the leader
LeaderAndEpoch notifiedLeader = listenerData.notifiedLeader();
if (!OptionalInt.of(nodeId).equals(notifiedLeader.leaderId())) {
Optional<RawSnapshotReader> snapshot = shared.nextSnapshot(listenerData.offset());
if (snapshot.isPresent()) {
log.trace("Node {}: handling snapshot with id {}.", nodeId, snapshot.get().snapshotId());
listenerData.handleLoadSnapshot(
RecordsSnapshotReader.of(
snapshot.get(),
new MetadataRecordSerde(),
BufferSupplier.create(),
Integer.MAX_VALUE,
true
)
);
}
}
Entry<Long, LocalBatch> entry = shared.nextBatch(listenerData.offset());
if (entry == null) {
log.trace("Node {}: reached the end of the log after finding " +
"{} entries.", nodeId, numEntriesFound);
break;
}
long entryOffset = entry.getKey();
if (entryOffset > maxReadOffset) {
log.trace("Node {}: after {} entries, not reading the next " +
"entry because its offset is {}, and maxReadOffset is {}.",
nodeId, numEntriesFound, entryOffset, maxReadOffset);
break;
}
if (entry.getValue() instanceof LeaderChangeBatch batch) {
log.trace("Node {}: handling LeaderChange to {}.",
nodeId, batch.newLeader);
// Only notify the listener if it equals the shared leader state
LeaderAndEpoch sharedLeader = shared.leaderAndEpoch();
if (batch.newLeader.equals(sharedLeader)) {
log.debug("Node {}: Executing handleLeaderChange {}",
nodeId, sharedLeader);
if (batch.newLeader.epoch() > leader.epoch()) {
leader = batch.newLeader;
}
listenerData.handleLeaderChange(entryOffset, batch.newLeader);
} else {
log.debug("Node {}: Ignoring {} since it doesn't match the latest known leader {}",
nodeId, batch.newLeader, sharedLeader);
listenerData.setOffset(entryOffset);
}
} else if (entry.getValue() instanceof LocalRecordBatch batch) {
log.trace("Node {}: handling LocalRecordBatch with offset {}.",
nodeId, entryOffset);
ObjectSerializationCache objectCache = new ObjectSerializationCache();
listenerData.handleCommit(
MemoryBatchReader.of(
Collections.singletonList(
Batch.data(
entryOffset - batch.records.size() + 1,
batch.leaderEpoch,
batch.appendTimestamp,
batch
.records
.stream()
.mapToInt(record -> messageSize(record, objectCache))
.sum(),
batch.records
)
),
reader -> { }
)
);
}
numEntriesFound++;
}
}
log.trace("Completed log check for node " + nodeId);
} catch (Exception e) {
log.error("Exception while handling log check", e);
}
});
}
private static int messageSize(ApiMessageAndVersion messageAndVersion, ObjectSerializationCache objectCache) {
return new MetadataRecordSerde().recordSize(messageAndVersion, objectCache);
}
public void beginShutdown() {
eventQueue.beginShutdown("beginShutdown");
}
class ShutdownEvent implements EventQueue.Event {
@Override
public void run() throws Exception {
try {
if (!shutdown) {
log.debug("Node {}: beginning shutdown.", nodeId);
resign(leader.epoch());
for (MetaLogListenerData listenerData : listeners.values()) {
listenerData.beginShutdown();
}
shared.unregisterLogManager(LocalLogManager.this);
}
} catch (Exception e) {
log.error("Unexpected exception while sending beginShutdown callbacks", e);
}
shutdown = true;
}
}
@Override
public void close() {
log.debug("Node {}: closing.", nodeId);
beginShutdown();
try {
eventQueue.close();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
/**
* Shutdown the log manager.
*
* Even though the API suggests a non-blocking shutdown, this method always returns a completed
* future. This means that shutdown is a blocking operation.
*/
@Override
public CompletableFuture<Void> shutdown(int timeoutMs) {
CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
try {
close();
shutdownFuture.complete(null);
} catch (Throwable t) {
shutdownFuture.completeExceptionally(t);
}
return shutdownFuture;
}
@Override
public void register(RaftClient.Listener<ApiMessageAndVersion> listener) {
CompletableFuture<Void> future = new CompletableFuture<>();
eventQueue.append(() -> {
if (shutdown) {
log.info("Node {}: can't register because local log manager has " +
"already been shut down.", nodeId);
future.complete(null);
} else {
int id = System.identityHashCode(listener);
if (listeners.putIfAbsent(listener, new MetaLogListenerData(listener)) != null) {
log.error("Node {}: can't register because listener {} already exists", nodeId, id);
} else {
log.info("Node {}: registered MetaLogListener {}", nodeId, id);
}
shared.electLeaderIfNeeded();
scheduleLogCheck();
future.complete(null);
}
});
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
@Override
public void unregister(RaftClient.Listener<ApiMessageAndVersion> listener) {
eventQueue.append(() -> {
if (shutdown) {
log.info("Node {}: can't unregister because local log manager is shutdown", nodeId);
} else {
int id = System.identityHashCode(listener);
if (listeners.remove(listener) == null) {
log.error("Node {}: can't unregister because the listener {} doesn't exists", nodeId, id);
} else {
log.info("Node {}: unregistered MetaLogListener {}", nodeId, id);
}
}
});
}
@Override
public synchronized OptionalLong highWatermark() {
if (shared.prevOffset > 0) {
return OptionalLong.of(shared.prevOffset);
} else {
return OptionalLong.empty();
}
}
@Override
public long prepareAppend(
int epoch,
List<ApiMessageAndVersion> batch
) {
if (batch.isEmpty()) {
throw new IllegalArgumentException("Batch cannot be empty");
}
if (throwOnNextAppend.getAndSet(false)) {
throw new BufferAllocationException("Test asked to fail the next prepareAppend");
}
return shared.tryAppend(nodeId, epoch, batch);
}
@Override
public void schedulePreparedAppend() { }
@Override
public void resign(int epoch) {
if (epoch < 0) {
throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
}
LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
int currentEpoch = leaderAndEpoch.epoch();
if (epoch > currentEpoch) {
throw new IllegalArgumentException("Attempt to resign from epoch " + epoch +
" which is larger than the current epoch " + currentEpoch);
} else if (epoch < currentEpoch) {
// If the passed epoch is smaller than the current epoch, then it might mean
// that the listener has not been notified about a leader change that already
// took place. In this case, we consider the call as already fulfilled and
// take no further action.
log.debug("Ignoring call to resign from epoch {} since it is smaller than the " +
"current epoch {}", epoch, currentEpoch);
return;
}
LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), currentEpoch + 1);
try {
shared.tryAppend(nodeId,
currentEpoch,
new LeaderChangeBatch(nextLeader));
} catch (NotLeaderException exp) {
// the leader epoch has already advanced. resign is a no op.
log.debug("Ignoring call to resign from epoch {}. Either we are not the leader or the provided epoch is " +
"smaller than the current epoch {}", epoch, currentEpoch);
}
}
@Override
public Optional<SnapshotWriter<ApiMessageAndVersion>> createSnapshot(
OffsetAndEpoch snapshotId,
long lastContainedLogTimestamp
) {
return Optional.of(
new RecordsSnapshotWriter.Builder()
.setLastContainedLogTimestamp(lastContainedLogTimestamp)
.setTime(new MockTime())
.setRawSnapshotWriter(createNewSnapshot(snapshotId))
.build(new MetadataRecordSerde())
);
}
private RawSnapshotWriter createNewSnapshot(OffsetAndEpoch snapshotId) {
return new MockRawSnapshotWriter(
snapshotId,
buffer -> shared.addSnapshot(new MockRawSnapshotReader(snapshotId, buffer))
);
}
@Override
public synchronized Optional<OffsetAndEpoch> latestSnapshotId() {
return shared.latestSnapshotId();
}
@Override
public synchronized long logEndOffset() {
return shared.prevOffset + 1;
}
@Override
public LeaderAndEpoch leaderAndEpoch() {
return leader;
}
@Override
public OptionalInt nodeId() {
return OptionalInt.of(nodeId);
}
public List<RaftClient.Listener<ApiMessageAndVersion>> listeners() {
final CompletableFuture<List<RaftClient.Listener<ApiMessageAndVersion>>> future = new CompletableFuture<>();
eventQueue.append(() ->
future.complete(listeners.values().stream().map(l -> l.listener).collect(Collectors.toList()))
);
try {
return future.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
public void setMaxReadOffset(long maxReadOffset) {
CompletableFuture<Void> future = new CompletableFuture<>();
eventQueue.append(() -> {
log.trace("Node {}: set maxReadOffset to {}.", nodeId, maxReadOffset);
this.maxReadOffset = maxReadOffset;
scheduleLogCheck();
future.complete(null);
});
try {
future.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
public void throwOnNextAppend() {
throwOnNextAppend.set(true);
}
@Override
public KRaftVersion kraftVersion() {
return lastKRaftVersion;
}
}