RATIS-609. Change RaftLog to use RaftGroupMemberId.
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
index 2641b4a..15545e1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
@@ -19,11 +19,6 @@
 
 public class StateMachineException extends RaftException {
   public StateMachineException(RaftGroupMemberId serverId, Throwable cause) {
-    this(serverId.getPeerId(), cause);
-  }
-
-  // TODO: remove this constructor in RATIS-609
-  public StateMachineException(RaftPeerId serverId, Throwable cause) {
     // cause.getMessage is added to this exception message as the exception received through
     // RPC call contains similar message but Simulated RPC doesn't. Adding the message
     // from cause to this exception makes it consistent across simulated and other RPC implementations.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 6b3b626..35daf5e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1281,7 +1281,13 @@
     return null;
   }
 
-  public void failClientRequest(LogEntryProto logEntry) {
+  /**
+   * The given log entry is being truncated.
+   * Fail the corresponding client request, if there is any.
+   *
+   * @param logEntry the log entry being truncated
+   */
+  public void notifyTruncatedLogEntry(LogEntryProto logEntry) {
     if (logEntry.hasStateMachineLogEntry()) {
       final StateMachineLogEntryProto smLog = logEntry.getStateMachineLogEntry();
       final ClientId clientId = ClientId.valueOf(smLog.getClientId());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 96e0d83..05cda84 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -117,7 +117,7 @@
 
     // we cannot apply log entries to the state machine in this step, since we
     // do not know whether the local log entries have been committed.
-    log = initLog(id, prop, lastApplied, this::setRaftConf);
+    this.log = initRaftLog(getMemberId(), server, storage, this::setRaftConf, lastApplied, prop);
 
     RaftLog.Metadata metadata = log.loadMetadata();
     currentTerm.set(metadata.getTerm());
@@ -179,18 +179,13 @@
     stateMachineUpdater.start();
   }
 
-  /**
-   * note we do not apply log entries to the state machine here since we do not
-   * know whether they have been committed.
-   */
-  private RaftLog initLog(RaftPeerId id, RaftProperties prop,
-      long lastIndexInSnapshot, Consumer<LogEntryProto> logConsumer)
-      throws IOException {
+  private static RaftLog initRaftLog(RaftGroupMemberId memberId, RaftServerImpl server, RaftStorage storage,
+      Consumer<LogEntryProto> logConsumer, long lastIndexInSnapshot, RaftProperties prop) throws IOException {
     final RaftLog log;
     if (RaftServerConfigKeys.Log.useMemory(prop)) {
-      log = new MemoryRaftLog(id, lastIndexInSnapshot, prop);
+      log = new MemoryRaftLog(memberId, lastIndexInSnapshot, prop);
     } else {
-      log = new SegmentedRaftLog(id, server, this.storage, lastIndexInSnapshot, prop);
+      log = new SegmentedRaftLog(memberId, server, storage, lastIndexInSnapshot, prop);
     }
     log.open(lastIndexInSnapshot, logConsumer);
     return log;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index 01474fd..13a05c5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -19,12 +19,11 @@
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.LogAppender;
 import org.apache.ratis.server.impl.RaftConfiguration;
-import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.statemachine.TransactionContext;
@@ -59,13 +58,14 @@
   public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class);
   public static final String LOG_SYNC = RaftLog.class.getSimpleName() + ".logSync";
 
-  private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}", getSelfId(), s);
-  private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}", getSelfId(), s);
+  private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}", getName(), s);
+  private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}", getName(), s);
 
   /** The least valid log index, i.e. the index used when writing to an empty log. */
   public static final long LEAST_VALID_LOG_INDEX = 0L;
   public static final long INVALID_LOG_INDEX = LEAST_VALID_LOG_INDEX - 1;
 
+  private final String name;
   /**
    * The largest committed index. Note the last committed log may be included
    * in the latest snapshot file.
@@ -74,7 +74,7 @@
   private final RaftLogIndex purgeIndex;
   private final int purgeGap;
 
-  private final RaftPeerId selfId;
+  private final RaftGroupMemberId memberId;
   private final int maxBufferSize;
 
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -83,8 +83,9 @@
 
   private volatile LogEntryProto lastMetadataEntry = null;
 
-  protected RaftLog(RaftPeerId selfId, long commitIndex, RaftProperties properties) {
-    this.selfId = selfId;
+  protected RaftLog(RaftGroupMemberId memberId, long commitIndex, RaftProperties properties) {
+    this.name = memberId + "-" + getClass().getSimpleName();
+    this.memberId = memberId;
     this.commitIndex = new RaftLogIndex("commitIndex", commitIndex);
     this.purgeIndex = new RaftLogIndex("purgeIndex", LEAST_VALID_LOG_INDEX - 1);
     this.purgeGap = RaftServerConfigKeys.Log.purgeGap(properties);
@@ -168,7 +169,7 @@
       try {
         operation = operation.preAppendTransaction();
       } catch (IOException e) {
-        throw new StateMachineException(selfId, e);
+        throw new StateMachineException(memberId, e);
       }
 
       // build the log entry after calling the StateMachine
@@ -176,9 +177,8 @@
 
       int entrySize = e.getSerializedSize();
       if (entrySize > maxBufferSize) {
-        throw new StateMachineException(selfId, new RaftLogIOException(
-            "Log entry size " + entrySize + " exceeds the max buffer limit of "
-                + maxBufferSize));
+        throw new StateMachineException(memberId, new RaftLogIOException(
+            "Log entry size " + entrySize + " exceeds the max buffer limit of " + maxBufferSize));
       }
       appendEntry(e);
       return nextIndex;
@@ -193,7 +193,7 @@
   private long appendMetadataImpl(long term, long newCommitIndex) {
     checkLogState();
     if (!shouldAppendMetadata(newCommitIndex)) {
-      return RaftServerConstants.INVALID_LOG_INDEX;
+      return INVALID_LOG_INDEX;
     }
 
     final LogEntryProto entry;
@@ -437,12 +437,8 @@
     state.close();
   }
 
-  public RaftPeerId getSelfId() {
-    return selfId;
-  }
-
   public String getName() {
-    return selfId + "-" + getClass().getSimpleName();
+    return name;
   }
 
   /**
@@ -477,17 +473,17 @@
       } catch (TimeoutException t) {
         throw t;
       } catch (Throwable t) {
-        final String err = selfId + ": Failed readStateMachineData for " +
+        final String err = getName() + ": Failed readStateMachineData for " +
             ServerProtoUtils.toLogEntryString(logEntry);
-        LogAppender.LOG.error(err, t);
+        LOG.error(err, t);
         throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(t));
       }
       // by this time we have already read the state machine data,
       // so the log entry data should be set now
       if (ServerProtoUtils.shouldReadStateMachineData(entryProto)) {
-        final String err = selfId + ": State machine data not set for " +
+        final String err = getName() + ": State machine data not set for " +
             ServerProtoUtils.toLogEntryString(logEntry);
-        LogAppender.LOG.error(err);
+        LOG.error(err);
         throw new RaftLogIOException(err);
       }
       return entryProto;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index f07aa1a..1799fae 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -18,8 +18,8 @@
 package org.apache.ratis.server.raftlog.memory;
 
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -72,8 +72,8 @@
   private final EntryList entries = new EntryList();
   private final AtomicReference<Metadata> metadata = new AtomicReference<>(new Metadata(null, 0));
 
-  public MemoryRaftLog(RaftPeerId selfId, long commitIndex, RaftProperties properties) {
-    super(selfId, commitIndex, properties);
+  public MemoryRaftLog(RaftGroupMemberId memberId, long commitIndex, RaftProperties properties) {
+    super(memberId, commitIndex, properties);
   }
 
   @Override
@@ -154,7 +154,7 @@
 
   @Override
   public long getStartIndex() {
-    return entries.size() == 0? RaftServerConstants.INVALID_LOG_INDEX: entries.getTermIndex(0).getIndex();
+    return entries.size() == 0? INVALID_LOG_INDEX: entries.getTermIndex(0).getIndex();
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 96330d6..fbe8529 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.raftlog.segmented;
 
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
@@ -31,6 +32,7 @@
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
@@ -110,29 +112,89 @@
     }
   }
 
-  private final Optional<RaftServerImpl> server;
+  /** The methods defined in {@link RaftServerImpl} which are used in {@link SegmentedRaftLog}. */
+  interface ServerLogMethods {
+    ServerLogMethods DUMMY = new ServerLogMethods() {};
+
+    default boolean shouldEvictCache() {
+      return false;
+    }
+
+    default long[] getFollowerNextIndices() {
+      return null;
+    }
+
+    default long getLastAppliedIndex() {
+      return INVALID_LOG_INDEX;
+    }
+
+    /** Notify the server that a log entry is being truncated. */
+    default void notifyTruncatedLogEntry(TermIndex ti) {
+    }
+  }
+
+  /**
+   * When the server is null, return the dummy instance of {@link ServerLogMethods}.
+   * Otherwise, the server is non-null, return the implementation using the given server.
+   */
+  private ServerLogMethods newServerLogMethods(RaftServerImpl impl) {
+    if (impl == null) {
+      return ServerLogMethods.DUMMY;
+    }
+
+    return new ServerLogMethods() {
+      @Override
+      public boolean shouldEvictCache() {
+        return cache.shouldEvict();
+      }
+
+      @Override
+      public long[] getFollowerNextIndices() {
+        return impl.getFollowerNextIndices();
+      }
+
+      @Override
+      public long getLastAppliedIndex() {
+        return impl.getState().getLastAppliedIndex();
+      }
+
+      @Override
+      public void notifyTruncatedLogEntry(TermIndex ti) {
+        try {
+          final LogEntryProto entry = get(ti.getIndex());
+          impl.notifyTruncatedLogEntry(entry);
+        } catch (RaftLogIOException e) {
+          LOG.error("{}: Failed to read log {}", getName(), ti, e);
+        }
+      }
+    };
+  }
+
+  private final ServerLogMethods server;
   private final RaftStorage storage;
+  private final StateMachine stateMachine;
   private final SegmentedRaftLogCache cache;
   private final SegmentedRaftLogWorker fileLogWorker;
   private final long segmentMaxSize;
   private final boolean stateMachineCachingEnabled;
 
-  public SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server,
+  public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServerImpl server,
       RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
-    this(selfId, server, server != null? server.getStateMachine(): null,
+    this(memberId, server, server != null? server.getStateMachine(): null,
         server != null? server::submitUpdateCommitEvent: null,
         storage, lastIndexInSnapshot, properties);
   }
 
-  SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server,
+  SegmentedRaftLog(RaftGroupMemberId memberId, RaftServerImpl server,
       StateMachine stateMachine, Runnable submitUpdateCommitEvent,
       RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
-    super(selfId, lastIndexInSnapshot, properties);
-    this.server = Optional.ofNullable(server);
+    super(memberId, lastIndexInSnapshot, properties);
+    this.server = newServerLogMethods(server);
     this.storage = storage;
+    this.stateMachine = stateMachine;
     segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
-    this.cache = new SegmentedRaftLogCache(selfId, storage, properties);
-    this.fileLogWorker = new SegmentedRaftLogWorker(selfId, stateMachine,
+    this.cache = new SegmentedRaftLogCache(memberId, storage, properties);
+    this.fileLogWorker = new SegmentedRaftLogWorker(memberId, stateMachine,
         submitUpdateCommitEvent, server, storage, properties);
     stateMachineCachingEnabled = RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);
   }
@@ -221,9 +283,10 @@
     }
 
     try {
-      return new EntryWithData(entry, server.map(s -> s.getStateMachine().readStateMachineData(entry)).orElse(null));
+      final CompletableFuture<ByteString> future = stateMachine != null? stateMachine.readStateMachineData(entry): null;
+      return new EntryWithData(entry, future);
     } catch (Throwable e) {
-      final String err = getSelfId() + ": Failed readStateMachineData for " +
+      final String err = getName() + ": Failed readStateMachineData for " +
           ServerProtoUtils.toLogEntryString(entry);
       LOG.error(err, e);
       throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e));
@@ -231,12 +294,11 @@
   }
 
   private void checkAndEvictCache() {
-    if (server.isPresent() && cache.shouldEvict()) {
+    if (server.shouldEvictCache()) {
       // TODO if the cache is hitting the maximum size and we cannot evict any
       // segment's cache, should block the new entry appending or new segment
       // allocation.
-      final RaftServerImpl s = server.get();
-      cache.evictCache(s.getFollowerNextIndices(), fileLogWorker.getFlushIndex(), s.getState().getLastAppliedIndex());
+      cache.evictCache(server.getFollowerNextIndices(), fileLogWorker.getFlushIndex(), server.getLastAppliedIndex());
     }
   }
 
@@ -296,8 +358,7 @@
   protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
     checkLogState();
     if (LOG.isTraceEnabled()) {
-      LOG.trace("{}: appendEntry {}", getSelfId(),
-          ServerProtoUtils.toLogEntryString(entry));
+      LOG.trace("{}: appendEntry {}", getName(), ServerProtoUtils.toLogEntryString(entry));
     }
     try(AutoCloseableLock writeLock = writeLock()) {
       validateLogEntry(entry);
@@ -334,7 +395,7 @@
       }
       return writeFuture;
     } catch (Throwable throwable) {
-      LOG.error(getSelfId() + ": Failed to append " + ServerProtoUtils.toLogEntryString(entry), throwable);
+      LOG.error("{}: Failed to append {}", getName(), ServerProtoUtils.toLogEntryString(entry), throwable);
       throw throwable;
     }
   }
@@ -351,18 +412,6 @@
     }
   }
 
-  private void failClientRequest(TermIndex ti) {
-    if (!server.isPresent()) {
-      return;
-    }
-    try {
-      final LogEntryProto entry = get(ti.getIndex());
-      server.get().failClientRequest(entry);
-    } catch(RaftLogIOException e) {
-      LOG.error(getName() + ": Failed to read log " + ti, e);
-    }
-  }
-
   @Override
   public List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries) {
     checkLogState();
@@ -370,7 +419,7 @@
       return Collections.emptyList();
     }
     try(AutoCloseableLock writeLock = writeLock()) {
-      final TruncateIndices ti = cache.computeTruncateIndices(this::failClientRequest, entries);
+      final TruncateIndices ti = cache.computeTruncateIndices(server::notifyTruncatedLogEntry, entries);
       final long truncateIndex = ti.getTruncateIndex();
       final int index = ti.getArrayIndex();
       LOG.debug("truncateIndex={}, arrayIndex={}", truncateIndex, index);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index cbc9555..58f5093 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -19,11 +19,11 @@
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.server.raftlog.segmented.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
 import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
@@ -39,8 +39,6 @@
 import java.util.*;
 import java.util.function.Consumer;
 
-import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
-
 /**
  * In-memory RaftLog Cache. Currently we provide a simple implementation that
  * caches all the segments in the memory. The cache is not thread-safe and
@@ -296,8 +294,8 @@
   private final int maxCachedSegments;
   private final CacheInvalidationPolicy evictionPolicy = new CacheInvalidationPolicyDefault();
 
-  SegmentedRaftLogCache(RaftPeerId selfId, RaftStorage storage, RaftProperties properties) {
-    this.name = selfId + "-" + getClass().getSimpleName();
+  SegmentedRaftLogCache(Object name, RaftStorage storage, RaftProperties properties) {
+    this.name = name + "-" + getClass().getSimpleName();
     this.closedSegments = new LogSegmentList(name);
     this.storage = storage;
     maxCachedSegments = RaftServerConfigKeys.Log.maxCachedSegmentNum(properties);
@@ -447,7 +445,7 @@
   long getEndIndex() {
     return openSegment != null ? openSegment.getEndIndex() :
         (closedSegments.isEmpty() ?
-            INVALID_LOG_INDEX :
+            RaftLog.INVALID_LOG_INDEX:
             closedSegments.get(closedSegments.size() - 1).getEndIndex());
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 5cfeb89..ec7ab73 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -23,8 +23,7 @@
 import com.codahale.metrics.Timer;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.metrics.RatisMetricRegistry;
-import org.apache.ratis.metrics.impl.RatisMetricRegistryImpl;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.TimeoutIOException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
@@ -167,14 +166,14 @@
 
   private final StateMachineDataPolicy stateMachineDataPolicy;
 
-  SegmentedRaftLogWorker(RaftPeerId selfId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
+  SegmentedRaftLogWorker(RaftGroupMemberId memberId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
                          RaftServerImpl server, RaftStorage storage, RaftProperties properties) {
-    this.name = selfId + "-" + getClass().getSimpleName() + ":" + storage.getStorageDir();
+    this.name = memberId + "-" + getClass().getSimpleName();
     LOG.info("new {} for {}", name, storage);
 
     this.submitUpdateCommitEvent = submitUpdateCommitEvent;
     this.stateMachine = stateMachine;
-    this.metricRegistry = RatisMetrics.createMetricRegistryForLogWorker(selfId.toString());
+    this.metricRegistry = RatisMetrics.createMetricRegistryForLogWorker(memberId.getPeerId().toString());
     this.storage = storage;
     this.server = server;
     final SizeInBytes queueByteLimit = RaftServerConfigKeys.Log.queueByteLimit(properties);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 9cfb436..6bbd708 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -246,7 +246,7 @@
     final List<RaftPeerId> ids = new ArrayList<>();
     final RaftServerImpl leader = cluster.getLeader();
     final RaftLog leaderLog = leader.getState().getLog();
-    final RaftPeerId leaderId = leaderLog.getSelfId();
+    final RaftPeerId leaderId = leader.getId();
     ids.add(leaderId);
 
     RaftTestUtil.getStateMachineLogEntries(leaderLog);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
index 8595246..a249fe0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
@@ -22,12 +22,15 @@
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.impl.ServerState;
+import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.segmented.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
 import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.LogSegmentList;
 import org.apache.ratis.server.raftlog.segmented.TestSegmentedRaftLog.SegmentRange;
@@ -153,6 +156,8 @@
     RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB"));
     RaftServerConfigKeys.Log.setPreallocatedSize(prop, SizeInBytes.valueOf("8KB"));
     final RaftPeerId peerId = RaftPeerId.valueOf("s0");
+    final RaftGroupId groupId = RaftGroupId.randomId();
+    final RaftGroupMemberId memberId = RaftGroupMemberId.valueOf(peerId, groupId);
     final int maxCachedNum = RaftServerConfigKeys.Log.maxCachedSegmentNum(prop);
 
     File storageDir = getTestDir();
@@ -165,8 +170,8 @@
     Mockito.when(server.getFollowerNextIndices()).thenReturn(new long[]{});
     Mockito.when(state.getLastAppliedIndex()).thenReturn(0L);
 
-    SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, server, storage, -1, prop);
-    raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+    SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, prop);
+    raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
     List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(0, maxCachedNum, 7, 0);
     LogEntryProto[] entries = generateEntries(slist);
     raftLog.append(entries).forEach(CompletableFuture::join);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index b209da9..b03102c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -75,6 +75,8 @@
   }
 
   private static final RaftPeerId peerId = RaftPeerId.valueOf("s0");
+  private static final RaftGroupId groupId = RaftGroupId.randomId();
+  private static final RaftGroupMemberId memberId = RaftGroupMemberId.valueOf(peerId, groupId);
 
   static class SegmentRange {
     final long start;
@@ -97,6 +99,10 @@
   private long preallocatedSize;
   private int bufferSize;
 
+  SegmentedRaftLog newSegmentedRaftLog() {
+    return new SegmentedRaftLog(memberId, null, storage, -1, properties);
+  }
+
   @Before
   public void setup() throws Exception {
     storageDir = getTestDir();
@@ -163,8 +169,7 @@
     LogEntryProto[] entries = prepareLog(ranges);
 
     // create RaftLog object and load log file
-    try (SegmentedRaftLog raftLog =
-             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // check if log entries are loaded correctly
       for (LogEntryProto e : entries) {
@@ -219,22 +224,19 @@
     List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0);
     List<LogEntryProto> entries = prepareLogEntries(ranges, null);
 
-    try (SegmentedRaftLog raftLog =
-             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // append entries to the raftlog
       entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
     }
 
-    try (SegmentedRaftLog raftLog =
-             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // check if the raft log is correct
       checkEntries(raftLog, entries, 0, entries.size());
     }
 
-    try (SegmentedRaftLog raftLog =
-        new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       TermIndex lastTermIndex  = raftLog.getLastEntryTermIndex();
       IllegalStateException ex = null;
@@ -272,15 +274,13 @@
     List<LogEntryProto> entries = prepareLogEntries(ranges,
         () -> new String(content));
 
-    try (SegmentedRaftLog raftLog =
-             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // append entries to the raftlog
       entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
     }
 
-    try (SegmentedRaftLog raftLog =
-             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // check if the raft log is correct
       checkEntries(raftLog, entries, 0, entries.size());
@@ -294,8 +294,7 @@
     List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0);
     List<LogEntryProto> entries = prepareLogEntries(ranges, null);
 
-    try (SegmentedRaftLog raftLog =
-             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // append entries to the raftlog
       entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
@@ -308,8 +307,7 @@
 
   private void testTruncate(List<LogEntryProto> entries, long fromIndex)
       throws Exception {
-    try (SegmentedRaftLog raftLog =
-             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // truncate the log
       raftLog.truncate(fromIndex).join();
@@ -318,8 +316,7 @@
       checkEntries(raftLog, entries, 0, (int) fromIndex);
     }
 
-    try (SegmentedRaftLog raftLog =
-             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // check if the raft log is correct
       if (fromIndex > 0) {
@@ -403,7 +400,7 @@
 
     final RaftProperties p = new RaftProperties();
     RaftServerConfigKeys.Log.setPurgeGap(p, purgeGap);
-    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, p)) {
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, storage, -1, p)) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
       final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
@@ -427,9 +424,8 @@
     when(server.getRetryCache()).thenReturn(retryCache);
     final RaftGroupMemberId id = RaftGroupMemberId.valueOf(RaftPeerId.valueOf("s0"), RaftGroupId.randomId());
     when(server.getMemberId()).thenReturn(id);
-    doCallRealMethod().when(server).failClientRequest(any(LogEntryProto.class));
-    try (SegmentedRaftLog raftLog =
-             new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
+    doCallRealMethod().when(server).notifyTruncatedLogEntry(any(LogEntryProto.class));
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, properties)) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       entries.forEach(entry -> RetryCacheTestUtil.createEntry(retryCache, entry));
       // append entries to the raftlog
@@ -444,8 +440,7 @@
     List<LogEntryProto> newEntries = prepareLogEntries(
         Arrays.asList(r1, r2, r3), null);
 
-    try (SegmentedRaftLog raftLog =
-             new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, properties)) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       LOG.info("newEntries[0] = {}", newEntries.get(0));
       final int last = newEntries.size() - 1;
@@ -462,8 +457,7 @@
     }
 
     // load the raftlog again and check
-    try (SegmentedRaftLog raftLog =
-             new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, properties)) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       checkEntries(raftLog, entries, 0, 650);
       checkEntries(raftLog, newEntries, 100, 100);
@@ -483,7 +477,7 @@
     final List<LogEntryProto> entries = prepareLogEntries(range, null, true, new ArrayList<>());
 
     final SimpleStateMachine4Testing sm = new SimpleStateMachine4Testing();
-    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm, null, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, sm, null, storage, -1, properties)) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
 
       int next = 0;
@@ -547,7 +541,7 @@
     RaftServerImpl server = mock(RaftServerImpl.class);
     doNothing().when(server).shutdown(false);
     Throwable ex = null; // TimeoutIOException
-    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, server, sm, null, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, sm, null, storage, -1, properties)) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // SegmentedRaftLogWorker should catch TimeoutIOException
       CompletableFuture<Long> f = raftLog.appendEntry(entry);