RATIS-2056. Update purge index when snapshot installed. (#1065)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 9e07956..a41d90b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -142,6 +142,14 @@
return false;
}
+ protected void updatePurgeIndex(Long purged) {
+ try (AutoCloseableLock writeLock = writeLock()) {
+ if (purged != null) {
+ purgeIndex.updateToMax(purged, infoIndexChange);
+ }
+ }
+ }
+
protected void updateSnapshotIndexFromStateMachine() {
updateSnapshotIndex(getSnapshotIndexFromStateMachine.getAsLong());
}
@@ -340,9 +348,7 @@
LOG.info("{}: purge {}", getName(), suggestedIndex);
final long finalSuggestedIndex = suggestedIndex;
return purgeImpl(suggestedIndex).whenComplete((purged, e) -> {
- if (purged != null) {
- purgeIndex.updateToMax(purged, infoIndexChange);
- }
+ updatePurgeIndex(purged);
if (e != null) {
LOG.warn(getName() + ": Failed to purge " + finalSuggestedIndex, e);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 8d972a1..7f9ef30 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -272,7 +272,7 @@
if (!cache.isEmpty() && cache.getEndIndex() < lastIndexInSnapshot) {
LOG.warn("End log index {} is smaller than last index in snapshot {}",
cache.getEndIndex(), lastIndexInSnapshot);
- purgeImpl(lastIndexInSnapshot);
+ purgeImpl(lastIndexInSnapshot).whenComplete((purged, e) -> updatePurgeIndex(purged));
}
}
}
@@ -557,7 +557,7 @@
cacheEviction.signal();
}
}
- return purgeImpl(lastSnapshotIndex);
+ return purgeImpl(lastSnapshotIndex).whenComplete((purged, e) -> updatePurgeIndex(purged));
}
@Override