blob: 4370687844c3285a34287dd8ddc9713111da1ed9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.logsegment;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.*;
/**
* Test Case for Per Stream Log Segment Cache.
*/
public class TestPerStreamLogSegmentCache {
@Test(timeout = 60000)
public void testBasicOperations() {
LogSegmentMetadata metadata =
DLMTestUtil.completedLogSegment("/segment1", 1L, 1L, 100L, 100, 1L, 99L, 0L);
String name = DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L);
PerStreamLogSegmentCache cache = new PerStreamLogSegmentCache("test-basic-operations");
assertNull("No log segment " + name + " should be cached", cache.get(name));
cache.add(name, metadata);
LogSegmentMetadata metadataRetrieved = cache.get(name);
assertNotNull("log segment " + name + " should be cached", metadataRetrieved);
assertEquals("Wrong log segment metadata returned for " + name,
metadata, metadataRetrieved);
LogSegmentMetadata metadataRemoved = cache.remove(name);
assertNull("log segment " + name + " should be removed from cache", cache.get(name));
assertEquals("Wrong log segment metadata removed for " + name,
metadata, metadataRemoved);
assertNull("No log segment " + name + " to be removed", cache.remove(name));
}
@Test(timeout = 60000)
public void testDiff() {
PerStreamLogSegmentCache cache = new PerStreamLogSegmentCache("test-diff");
// add 5 completed log segments
for (int i = 1; i <= 5; i++) {
LogSegmentMetadata metadata =
DLMTestUtil.completedLogSegment("/segment" + i, i, i, i * 100L, 100, i, 99L, 0L);
String name = DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i);
cache.add(name, metadata);
}
// add one inprogress log segment
LogSegmentMetadata inprogress =
DLMTestUtil.inprogressLogSegment("/inprogress-6", 6, 600L, 6);
String name = DLMTestUtil.inprogressZNodeName(6);
cache.add(name, inprogress);
// deleted first 2 completed log segments and completed the last one
Set<String> segmentRemoved = Sets.newHashSet();
for (int i = 1; i <= 2; i++) {
segmentRemoved.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i));
}
segmentRemoved.add((DLMTestUtil.inprogressZNodeName(6)));
Set<String> segmentReceived = Sets.newHashSet();
Set<String> segmentAdded = Sets.newHashSet();
for (int i = 3; i <= 6; i++) {
segmentReceived.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i));
if (i == 6) {
segmentAdded.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i));
}
}
Pair<Set<String>, Set<String>> segmentChanges = cache.diff(segmentReceived);
assertTrue("Should remove " + segmentRemoved + ", but removed " + segmentChanges.getRight(),
Sets.difference(segmentRemoved, segmentChanges.getRight()).isEmpty());
assertTrue("Should add " + segmentAdded + ", but added " + segmentChanges.getLeft(),
Sets.difference(segmentAdded, segmentChanges.getLeft()).isEmpty());
}
@Test(timeout = 60000)
public void testUpdate() {
PerStreamLogSegmentCache cache = new PerStreamLogSegmentCache("test-update");
// add 5 completed log segments
for (int i = 1; i <= 5; i++) {
LogSegmentMetadata metadata =
DLMTestUtil.completedLogSegment("/segment" + i, i, i, i * 100L, 100, i, 99L, 0L);
String name = DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i);
cache.add(name, metadata);
}
// add one inprogress log segment
LogSegmentMetadata inprogress =
DLMTestUtil.inprogressLogSegment("/inprogress-6", 6, 600L, 6);
String name = DLMTestUtil.inprogressZNodeName(6);
cache.add(name, inprogress);
// deleted first 2 completed log segments and completed the last one
Set<String> segmentRemoved = Sets.newHashSet();
for (int i = 1; i <= 2; i++) {
segmentRemoved.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i));
}
segmentRemoved.add((DLMTestUtil.inprogressZNodeName(6)));
Set<String> segmentReceived = Sets.newHashSet();
Map<String, LogSegmentMetadata> segmentAdded = Maps.newHashMap();
for (int i = 3; i <= 6; i++) {
segmentReceived.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i));
if (i == 6) {
segmentAdded.put(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i),
DLMTestUtil.completedLogSegment("/segment" + i, i, i, i * 100L, 100, i, 99L, 0L));
}
}
// update the cache
cache.update(segmentRemoved, segmentAdded);
for (String segment : segmentRemoved) {
assertNull("Segment " + segment + " should be removed.", cache.get(segment));
}
for (String segment : segmentReceived) {
assertNotNull("Segment " + segment + " should not be removed", cache.get(segment));
}
for (Map.Entry<String, LogSegmentMetadata> entry : segmentAdded.entrySet()) {
assertEquals("Segment " + entry.getKey() + " should be added.",
entry.getValue(), entry.getValue());
}
}
@Test(timeout = 60000, expected = UnexpectedException.class)
public void testGapDetection() throws Exception {
PerStreamLogSegmentCache cache = new PerStreamLogSegmentCache("test-gap-detection");
cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L),
DLMTestUtil.completedLogSegment("/segment-1", 1L, 1L, 100L, 100, 1L, 99L, 0L));
cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(3L),
DLMTestUtil.completedLogSegment("/segment-3", 3L, 3L, 300L, 100, 3L, 99L, 0L));
cache.getLogSegments(LogSegmentMetadata.COMPARATOR);
}
@Test(timeout = 60000)
public void testGapDetectionOnLogSegmentsWithoutLogSegmentSequenceNumber() throws Exception {
PerStreamLogSegmentCache cache = new PerStreamLogSegmentCache("test-gap-detection");
LogSegmentMetadata segment1 =
DLMTestUtil.completedLogSegment("/segment-1", 1L, 1L, 100L, 100, 1L, 99L, 0L)
.mutator().setVersion(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V1_ORIGINAL).build();
cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L), segment1);
LogSegmentMetadata segment3 =
DLMTestUtil.completedLogSegment("/segment-3", 3L, 3L, 300L, 100, 3L, 99L, 0L)
.mutator().setVersion(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO).build();
cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(3L), segment3);
List<LogSegmentMetadata> expectedList = Lists.asList(segment1, new LogSegmentMetadata[] { segment3 });
List<LogSegmentMetadata> resultList = cache.getLogSegments(LogSegmentMetadata.COMPARATOR);
assertEquals(expectedList, resultList);
}
@Test(timeout = 60000, expected = UnexpectedException.class)
public void testSameLogSegment() throws Exception {
PerStreamLogSegmentCache cache = new PerStreamLogSegmentCache("test-same-log-segment");
List<LogSegmentMetadata> expectedList = Lists.newArrayListWithExpectedSize(2);
LogSegmentMetadata inprogress =
DLMTestUtil.inprogressLogSegment("/inprogress-1", 1L, 1L, 1L);
expectedList.add(inprogress);
cache.add(DLMTestUtil.inprogressZNodeName(1L), inprogress);
LogSegmentMetadata completed =
DLMTestUtil.completedLogSegment("/segment-1", 1L, 1L, 100L, 100, 1L, 99L, 0L);
expectedList.add(completed);
cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L), completed);
cache.getLogSegments(LogSegmentMetadata.COMPARATOR);
}
}