blob: e731960d985e4eca0935773e8cbd554abb893bc6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.timeline;
import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class SnapshottableHashTableTest {
/**
 * Element type stored in the tables under test.
 *
 * Equality and hashing look only at {@code i}; {@code j} merely labels the
 * instance. This lets the tests create distinct objects that compare equal to
 * each other and observe how the table treats them.
 */
static class TestElement implements SnapshottableHashTable.ElementWithStartEpoch {
    private final int i;
    private final char j;
    // Stays at Long.MAX_VALUE until setStartEpoch() is called.
    private long startEpoch = Long.MAX_VALUE;

    TestElement(int i, char j) {
        this.i = i;
        this.j = j;
    }

    @Override
    public void setStartEpoch(long startEpoch) {
        this.startEpoch = startEpoch;
    }

    @Override
    public long startEpoch() {
        return startEpoch;
    }

    /** Hash depends on {@code i} only, consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return Integer.hashCode(i);
    }

    /** Two elements are equal when their {@code i} values match; {@code j} is ignored. */
    @Override
    public boolean equals(Object o) {
        return o instanceof TestElement that && that.i == this.i;
    }

    /** Includes the identity hash so equal-but-distinct instances can be told apart in messages. */
    @Override
    public String toString() {
        return "E_%d%c(%s)".formatted(i, j, System.identityHashCode(this));
    }
}
// Shared fixtures. Elements with the same number (for example E_1A and E_1B) are
// equal according to TestElement.equals() and share a hash code, but they are
// distinct objects; the letter only identifies the instance.
private static final TestElement E_1A = new TestElement(1, 'A');
private static final TestElement E_1B = new TestElement(1, 'B');
private static final TestElement E_2A = new TestElement(2, 'A');
private static final TestElement E_3A = new TestElement(3, 'A');
private static final TestElement E_3B = new TestElement(3, 'B');
/**
 * A table that never had anything added reports size zero when queried with
 * epoch {@code Long.MAX_VALUE}.
 */
@Test
public void testEmptyTable() {
    SnapshotRegistry snapshots = new SnapshotRegistry(new LogContext());
    SnapshottableHashTable<TestElement> emptyTable = new SnapshottableHashTable<>(snapshots, 1);
    final long queryEpoch = Long.MAX_VALUE;
    assertEquals(0, emptyTable.snapshottableSize(queryEpoch));
}
/**
 * Checks that a TimelineHashSet can be reverted without throwing when the delta
 * table of one of the snapshots involved is null.
 */
@Test
public void testDeleteOnEmptyDeltaTable() {
    SnapshotRegistry snapshots = new SnapshotRegistry(new LogContext());
    TimelineHashSet<String> words = new TimelineHashSet<>(snapshots, 5);

    snapshots.getOrCreateSnapshot(100);
    words.add("bar");
    snapshots.getOrCreateSnapshot(200);
    words.add("baz");
    // Snapshot 200 has a null delta table; the delta-table merge performed by the
    // revert must not throw.
    snapshots.revertToSnapshot(100);
    assertTrue(words.isEmpty());

    words.add("foo");
    snapshots.getOrCreateSnapshot(300);
    // "bar" no longer exists after the revert to epoch 100, so this removes nothing.
    words.remove("bar");
    // Nothing changed in snapshot 300, so no delta-table merge is needed here.
    snapshots.revertToSnapshot(100);
    assertTrue(words.isEmpty());

    words.add("qux");
    snapshots.getOrCreateSnapshot(400);
    assertEquals(1, words.size());
    words.add("fred");
    words.add("thud");
    snapshots.getOrCreateSnapshot(500);
    assertEquals(3, words.size());
    // "qux" was added in epoch 101 (after epoch 100). Removing it now creates an
    // entry for the deleted value in the delta table of snapshot 500.
    words.remove("qux");
    assertEquals(2, words.size());
    // Reverting to snapshot 400 merges the delta table of snapshot 500 into the one
    // of snapshot 400. The former holds the entry created above while the latter is
    // null; the merge must not throw.
    snapshots.revertToSnapshot(400);
    // After the revert, only the element present at snapshot 400 remains.
    assertEquals(1, words.size());
    // Reverting to epoch 100 does not merge the delta table of snapshot 400, because
    // the changed entry dates from epoch 101 (after epoch 100).
    snapshots.revertToSnapshot(100);
    assertTrue(words.isEmpty());
}
/**
 * Exercises add, replace and remove across two snapshots, then deletes both
 * snapshots.
 *
 * Returned elements are compared by reference: TestElement.equals() ignores
 * {@code j}, so an equality check could not tell E_1A from E_1B.
 */
@Test
public void testAddAndRemove() {
    SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
    SnapshottableHashTable<TestElement> table =
        new SnapshottableHashTable<>(registry, 1);
    assertNull(table.snapshottableAddOrReplace(E_1B));
    assertEquals(1, table.snapshottableSize(Long.MAX_VALUE));
    registry.getOrCreateSnapshot(0);
    // E_1A replaces the equal element E_1B; snapshot 0 must still see E_1B.
    assertSame(E_1B, table.snapshottableAddOrReplace(E_1A));
    assertSame(E_1B, table.snapshottableGet(E_1A, 0));
    assertSame(E_1A, table.snapshottableGet(E_1A, Long.MAX_VALUE));
    assertNull(table.snapshottableAddOrReplace(E_2A));
    assertNull(table.snapshottableAddOrReplace(E_3A));
    assertEquals(3, table.snapshottableSize(Long.MAX_VALUE));
    assertEquals(1, table.snapshottableSize(0));
    registry.getOrCreateSnapshot(1);
    // Removing via the equal key E_1B must hand back the stored instance, E_1A.
    assertSame(E_1A, table.snapshottableRemove(E_1B));
    assertSame(E_2A, table.snapshottableRemove(E_2A));
    assertSame(E_3A, table.snapshottableRemove(E_3A));
    assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
    assertEquals(1, table.snapshottableSize(0));
    assertEquals(3, table.snapshottableSize(1));
    registry.deleteSnapshot(0);
    // Once snapshot 0 is deleted, querying it must fail with a descriptive message.
    assertEquals("No in-memory snapshot for epoch 0. Snapshot epochs are: 1",
        assertThrows(RuntimeException.class, () ->
            table.snapshottableSize(0)).getMessage());
    registry.deleteSnapshot(1);
    assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
}
/**
 * Verifies that iterating at snapshot 0 keeps yielding the snapshot's contents
 * while elements are removed from the table afterwards.
 *
 * Removed elements are compared by reference, because TestElement.equals()
 * ignores {@code j} and could not distinguish E_1A from E_1B.
 */
@Test
public void testIterateOverSnapshot() {
    SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
    SnapshottableHashTable<TestElement> table =
        new SnapshottableHashTable<>(registry, 1);
    assertTrue(table.snapshottableAddUnlessPresent(E_1B));
    // E_1A equals E_1B, so it must not be added; E_1B stays in the table.
    assertFalse(table.snapshottableAddUnlessPresent(E_1A));
    assertTrue(table.snapshottableAddUnlessPresent(E_2A));
    assertTrue(table.snapshottableAddUnlessPresent(E_3A));
    registry.getOrCreateSnapshot(0);
    assertIteratorYields(table.snapshottableIterator(0), E_1B, E_2A, E_3A);
    assertSame(E_1B, table.snapshottableRemove(E_1B));
    assertIteratorYields(table.snapshottableIterator(0), E_1B, E_2A, E_3A);
    // The element equal to E_1A is already gone, so a second removal finds nothing.
    assertNull(table.snapshottableRemove(E_1A));
    assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE), E_2A, E_3A);
    assertSame(E_2A, table.snapshottableRemove(E_2A));
    assertSame(E_3A, table.snapshottableRemove(E_3A));
    // Snapshot 0 is unaffected by all removals made after it was taken.
    assertIteratorYields(table.snapshottableIterator(0), E_1B, E_2A, E_3A);
}
/**
 * An iterator opened on snapshot 0 must yield only the element present at that
 * snapshot, even though more elements are added before the iterator is drained.
 */
@Test
public void testIterateOverSnapshotWhileExpandingTable() {
    SnapshotRegistry snapshots = new SnapshotRegistry(new LogContext());
    SnapshottableHashTable<TestElement> elements = new SnapshottableHashTable<>(snapshots, 1);
    assertNull(elements.snapshottableAddOrReplace(E_1A));
    snapshots.getOrCreateSnapshot(0);

    // Open the iterator first, then add to the table underneath it.
    Iterator<TestElement> snapshotIter = elements.snapshottableIterator(0);
    assertTrue(elements.snapshottableAddUnlessPresent(E_2A));
    assertTrue(elements.snapshottableAddUnlessPresent(E_3A));

    assertIteratorYields(snapshotIter, E_1A);
}
/**
 * Steps an iterator over snapshot 0 one element at a time while removing and
 * replacing elements in between, and checks that the iterator still yields the
 * snapshot's original contents.
 *
 * Returned elements are compared by reference, because TestElement.equals()
 * ignores {@code j} and could not distinguish, for example, E_3A from E_3B.
 */
@Test
public void testIterateOverSnapshotWhileDeletingAndReplacing() {
    SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
    SnapshottableHashTable<TestElement> table =
        new SnapshottableHashTable<>(registry, 1);
    assertNull(table.snapshottableAddOrReplace(E_1A));
    assertNull(table.snapshottableAddOrReplace(E_2A));
    assertNull(table.snapshottableAddOrReplace(E_3A));
    // Swap E_1A for E_1B before the snapshot, so the snapshot contains E_1B.
    assertSame(E_1A, table.snapshottableRemove(E_1A));
    assertNull(table.snapshottableAddOrReplace(E_1B));
    registry.getOrCreateSnapshot(0);
    Iterator<TestElement> iter = table.snapshottableIterator(0);
    List<TestElement> iterElements = new ArrayList<>();
    iterElements.add(iter.next());
    // Mutate the table between iterator steps.
    assertSame(E_2A, table.snapshottableRemove(E_2A));
    assertSame(E_3A, table.snapshottableAddOrReplace(E_3B));
    iterElements.add(iter.next());
    assertSame(E_1B, table.snapshottableRemove(E_1B));
    iterElements.add(iter.next());
    assertFalse(iter.hasNext());
    // The iterator must have produced exactly the instances present at snapshot 0.
    assertIteratorYields(iterElements.iterator(), E_1B, E_2A, E_3A);
}
/**
 * Replaces and removes elements after snapshot 0, then reverts to snapshot 0 and
 * checks that the original instances are back.
 *
 * Returned elements are compared by reference, because TestElement.equals()
 * ignores {@code j} and could not distinguish E_1A from E_1B or E_3A from E_3B.
 */
@Test
public void testRevert() {
    SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
    SnapshottableHashTable<TestElement> table =
        new SnapshottableHashTable<>(registry, 1);
    assertNull(table.snapshottableAddOrReplace(E_1A));
    assertNull(table.snapshottableAddOrReplace(E_2A));
    assertNull(table.snapshottableAddOrReplace(E_3A));
    registry.getOrCreateSnapshot(0);
    // Replace two elements with equal-but-distinct instances.
    assertSame(E_1A, table.snapshottableAddOrReplace(E_1B));
    assertSame(E_3A, table.snapshottableAddOrReplace(E_3B));
    registry.getOrCreateSnapshot(1);
    assertEquals(3, table.snapshottableSize(Long.MAX_VALUE));
    assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE), E_1B, E_2A, E_3B);
    // Empty the table; each removal must return the instance currently stored.
    assertSame(E_1B, table.snapshottableRemove(E_1B));
    assertSame(E_2A, table.snapshottableRemove(E_2A));
    assertSame(E_3B, table.snapshottableRemove(E_3B));
    assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
    assertEquals(3, table.snapshottableSize(0));
    assertEquals(3, table.snapshottableSize(1));
    registry.revertToSnapshot(0);
    // After the revert the table holds the instances that existed at snapshot 0.
    assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE), E_1A, E_2A, E_3A);
}
/**
 * Checks that resetting the registry leaves no snapshot epochs and an empty table.
 *
 * Replaced elements are compared by reference, because TestElement.equals()
 * ignores {@code j} and could not distinguish E_1A from E_1B or E_3A from E_3B.
 */
@Test
public void testReset() {
    SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
    SnapshottableHashTable<TestElement> table =
        new SnapshottableHashTable<>(registry, 1);
    assertNull(table.snapshottableAddOrReplace(E_1A));
    assertNull(table.snapshottableAddOrReplace(E_2A));
    assertNull(table.snapshottableAddOrReplace(E_3A));
    registry.getOrCreateSnapshot(0);
    assertSame(E_1A, table.snapshottableAddOrReplace(E_1B));
    assertSame(E_3A, table.snapshottableAddOrReplace(E_3B));
    registry.getOrCreateSnapshot(1);
    registry.reset();
    // No snapshot epochs remain after the reset.
    assertEquals(Collections.emptyList(), registry.epochsList());
    // Check that the table is empty
    assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE));
}
/**
 * Iterates at snapshot 1 after the element equal to E_1A has since been replaced
 * (before snapshot 2) and then removed, and expects the instances that were
 * present at snapshot 1.
 *
 * Returned elements are compared by reference, because TestElement.equals()
 * ignores {@code j} and could not distinguish E_1A from E_1B.
 */
@Test
public void testIteratorAtOlderEpoch() {
    SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
    SnapshottableHashTable<TestElement> table =
        new SnapshottableHashTable<>(registry, 4);
    assertNull(table.snapshottableAddOrReplace(E_3B));
    registry.getOrCreateSnapshot(0);
    assertNull(table.snapshottableAddOrReplace(E_1A));
    registry.getOrCreateSnapshot(1);
    assertSame(E_1A, table.snapshottableAddOrReplace(E_1B));
    registry.getOrCreateSnapshot(2);
    assertSame(E_1B, table.snapshottableRemove(E_1B));
    // Snapshot 1 predates both the replacement and the removal.
    assertIteratorYields(table.snapshottableIterator(1), E_3B, E_1A);
}
/**
 * Assert that the given iterator yields exactly the given elements, in any order.
 * We compare using reference equality here, rather than object equality.
 *
 * The expected objects are tracked in an identity-keyed map, so passing the same
 * reference more than once in {@code expected} counts as a single expectation.
 *
 * @param iter     the iterator to drain
 * @param expected the object references the iterator must yield
 * @throws AssertionError if the iterator yields an object that is not expected
 *                        (or yields an expected object again), or if it ends
 *                        before every expected object was seen
 */
private static void assertIteratorYields(Iterator<?> iter,
                                         Object... expected) {
    IdentityHashMap<Object, Boolean> remaining = new IdentityHashMap<>();
    for (Object object : expected) {
        remaining.put(object, true);
    }
    List<Object> extraObjects = new ArrayList<>();
    while (iter.hasNext()) {
        Object object = iter.next();
        assertNotNull(object);
        // remove() returns null when the reference was not (or is no longer) expected.
        if (remaining.remove(object) == null) {
            extraObjects.add(object);
        }
    }
    if (!extraObjects.isEmpty() || !remaining.isEmpty()) {
        // Raise an assertion failure rather than a generic runtime exception, so a
        // mismatch is reported as a failed expectation of the test.
        throw new AssertionError("Found extra object(s): [" + extraObjects.stream().map(Object::toString).collect(Collectors.joining(", ")) +
            "] and didn't find object(s): [" + remaining.keySet().stream().map(Object::toString).collect(Collectors.joining(", ")) + "]");
    }
}
}