blob: fbdafb131d63cc7cf4156700d11dc4e6b768e25b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.tcm.log;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.IntegerType;
import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MetadataSnapshots;
import org.apache.cassandra.tcm.transformations.CustomTransformation;
import org.apache.cassandra.tcm.transformations.ForceSnapshot;
import org.apache.cassandra.tcm.transformations.TriggerSnapshot;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.tcm.Epoch.EMPTY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
 * Tests for {@link LocalLog}: out-of-order appends, snapshot entries that jump over gaps in the
 * epoch sequence, and a multi-threaded fuzz test that appends overlapping slices concurrently.
 */
public class LocalLogTest
{
    /** Number of independent rounds executed by {@link #appendFuzzTest()}. */
    private static final int FUZZ_ITERATIONS = 2000;

    @BeforeClass
    public static void beforeClass()
    {
        DatabaseDescriptor.daemonInitialization();
    }

    /**
     * Appends epochs 2 and 3 before epoch 1 and checks that the highest consecutive epoch stays at
     * the initial one until the missing entry arrives, at which point it advances to epoch 3.
     */
    @Test
    public void appendToFillGapWithConsecutiveBufferedEntries()
    {
        LocalLog log = LocalLog.logSpec()
                               .sync()
                               .withInitialState(cm())
                               .createLog();
        log.readyUnchecked();

        Epoch start = log.metadata().epoch;
        assertEquals(EMPTY, start);

        Entry e1 = entry(1), e2 = entry(2), e3 = entry(3);

        // epoch 1 is still missing, so neither of these appends may move the tail
        log.append(e2);
        Epoch tail = log.waitForHighestConsecutive().epoch;
        assertEquals(start, tail);

        log.append(e3);
        tail = log.waitForHighestConsecutive().epoch;
        assertEquals(start, tail);

        // filling the gap makes 1, 2 and 3 consecutive
        log.append(e1);
        tail = log.waitForHighestConsecutive().epoch;
        assertEquals(e3.epoch, tail);
    }

    /**
     * Appends, in random order, entries for epochs 1..9 plus a {@link TriggerSnapshot} and a
     * {@link ForceSnapshot} that both claim epoch 11 (epoch 10 is never appended), and expects the
     * log to end up at epoch 11.
     */
    @Test
    public void forceSnapshotCollisionWithGap()
    {
        LocalLog log = LocalLog.logSpec()
                               .sync()
                               .withInitialState(cm())
                               .createLog();
        log.readyUnchecked();

        List<Entry> entries = new ArrayList<>();
        for (int i = 1; i <= 9; i++)
            entries.add(entry(i));
        entries.add(triggerSnapshotAt(11));
        entries.add(forceSnapshotAt(11));

        // seeded so that a failing order can be replayed from the assertion message
        long seed = System.nanoTime();
        Collections.shuffle(entries, new Random(seed));
        log.append(entries);

        ClusterMetadata tail = log.waitForHighestConsecutive();
        assertEquals("Seed: " + seed, 11, tail.epoch.getEpoch());
    }

    /**
     * Appends a single {@link ForceSnapshot} entry to a log backed by a storage whose
     * {@code append} always throws, and expects the log to still reach the snapshot's epoch.
     */
    @Test
    public void forceSnapshotIsNotPersisted()
    {
        LogStorage storage = new LogStorage()
        {
            @Override
            public void append(Entry entry)
            {
                throw new RuntimeException("we should not append anything");
            }

            @Override
            public LogState getPersistedLogState()
            {
                return LogState.EMPTY;
            }

            @Override
            public LogState getLogStateBetween(ClusterMetadata base, Epoch end)
            {
                return LogState.EMPTY;
            }

            @Override
            public EntryHolder getEntries(Epoch since) throws IOException
            {
                return new EntryHolder(since);
            }

            @Override
            public MetadataSnapshots snapshots()
            {
                return MetadataSnapshots.NO_OP;
            }
        };

        LocalLog log = LocalLog.logSpec()
                               .sync()
                               .withInitialState(cm())
                               .withStorage(storage)
                               .createLog();
        log.readyUnchecked();

        Entry entry = new Entry(Entry.Id.NONE,
                                Epoch.create(11),
                                new ForceSnapshot(new ClusterMetadata(new LocalPartitioner(IntegerType.instance)).forceEpoch(Epoch.create(11))));
        log.append(entry);

        ClusterMetadata tail = log.waitForHighestConsecutive();
        assertEquals(11, tail.epoch.getEpoch());
    }

    /**
     * Same setup as {@link #forceSnapshotCollisionWithGap()} with two further {@link ForceSnapshot}
     * entries at epochs 21 and 31; the log is expected to end up at the highest one, 31.
     */
    @Test
    public void multipleSnapshotEntries()
    {
        LocalLog log = LocalLog.logSpec()
                               .sync()
                               .withInitialState(cm())
                               .createLog();
        log.readyUnchecked();

        List<Entry> entries = new ArrayList<>();
        for (int i = 1; i <= 9; i++)
            entries.add(entry(i));
        entries.add(triggerSnapshotAt(11));
        entries.add(forceSnapshotAt(11));
        entries.add(forceSnapshotAt(21));
        entries.add(forceSnapshotAt(31));

        // seeded so that a failing order can be replayed from the assertion message
        long seed = System.nanoTime();
        Collections.shuffle(entries, new Random(seed));
        log.append(entries);

        ClusterMetadata tail = log.waitForHighestConsecutive();
        assertEquals("Seed: " + seed, 31, tail.epoch.getEpoch());
    }

    /** Runs {@link #singleAppendFuzzTest()} {@value #FUZZ_ITERATIONS} times. */
    @Test
    public void appendFuzzTest() throws InterruptedException
    {
        for (int i = 0; i < FUZZ_ITERATIONS; i++)
        {
            singleAppendFuzzTest();
        }
    }

    /**
     * One round of the concurrent append fuzz test.
     * <p>
     * Ten appender tasks repeatedly append random, possibly overlapping slices of a fixed list of
     * 10..99 consecutive entries, while ten reader tasks wait for randomly chosen epochs. Once
     * every entry has been submitted the log must sit at the last epoch, its listener must have
     * seen every entry exactly once and in order, and nothing may be left in the pending buffer.
     * <p>
     * Any exception or assertion failure raised inside a worker task is recorded and rethrown from
     * this method, and the executor and log are released whether or not the round succeeds.
     *
     * @throws InterruptedException if interrupted while waiting for the executor to terminate
     */
    public void singleAppendFuzzTest() throws InterruptedException
    {
        long seed = System.nanoTime();
        // NOTE: the generator is shared by all worker tasks, so the seed pins the entry count but
        // not the exact interleaving of slices.
        Random random = new Random(seed);

        int entryCount = random.nextInt(90) + 10;
        ImmutableList.Builder<Entry> builder = ImmutableList.builderWithExpectedSize(entryCount);
        for (int i = 0; i < entryCount; i++)
            builder.add(entry(i + 1));
        ImmutableList<Entry> entries = builder.build();

        Set<Entry> submitted = ConcurrentHashMap.newKeySet();
        // first failure (and any later ones) raised by worker tasks; also acts as an abort flag
        List<Throwable> failures = new CopyOnWriteArrayList<>();

        int threads = 10;
        CountDownLatch begin = CountDownLatch.newCountDownLatch(1);
        CountDownLatch finish = CountDownLatch.newCountDownLatch(threads);
        CountDownLatch finishReaders = CountDownLatch.newCountDownLatch(threads);
        ExecutorPlus executor = executorFactory().configurePooled("APPENDER", threads * 2).build();
        LocalLog log = LocalLog.logSpec().withInitialState(cm()).createLog();

        // written by the log's listener callback and read by this thread, hence a thread-safe list
        List<Entry> committed = new CopyOnWriteArrayList<>();
        log.addListener((e, m) -> committed.add(e));

        try
        {
            for (int i = 0; i < threads; i++)
            {
                executor.submit(() -> {
                    try
                    {
                        begin.awaitUninterruptibly();
                        while (failures.isEmpty() && submitted.size() < entryCount)
                        {
                            // grab a random slice of up to 10 entries from the list and try to append them to the log;
                            // the bound is size + 1 so that end can equal size (subList is end-exclusive)
                            int end = random.nextInt(entryCount + 1);
                            int start = Math.max(0, end - (random.nextInt(10) + 1));
                            List<Entry> toAppend = entries.subList(start, end);
                            log.append(toAppend);
                            submitted.addAll(toAppend);
                        }
                    }
                    catch (Throwable t)
                    {
                        failures.add(t);
                    }
                    finally
                    {
                        // always count down, otherwise a failing appender would hang the test thread
                        finish.decrement();
                    }
                });
            }

            for (int i = 0; i < threads; i++)
            {
                executor.submit(() -> {
                    try
                    {
                        begin.awaitUninterruptibly();
                        while (failures.isEmpty() && submitted.size() < entryCount)
                        {
                            Epoch waitFor = entries.get(random.nextInt(entries.size())).epoch;
                            try
                            {
                                Assert.assertTrue(log.awaitAtLeast(waitFor).epoch.isEqualOrAfter(waitFor));
                            }
                            catch (TimeoutException ignored)
                            {
                                // presumably the epoch has not been appended yet; try another one
                            }
                            catch (InterruptedException e)
                            {
                                // the pool is being shut down: restore the flag and stop reading
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                    }
                    catch (Throwable t)
                    {
                        failures.add(t);
                    }
                    finally
                    {
                        finishReaders.decrement();
                    }
                });
            }

            begin.decrement();
            finish.awaitUninterruptibly();
            rethrowWorkerFailures(failures, seed);

            log.waitForHighestConsecutive();
            assertEquals("Seed: " + seed, entries.get(entries.size() - 1).epoch, log.metadata().epoch);
            if (!entries.equals(committed))
                fail("Committed list didn't match expected." +
                     "\n\tCommitted: " + toString(committed) +
                     "\n\tExpected : " + toString(entries) +
                     "\n\tPending: " + log.pendingBufferSize() +
                     "\n\tSeed: " + seed);
            assertEquals(0, log.pendingBufferSize());

            finishReaders.awaitUninterruptibly();
            rethrowWorkerFailures(failures, seed);
        }
        finally
        {
            try
            {
                executor.shutdownNow();
                executor.awaitTermination(10, TimeUnit.SECONDS);
            }
            finally
            {
                log.close();
            }
        }
    }

    /**
     * Throws an {@link AssertionError} wrapping the first recorded worker failure (later ones are
     * attached as suppressed); does nothing when {@code failures} is empty.
     */
    private static void rethrowWorkerFailures(List<Throwable> failures, long seed)
    {
        // copy first: workers may still be adding to the live list
        List<Throwable> snapshot = new ArrayList<>(failures);
        if (snapshot.isEmpty())
            return;

        AssertionError error = new AssertionError("Fuzz worker task failed. Seed: " + seed, snapshot.get(0));
        for (int i = 1; i < snapshot.size(); i++)
            error.addSuppressed(snapshot.get(i));
        throw error;
    }

    /**
     * Renders the entries as a comma-separated list of the integers carried by their
     * {@link CustomTransformation.PokeInt} payloads. Every entry must wrap such a payload.
     */
    public static String toString(Collection<? extends Entry> entries)
    {
        return entries.stream()
                      .map((e) -> {
                          CustomTransformation custom = (CustomTransformation) e.transform;
                          CustomTransformation.PokeInt poke = (CustomTransformation.PokeInt) custom.child();
                          return Integer.toString(poke.v);
                      })
                      .collect(Collectors.joining(","));
    }

    /** Creates an entry whose id, epoch and payload value are all {@code i}. */
    static Entry entry(int i)
    {
        return entry(i, i);
    }

    /** Creates an entry with id {@code i}, the given epoch, and a {@code PokeInt} payload of {@code i}. */
    static Entry entry(int i, long epoch)
    {
        return new Entry(new Entry.Id(i),
                         Epoch.create(epoch),
                         new CustomTransformation(CustomTransformation.PokeInt.NAME, new CustomTransformation.PokeInt(i)));
    }

    /** Creates a fresh {@link ClusterMetadata} using the Murmur3 partitioner. */
    static ClusterMetadata cm()
    {
        return new ClusterMetadata(Murmur3Partitioner.instance);
    }

    /** Creates an id-less entry at {@code epoch} carrying the {@link TriggerSnapshot} transformation. */
    private static Entry triggerSnapshotAt(long epoch)
    {
        return new Entry(Entry.Id.NONE,
                         Epoch.create(epoch),
                         TriggerSnapshot.instance);
    }

    /**
     * Creates an id-less entry at {@code epoch} carrying a {@link ForceSnapshot} of a fresh Murmur3
     * {@link ClusterMetadata} forced to that same epoch.
     */
    private static Entry forceSnapshotAt(long epoch)
    {
        return new Entry(Entry.Id.NONE,
                         Epoch.create(epoch),
                         new ForceSnapshot(new ClusterMetadata(Murmur3Partitioner.instance).forceEpoch(Epoch.create(epoch))));
    }
}