RATIS-609. Change RaftLog to use RaftGroupMemberId.
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
index 2641b4a..15545e1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
@@ -19,11 +19,6 @@
public class StateMachineException extends RaftException {
public StateMachineException(RaftGroupMemberId serverId, Throwable cause) {
- this(serverId.getPeerId(), cause);
- }
-
- // TODO: remove this constructor in RATIS-609
- public StateMachineException(RaftPeerId serverId, Throwable cause) {
// cause.getMessage is added to this exception message as the exception received through
// RPC call contains similar message but Simulated RPC doesn't. Adding the message
// from cause to this exception makes it consistent across simulated and other RPC implementations.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 6b3b626..35daf5e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1281,7 +1281,13 @@
return null;
}
- public void failClientRequest(LogEntryProto logEntry) {
+ /**
+ * The given log entry is being truncated.
+ * Fail the corresponding client request, if there is any.
+ *
+ * @param logEntry the log entry being truncated
+ */
+ public void notifyTruncatedLogEntry(LogEntryProto logEntry) {
if (logEntry.hasStateMachineLogEntry()) {
final StateMachineLogEntryProto smLog = logEntry.getStateMachineLogEntry();
final ClientId clientId = ClientId.valueOf(smLog.getClientId());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 96e0d83..05cda84 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -117,7 +117,7 @@
// we cannot apply log entries to the state machine in this step, since we
// do not know whether the local log entries have been committed.
- log = initLog(id, prop, lastApplied, this::setRaftConf);
+ this.log = initRaftLog(getMemberId(), server, storage, this::setRaftConf, lastApplied, prop);
RaftLog.Metadata metadata = log.loadMetadata();
currentTerm.set(metadata.getTerm());
@@ -179,18 +179,13 @@
stateMachineUpdater.start();
}
- /**
- * note we do not apply log entries to the state machine here since we do not
- * know whether they have been committed.
- */
- private RaftLog initLog(RaftPeerId id, RaftProperties prop,
- long lastIndexInSnapshot, Consumer<LogEntryProto> logConsumer)
- throws IOException {
+ private static RaftLog initRaftLog(RaftGroupMemberId memberId, RaftServerImpl server, RaftStorage storage,
+ Consumer<LogEntryProto> logConsumer, long lastIndexInSnapshot, RaftProperties prop) throws IOException {
final RaftLog log;
if (RaftServerConfigKeys.Log.useMemory(prop)) {
- log = new MemoryRaftLog(id, lastIndexInSnapshot, prop);
+ log = new MemoryRaftLog(memberId, lastIndexInSnapshot, prop);
} else {
- log = new SegmentedRaftLog(id, server, this.storage, lastIndexInSnapshot, prop);
+ log = new SegmentedRaftLog(memberId, server, storage, lastIndexInSnapshot, prop);
}
log.open(lastIndexInSnapshot, logConsumer);
return log;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index 01474fd..13a05c5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -19,12 +19,11 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.LogAppender;
import org.apache.ratis.server.impl.RaftConfiguration;
-import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.TransactionContext;
@@ -59,13 +58,14 @@
public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class);
public static final String LOG_SYNC = RaftLog.class.getSimpleName() + ".logSync";
- private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}", getSelfId(), s);
- private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}", getSelfId(), s);
+ private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}", getName(), s);
+ private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}", getName(), s);
/** The least valid log index, i.e. the index used when writing to an empty log. */
public static final long LEAST_VALID_LOG_INDEX = 0L;
public static final long INVALID_LOG_INDEX = LEAST_VALID_LOG_INDEX - 1;
+ private final String name;
/**
* The largest committed index. Note the last committed log may be included
* in the latest snapshot file.
@@ -74,7 +74,7 @@
private final RaftLogIndex purgeIndex;
private final int purgeGap;
- private final RaftPeerId selfId;
+ private final RaftGroupMemberId memberId;
private final int maxBufferSize;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -83,8 +83,9 @@
private volatile LogEntryProto lastMetadataEntry = null;
- protected RaftLog(RaftPeerId selfId, long commitIndex, RaftProperties properties) {
- this.selfId = selfId;
+ protected RaftLog(RaftGroupMemberId memberId, long commitIndex, RaftProperties properties) {
+ this.name = memberId + "-" + getClass().getSimpleName();
+ this.memberId = memberId;
this.commitIndex = new RaftLogIndex("commitIndex", commitIndex);
this.purgeIndex = new RaftLogIndex("purgeIndex", LEAST_VALID_LOG_INDEX - 1);
this.purgeGap = RaftServerConfigKeys.Log.purgeGap(properties);
@@ -168,7 +169,7 @@
try {
operation = operation.preAppendTransaction();
} catch (IOException e) {
- throw new StateMachineException(selfId, e);
+ throw new StateMachineException(memberId, e);
}
// build the log entry after calling the StateMachine
@@ -176,9 +177,8 @@
int entrySize = e.getSerializedSize();
if (entrySize > maxBufferSize) {
- throw new StateMachineException(selfId, new RaftLogIOException(
- "Log entry size " + entrySize + " exceeds the max buffer limit of "
- + maxBufferSize));
+ throw new StateMachineException(memberId, new RaftLogIOException(
+ "Log entry size " + entrySize + " exceeds the max buffer limit of " + maxBufferSize));
}
appendEntry(e);
return nextIndex;
@@ -193,7 +193,7 @@
private long appendMetadataImpl(long term, long newCommitIndex) {
checkLogState();
if (!shouldAppendMetadata(newCommitIndex)) {
- return RaftServerConstants.INVALID_LOG_INDEX;
+ return INVALID_LOG_INDEX;
}
final LogEntryProto entry;
@@ -437,12 +437,8 @@
state.close();
}
- public RaftPeerId getSelfId() {
- return selfId;
- }
-
public String getName() {
- return selfId + "-" + getClass().getSimpleName();
+ return name;
}
/**
@@ -477,17 +473,17 @@
} catch (TimeoutException t) {
throw t;
} catch (Throwable t) {
- final String err = selfId + ": Failed readStateMachineData for " +
+ final String err = getName() + ": Failed readStateMachineData for " +
ServerProtoUtils.toLogEntryString(logEntry);
- LogAppender.LOG.error(err, t);
+ LOG.error(err, t);
throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(t));
}
// by this time we have already read the state machine data,
// so the log entry data should be set now
if (ServerProtoUtils.shouldReadStateMachineData(entryProto)) {
- final String err = selfId + ": State machine data not set for " +
+ final String err = getName() + ": State machine data not set for " +
ServerProtoUtils.toLogEntryString(logEntry);
- LogAppender.LOG.error(err);
+ LOG.error(err);
throw new RaftLogIOException(err);
}
return entryProto;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index f07aa1a..1799fae 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -18,8 +18,8 @@
package org.apache.ratis.server.raftlog.memory;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -72,8 +72,8 @@
private final EntryList entries = new EntryList();
private final AtomicReference<Metadata> metadata = new AtomicReference<>(new Metadata(null, 0));
- public MemoryRaftLog(RaftPeerId selfId, long commitIndex, RaftProperties properties) {
- super(selfId, commitIndex, properties);
+ public MemoryRaftLog(RaftGroupMemberId memberId, long commitIndex, RaftProperties properties) {
+ super(memberId, commitIndex, properties);
}
@Override
@@ -154,7 +154,7 @@
@Override
public long getStartIndex() {
- return entries.size() == 0? RaftServerConstants.INVALID_LOG_INDEX: entries.getTermIndex(0).getIndex();
+ return entries.size() == 0? INVALID_LOG_INDEX: entries.getTermIndex(0).getIndex();
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 96330d6..fbe8529 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
@@ -31,6 +32,7 @@
import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
@@ -110,29 +112,89 @@
}
}
- private final Optional<RaftServerImpl> server;
+ /** The methods defined in {@link RaftServerImpl} which are used in {@link SegmentedRaftLog}. */
+ interface ServerLogMethods {
+ ServerLogMethods DUMMY = new ServerLogMethods() {};
+
+ default boolean shouldEvictCache() {
+ return false;
+ }
+
+ default long[] getFollowerNextIndices() {
+ return null;
+ }
+
+ default long getLastAppliedIndex() {
+ return INVALID_LOG_INDEX;
+ }
+
+ /** Notify the server that a log entry is being truncated. */
+ default void notifyTruncatedLogEntry(TermIndex ti) {
+ }
+ }
+
+ /**
+ * When the server is null, return the dummy instance of {@link ServerLogMethods}.
+ * Otherwise, the server is non-null, return the implementation using the given server.
+ */
+ private ServerLogMethods newServerLogMethods(RaftServerImpl impl) {
+ if (impl == null) {
+ return ServerLogMethods.DUMMY;
+ }
+
+ return new ServerLogMethods() {
+ @Override
+ public boolean shouldEvictCache() {
+ return cache.shouldEvict();
+ }
+
+ @Override
+ public long[] getFollowerNextIndices() {
+ return impl.getFollowerNextIndices();
+ }
+
+ @Override
+ public long getLastAppliedIndex() {
+ return impl.getState().getLastAppliedIndex();
+ }
+
+ @Override
+ public void notifyTruncatedLogEntry(TermIndex ti) {
+ try {
+ final LogEntryProto entry = get(ti.getIndex());
+ impl.notifyTruncatedLogEntry(entry);
+ } catch (RaftLogIOException e) {
+ LOG.error("{}: Failed to read log {}", getName(), ti, e);
+ }
+ }
+ };
+ }
+
+ private final ServerLogMethods server;
private final RaftStorage storage;
+ private final StateMachine stateMachine;
private final SegmentedRaftLogCache cache;
private final SegmentedRaftLogWorker fileLogWorker;
private final long segmentMaxSize;
private final boolean stateMachineCachingEnabled;
- public SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server,
+ public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServerImpl server,
RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
- this(selfId, server, server != null? server.getStateMachine(): null,
+ this(memberId, server, server != null? server.getStateMachine(): null,
server != null? server::submitUpdateCommitEvent: null,
storage, lastIndexInSnapshot, properties);
}
- SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server,
+ SegmentedRaftLog(RaftGroupMemberId memberId, RaftServerImpl server,
StateMachine stateMachine, Runnable submitUpdateCommitEvent,
RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
- super(selfId, lastIndexInSnapshot, properties);
- this.server = Optional.ofNullable(server);
+ super(memberId, lastIndexInSnapshot, properties);
+ this.server = newServerLogMethods(server);
this.storage = storage;
+ this.stateMachine = stateMachine;
segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
- this.cache = new SegmentedRaftLogCache(selfId, storage, properties);
- this.fileLogWorker = new SegmentedRaftLogWorker(selfId, stateMachine,
+ this.cache = new SegmentedRaftLogCache(memberId, storage, properties);
+ this.fileLogWorker = new SegmentedRaftLogWorker(memberId, stateMachine,
submitUpdateCommitEvent, server, storage, properties);
stateMachineCachingEnabled = RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);
}
@@ -221,9 +283,10 @@
}
try {
- return new EntryWithData(entry, server.map(s -> s.getStateMachine().readStateMachineData(entry)).orElse(null));
+ final CompletableFuture<ByteString> future = stateMachine != null? stateMachine.readStateMachineData(entry): null;
+ return new EntryWithData(entry, future);
} catch (Throwable e) {
- final String err = getSelfId() + ": Failed readStateMachineData for " +
+ final String err = getName() + ": Failed readStateMachineData for " +
ServerProtoUtils.toLogEntryString(entry);
LOG.error(err, e);
throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e));
@@ -231,12 +294,11 @@
}
private void checkAndEvictCache() {
- if (server.isPresent() && cache.shouldEvict()) {
+ if (server.shouldEvictCache()) {
// TODO if the cache is hitting the maximum size and we cannot evict any
// segment's cache, should block the new entry appending or new segment
// allocation.
- final RaftServerImpl s = server.get();
- cache.evictCache(s.getFollowerNextIndices(), fileLogWorker.getFlushIndex(), s.getState().getLastAppliedIndex());
+ cache.evictCache(server.getFollowerNextIndices(), fileLogWorker.getFlushIndex(), server.getLastAppliedIndex());
}
}
@@ -296,8 +358,7 @@
protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
checkLogState();
if (LOG.isTraceEnabled()) {
- LOG.trace("{}: appendEntry {}", getSelfId(),
- ServerProtoUtils.toLogEntryString(entry));
+ LOG.trace("{}: appendEntry {}", getName(), ServerProtoUtils.toLogEntryString(entry));
}
try(AutoCloseableLock writeLock = writeLock()) {
validateLogEntry(entry);
@@ -334,7 +395,7 @@
}
return writeFuture;
} catch (Throwable throwable) {
- LOG.error(getSelfId() + ": Failed to append " + ServerProtoUtils.toLogEntryString(entry), throwable);
+ LOG.error("{}: Failed to append {}", getName(), ServerProtoUtils.toLogEntryString(entry), throwable);
throw throwable;
}
}
@@ -351,18 +412,6 @@
}
}
- private void failClientRequest(TermIndex ti) {
- if (!server.isPresent()) {
- return;
- }
- try {
- final LogEntryProto entry = get(ti.getIndex());
- server.get().failClientRequest(entry);
- } catch(RaftLogIOException e) {
- LOG.error(getName() + ": Failed to read log " + ti, e);
- }
- }
-
@Override
public List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries) {
checkLogState();
@@ -370,7 +419,7 @@
return Collections.emptyList();
}
try(AutoCloseableLock writeLock = writeLock()) {
- final TruncateIndices ti = cache.computeTruncateIndices(this::failClientRequest, entries);
+ final TruncateIndices ti = cache.computeTruncateIndices(server::notifyTruncatedLogEntry, entries);
final long truncateIndex = ti.getTruncateIndex();
final int index = ti.getArrayIndex();
LOG.debug("truncateIndex={}, arrayIndex={}", truncateIndex, index);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index cbc9555..58f5093 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -19,11 +19,11 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.raftlog.segmented.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
@@ -39,8 +39,6 @@
import java.util.*;
import java.util.function.Consumer;
-import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
-
/**
* In-memory RaftLog Cache. Currently we provide a simple implementation that
* caches all the segments in the memory. The cache is not thread-safe and
@@ -296,8 +294,8 @@
private final int maxCachedSegments;
private final CacheInvalidationPolicy evictionPolicy = new CacheInvalidationPolicyDefault();
- SegmentedRaftLogCache(RaftPeerId selfId, RaftStorage storage, RaftProperties properties) {
- this.name = selfId + "-" + getClass().getSimpleName();
+ SegmentedRaftLogCache(Object name, RaftStorage storage, RaftProperties properties) {
+ this.name = name + "-" + getClass().getSimpleName();
this.closedSegments = new LogSegmentList(name);
this.storage = storage;
maxCachedSegments = RaftServerConfigKeys.Log.maxCachedSegmentNum(properties);
@@ -447,7 +445,7 @@
long getEndIndex() {
return openSegment != null ? openSegment.getEndIndex() :
(closedSegments.isEmpty() ?
- INVALID_LOG_INDEX :
+ RaftLog.INVALID_LOG_INDEX:
closedSegments.get(closedSegments.size() - 1).getEndIndex());
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 5cfeb89..ec7ab73 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -23,8 +23,7 @@
import com.codahale.metrics.Timer;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.RatisMetricRegistry;
-import org.apache.ratis.metrics.impl.RatisMetricRegistryImpl;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
@@ -167,14 +166,14 @@
private final StateMachineDataPolicy stateMachineDataPolicy;
- SegmentedRaftLogWorker(RaftPeerId selfId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
+ SegmentedRaftLogWorker(RaftGroupMemberId memberId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
RaftServerImpl server, RaftStorage storage, RaftProperties properties) {
- this.name = selfId + "-" + getClass().getSimpleName() + ":" + storage.getStorageDir();
+ this.name = memberId + "-" + getClass().getSimpleName();
LOG.info("new {} for {}", name, storage);
this.submitUpdateCommitEvent = submitUpdateCommitEvent;
this.stateMachine = stateMachine;
- this.metricRegistry = RatisMetrics.createMetricRegistryForLogWorker(selfId.toString());
+ this.metricRegistry = RatisMetrics.createMetricRegistryForLogWorker(memberId.getPeerId().toString());
this.storage = storage;
this.server = server;
final SizeInBytes queueByteLimit = RaftServerConfigKeys.Log.queueByteLimit(properties);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 9cfb436..6bbd708 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -246,7 +246,7 @@
final List<RaftPeerId> ids = new ArrayList<>();
final RaftServerImpl leader = cluster.getLeader();
final RaftLog leaderLog = leader.getState().getLog();
- final RaftPeerId leaderId = leaderLog.getSelfId();
+ final RaftPeerId leaderId = leader.getId();
ids.add(leaderId);
RaftTestUtil.getStateMachineLogEntries(leaderLog);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
index 8595246..a249fe0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
@@ -22,12 +22,15 @@
import org.apache.ratis.RaftTestUtil.SimpleOperation;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.impl.ServerState;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.segmented.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.LogSegmentList;
import org.apache.ratis.server.raftlog.segmented.TestSegmentedRaftLog.SegmentRange;
@@ -153,6 +156,8 @@
RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB"));
RaftServerConfigKeys.Log.setPreallocatedSize(prop, SizeInBytes.valueOf("8KB"));
final RaftPeerId peerId = RaftPeerId.valueOf("s0");
+ final RaftGroupId groupId = RaftGroupId.randomId();
+ final RaftGroupMemberId memberId = RaftGroupMemberId.valueOf(peerId, groupId);
final int maxCachedNum = RaftServerConfigKeys.Log.maxCachedSegmentNum(prop);
File storageDir = getTestDir();
@@ -165,8 +170,8 @@
Mockito.when(server.getFollowerNextIndices()).thenReturn(new long[]{});
Mockito.when(state.getLastAppliedIndex()).thenReturn(0L);
- SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, server, storage, -1, prop);
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, prop);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(0, maxCachedNum, 7, 0);
LogEntryProto[] entries = generateEntries(slist);
raftLog.append(entries).forEach(CompletableFuture::join);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index b209da9..b03102c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -75,6 +75,8 @@
}
private static final RaftPeerId peerId = RaftPeerId.valueOf("s0");
+ private static final RaftGroupId groupId = RaftGroupId.randomId();
+ private static final RaftGroupMemberId memberId = RaftGroupMemberId.valueOf(peerId, groupId);
static class SegmentRange {
final long start;
@@ -97,6 +99,10 @@
private long preallocatedSize;
private int bufferSize;
+ SegmentedRaftLog newSegmentedRaftLog() {
+ return new SegmentedRaftLog(memberId, null, storage, -1, properties);
+ }
+
@Before
public void setup() throws Exception {
storageDir = getTestDir();
@@ -163,8 +169,7 @@
LogEntryProto[] entries = prepareLog(ranges);
// create RaftLog object and load log file
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// check if log entries are loaded correctly
for (LogEntryProto e : entries) {
@@ -219,22 +224,19 @@
List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0);
List<LogEntryProto> entries = prepareLogEntries(ranges, null);
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// append entries to the raftlog
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
}
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// check if the raft log is correct
checkEntries(raftLog, entries, 0, entries.size());
}
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
TermIndex lastTermIndex = raftLog.getLastEntryTermIndex();
IllegalStateException ex = null;
@@ -272,15 +274,13 @@
List<LogEntryProto> entries = prepareLogEntries(ranges,
() -> new String(content));
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// append entries to the raftlog
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
}
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// check if the raft log is correct
checkEntries(raftLog, entries, 0, entries.size());
@@ -294,8 +294,7 @@
List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0);
List<LogEntryProto> entries = prepareLogEntries(ranges, null);
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// append entries to the raftlog
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
@@ -308,8 +307,7 @@
private void testTruncate(List<LogEntryProto> entries, long fromIndex)
throws Exception {
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// truncate the log
raftLog.truncate(fromIndex).join();
@@ -318,8 +316,7 @@
checkEntries(raftLog, entries, 0, (int) fromIndex);
}
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// check if the raft log is correct
if (fromIndex > 0) {
@@ -403,7 +400,7 @@
final RaftProperties p = new RaftProperties();
RaftServerConfigKeys.Log.setPurgeGap(p, purgeGap);
- try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, p)) {
+ try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, storage, -1, p)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
@@ -427,9 +424,8 @@
when(server.getRetryCache()).thenReturn(retryCache);
final RaftGroupMemberId id = RaftGroupMemberId.valueOf(RaftPeerId.valueOf("s0"), RaftGroupId.randomId());
when(server.getMemberId()).thenReturn(id);
- doCallRealMethod().when(server).failClientRequest(any(LogEntryProto.class));
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
+ doCallRealMethod().when(server).notifyTruncatedLogEntry(any(LogEntryProto.class));
+ try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, properties)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
entries.forEach(entry -> RetryCacheTestUtil.createEntry(retryCache, entry));
// append entries to the raftlog
@@ -444,8 +440,7 @@
List<LogEntryProto> newEntries = prepareLogEntries(
Arrays.asList(r1, r2, r3), null);
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, properties)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
LOG.info("newEntries[0] = {}", newEntries.get(0));
final int last = newEntries.size() - 1;
@@ -462,8 +457,7 @@
}
// load the raftlog again and check
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, properties)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
checkEntries(raftLog, entries, 0, 650);
checkEntries(raftLog, newEntries, 100, 100);
@@ -483,7 +477,7 @@
final List<LogEntryProto> entries = prepareLogEntries(range, null, true, new ArrayList<>());
final SimpleStateMachine4Testing sm = new SimpleStateMachine4Testing();
- try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm, null, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, sm, null, storage, -1, properties)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
int next = 0;
@@ -547,7 +541,7 @@
RaftServerImpl server = mock(RaftServerImpl.class);
doNothing().when(server).shutdown(false);
Throwable ex = null; // TimeoutIOException
- try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, server, sm, null, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, sm, null, storage, -1, properties)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// SegmentedRaftLogWorker should catch TimeoutIOException
CompletableFuture<Long> f = raftLog.appendEntry(entry);