blob: 5be3c36578fae3fa0fabeab3de8a2c5fe6667216 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.raftlog.segmented;
import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*;
import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;
import java.io.IOException;
import java.util.Iterator;
import java.util.stream.IntStream;
import org.apache.ratis.RaftTestUtil.SimpleOperation;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.impl.RatisMetricRegistryImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
import org.apache.ratis.server.raftlog.segmented.LogSegment.Op;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.ReferenceCountedObject;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestSegmentedRaftLogCache {
private static final RaftProperties prop = new RaftProperties();
private SegmentedRaftLogCache cache;
private SegmentedRaftLogMetrics raftLogMetrics;
private RatisMetricRegistryImpl ratisMetricRegistry;
@Before
public void setup() {
raftLogMetrics = new SegmentedRaftLogMetrics(RaftServerTestUtil.TEST_MEMBER_ID);
ratisMetricRegistry = (RatisMetricRegistryImpl) raftLogMetrics.getRegistry();
cache = new SegmentedRaftLogCache(null, null, prop, raftLogMetrics);
}
@After
public void clear() {
raftLogMetrics.unregister();
}
private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
LogSegment s = LogSegment.newOpenSegment(null, start, MAX_OP_SIZE, null);
for (long i = start; i <= end; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
LogEntryProto entry = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
s.appendToOpenSegment(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, ReferenceCountedObject.wrap(entry));
}
if (!isOpen) {
s.close();
}
return s;
}
private void checkCache(long start, long end, int segmentSize) {
Assert.assertEquals(start, cache.getStartIndex());
Assert.assertEquals(end, cache.getEndIndex());
for (long index = start; index <= end; index++) {
final LogSegment segment = cache.getSegment(index);
final LogRecord record = segment.getLogRecord(index);
final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex());
Assert.assertEquals(index, entry.getIndex());
}
long[] offsets = new long[]{start, start + 1, start + (end - start) / 2,
end - 1, end};
for (long offset : offsets) {
checkCacheEntries(offset, (int) (end - offset + 1), end);
checkCacheEntries(offset, 1, end);
checkCacheEntries(offset, 20, end);
checkCacheEntries(offset, segmentSize, end);
checkCacheEntries(offset, segmentSize - 1, end);
}
}
private void checkCacheEntries(long offset, int size, long end) {
final LogEntryHeader[] entries = cache.getTermIndices(offset, offset + size);
long realEnd = offset + size > end + 1 ? end + 1 : offset + size;
Assert.assertEquals(realEnd - offset, entries.length);
for (long i = offset; i < realEnd; i++) {
Assert.assertEquals(i, entries[(int) (i - offset)].getIndex());
}
}
@Test
public void testAddSegments() throws Exception {
LogSegment s1 = prepareLogSegment(1, 100, false);
cache.addSegment(s1);
checkCache(1, 100, 100);
try {
LogSegment s = prepareLogSegment(102, 103, true);
cache.addSegment(s);
Assert.fail("should fail since there is gap between two segments");
} catch (IllegalStateException ignored) {
}
LogSegment s2 = prepareLogSegment(101, 200, true);
cache.addSegment(s2);
checkCache(1, 200, 100);
try {
LogSegment s = prepareLogSegment(201, 202, true);
cache.addSegment(s);
Assert.fail("should fail since there is still an open segment in cache");
} catch (IllegalStateException ignored) {
}
cache.rollOpenSegment(false);
checkCache(1, 200, 100);
try {
LogSegment s = prepareLogSegment(202, 203, true);
cache.addSegment(s);
Assert.fail("should fail since there is gap between two segments");
} catch (IllegalStateException ignored) {
}
LogSegment s3 = prepareLogSegment(201, 300, true);
cache.addSegment(s3);
Assert.assertNotNull(cache.getOpenSegment());
checkCache(1, 300, 100);
cache.rollOpenSegment(true);
Assert.assertNotNull(cache.getOpenSegment());
checkCache(1, 300, 100);
}
@Test
public void testAppendEntry() {
LogSegment closedSegment = prepareLogSegment(0, 99, false);
cache.addSegment(closedSegment);
final SimpleOperation m = new SimpleOperation("m");
try {
LogEntryProto entry = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, 0);
cache.appendEntry(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, ReferenceCountedObject.wrap(entry)
);
Assert.fail("the open segment is null");
} catch (IllegalStateException ignored) {
}
LogSegment openSegment = prepareLogSegment(100, 100, true);
cache.addSegment(openSegment);
for (long index = 101; index < 200; index++) {
LogEntryProto entry = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, index);
cache.appendEntry(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, ReferenceCountedObject.wrap(entry)
);
}
Assert.assertNotNull(cache.getOpenSegment());
checkCache(0, 199, 100);
}
@Test
public void testTruncate() throws Exception {
long start = 0;
for (int i = 0; i < 5; i++) { // 5 closed segments
LogSegment s = prepareLogSegment(start, start + 99, false);
cache.addSegment(s);
start += 100;
}
// add another open segment
LogSegment s = prepareLogSegment(start, start + 99, true);
cache.addSegment(s);
long end = cache.getEndIndex();
Assert.assertEquals(599, end);
int numOfSegments = 6;
// start truncation
for (int i = 0; i < 10; i++) { // truncate 10 times
// each time truncate 37 entries
end -= 37;
TruncationSegments ts = cache.truncate(end + 1);
checkCache(0, end, 100);
// check TruncationSegments
int currentNum= (int) (end / 100 + 1);
if (currentNum < numOfSegments) {
Assert.assertEquals(1, ts.getToDelete().length);
numOfSegments = currentNum;
} else {
Assert.assertEquals(0, ts.getToDelete().length);
}
}
// 230 entries remaining. truncate at the segment boundary
TruncationSegments ts = cache.truncate(200);
checkCache(0, 199, 100);
Assert.assertEquals(1, ts.getToDelete().length);
Assert.assertEquals(200, ts.getToDelete()[0].getStartIndex());
Assert.assertEquals(229, ts.getToDelete()[0].getEndIndex());
Assert.assertEquals(0, ts.getToDelete()[0].getTargetLength());
Assert.assertFalse(ts.getToDelete()[0].isOpen());
Assert.assertNull(ts.getToTruncate());
// add another open segment and truncate it as a whole
LogSegment newOpen = prepareLogSegment(200, 249, true);
cache.addSegment(newOpen);
ts = cache.truncate(200);
checkCache(0, 199, 100);
Assert.assertEquals(1, ts.getToDelete().length);
Assert.assertEquals(200, ts.getToDelete()[0].getStartIndex());
Assert.assertEquals(249, ts.getToDelete()[0].getEndIndex());
Assert.assertEquals(0, ts.getToDelete()[0].getTargetLength());
Assert.assertTrue(ts.getToDelete()[0].isOpen());
Assert.assertNull(ts.getToTruncate());
// add another open segment and truncate part of it
newOpen = prepareLogSegment(200, 249, true);
cache.addSegment(newOpen);
ts = cache.truncate(220);
checkCache(0, 219, 100);
Assert.assertNull(cache.getOpenSegment());
Assert.assertEquals(0, ts.getToDelete().length);
Assert.assertTrue(ts.getToTruncate().isOpen());
Assert.assertEquals(219, ts.getToTruncate().getNewEndIndex());
Assert.assertEquals(200, ts.getToTruncate().getStartIndex());
Assert.assertEquals(249, ts.getToTruncate().getEndIndex());
}
@Test
public void testOpenSegmentPurge() {
int start = 0;
int end = 5;
int segmentSize = 100;
populatedSegment(start, end, segmentSize, false);
int sIndex = (end - start) * segmentSize;
populatedSegment(end, end + 1, segmentSize, true);
int purgeIndex = sIndex;
// open segment should never be purged
TruncationSegments ts = cache.purge(purgeIndex);
Assert.assertNull(ts.getToTruncate());
Assert.assertEquals(end - start, ts.getToDelete().length);
Assert.assertEquals(sIndex, cache.getStartIndex());
}
@Test
public void testCloseSegmentPurge() {
int start = 0;
int end = 5;
int segmentSize = 100;
populatedSegment(start, end, segmentSize, false);
int purgeIndex = (end - start) * segmentSize - 1;
// overlapped close segment will not purged. Passing in index - 1 since
// we purge a closed segment when end index == passed in purge index.
TruncationSegments ts = cache.purge(purgeIndex - 1);
Assert.assertNull(ts.getToTruncate());
Assert.assertEquals(end - start - 1, ts.getToDelete().length);
Assert.assertEquals(1, cache.getNumOfSegments());
}
private void populatedSegment(int start, int end, int segmentSize, boolean isOpen) {
IntStream.range(start, end).forEach(x -> {
int startIndex = x * segmentSize;
int endIndex = startIndex + segmentSize - 1;
LogSegment s = prepareLogSegment(startIndex, endIndex, isOpen);
cache.addSegment(s);
});
}
private void testIterator(long startIndex) throws IOException {
Iterator<TermIndex> iterator = cache.iterator(startIndex);
TermIndex prev = null;
while (iterator.hasNext()) {
TermIndex termIndex = iterator.next();
Assert.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(), termIndex);
if (prev != null) {
Assert.assertEquals(prev.getIndex() + 1, termIndex.getIndex());
}
prev = termIndex;
}
if (startIndex <= cache.getEndIndex()) {
Assert.assertNotNull(prev);
Assert.assertEquals(cache.getEndIndex(), prev.getIndex());
}
}
@Test
public void testIterator() throws Exception {
long start = 0;
for (int i = 0; i < 2; i++) { // 2 closed segments
LogSegment s = prepareLogSegment(start, start + 99, false);
cache.addSegment(s);
start += 100;
}
// add another open segment
LogSegment s = prepareLogSegment(start, start + 99, true);
cache.addSegment(s);
for (long startIndex = 0; startIndex < 300; startIndex += 50) {
testIterator(startIndex);
}
testIterator(299);
Iterator<TermIndex> iterator = cache.iterator(300);
Assert.assertFalse(iterator.hasNext());
}
@Test
public void testCacheMetric() {
cache.addSegment(prepareLogSegment(0, 99, false));
cache.addSegment(prepareLogSegment(100, 200, false));
cache.addSegment(prepareLogSegment(201, 300, true));
Long closedSegmentsNum = (Long) ratisMetricRegistry.getGauges((s, metric) ->
s.contains(RAFT_LOG_CACHE_CLOSED_SEGMENTS_NUM)).values().iterator().next().getValue();
Assert.assertEquals(2L, closedSegmentsNum.longValue());
Long closedSegmentsSizeInBytes = (Long) ratisMetricRegistry.getGauges((s, metric) ->
s.contains(RAFT_LOG_CACHE_CLOSED_SEGMENTS_SIZE_IN_BYTES)).values().iterator().next().getValue();
Assert.assertEquals(closedSegmentsSizeInBytes.longValue(), cache.getClosedSegmentsSizeInBytes());
Long openSegmentSizeInBytes = (Long) ratisMetricRegistry.getGauges((s, metric) ->
s.contains(RAFT_LOG_CACHE_OPEN_SEGMENT_SIZE_IN_BYTES)).values().iterator().next().getValue();
Assert.assertEquals(openSegmentSizeInBytes.longValue(), cache.getOpenSegmentSizeInBytes());
}
}