blob: 387ec653ce478101f6b25c85142e9d48ac5434de [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.paxos;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import com.google.common.collect.Lists;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.cassandra.dht.Range.deoverlap;
import static org.apache.cassandra.service.paxos.Ballot.Flag.NONE;
import static org.apache.cassandra.service.paxos.Ballot.none;
import static org.apache.cassandra.service.paxos.BallotGenerator.Global.atUnixMicros;
import static org.apache.cassandra.service.paxos.BallotGenerator.Global.nextBallot;
import static org.apache.cassandra.service.paxos.Commit.latest;
import static org.apache.cassandra.service.paxos.PaxosRepairHistory.trim;
public class PaxosRepairHistoryTest
{
// Logger used by the randomised tests to record the seed and parameters of each run.
static final Logger logger = LoggerFactory.getLogger(PaxosRepairHistoryTest.class);
// Counter from which checkSystemTableIO derives a distinct table name ("test0", "test1", ...) per call.
private static final AtomicInteger tableNum = new AtomicInteger();
// Select Murmur3Partitioner through the system property before DatabaseDescriptor is initialised;
// the helpers in this class construct Murmur3 LongTokens directly.
static
{
System.setProperty("cassandra.partitioner", Murmur3Partitioner.class.getName());
DatabaseDescriptor.daemonInitialization();
// sanity check (only evaluated when assertions are enabled) that the configured partitioner is Murmur3
assert DatabaseDescriptor.getPartitioner() instanceof Murmur3Partitioner;
}
// The minimum token as reported by the Murmur3Partitioner singleton.
private static final Token MIN_TOKEN = Murmur3Partitioner.instance.getMinimumToken();
/** Shorthand: wraps a raw {@code long} in a Murmur3 {@link LongToken}. */
private static Token t(long t)
{
    final Token token = new LongToken(t);
    return token;
}
/**
 * Shorthand: builds a ballot with {@code Ballot.atUnixMicrosWithLsb}, passing {@code b} as the first
 * argument, zero as the second and the {@code NONE} flag.
 */
private static Ballot b(int b)
{
    final Ballot ballot = Ballot.atUnixMicrosWithLsb(b, 0, NONE);
    return ballot;
}
/** Shorthand: a {@link Range} with the given left and right tokens. */
private static Range<Token> r(Token l, Token r)
{
    final Range<Token> range = new Range<>(l, r);
    return range;
}
/** Shorthand: a {@link Range} whose bounds are the tokens for the two raw {@code long} values. */
private static Range<Token> r(long l, long r)
{
    final Token left = t(l);
    final Token right = t(r);
    return r(left, right);
}
/** Shorthand: a (token, ballot) point built from a raw token value and a raw ballot value. */
private static Pair<Token, Ballot> pt(long t, int b)
{
    final Token token = t(t);
    final Ballot ballot = b(b);
    return Pair.create(token, ballot);
}
/** Shorthand: a (token, ballot) point built from a raw token value and an existing ballot. */
private static Pair<Token, Ballot> pt(long t, Ballot b)
{
    final Token token = t(t);
    return Pair.create(token, b);
}
/**
 * Shorthand: a (token, ballot) point built from an existing token and a raw ballot value.
 * The token may be {@code null}; {@code h} treats a trailing null-token point specially.
 */
private static Pair<Token, Ballot> pt(Token t, int b)
{
    final Ballot ballot = b(b);
    return Pair.create(t, ballot);
}
/**
 * Builds a {@link PaxosRepairHistory} directly from a list of (token, ballot) points.
 * <p>
 * Every point with a non-null token contributes one token and the ballot stored alongside it. The history
 * always carries one more ballot than tokens:
 * <ul>
 *   <li>if the last point has a {@code null} token, its ballot becomes that trailing ballot;</li>
 *   <li>otherwise (including when no points are given) the trailing ballot is {@code none()}.</li>
 * </ul>
 * The varargs array is only read, never stored or exposed, hence {@code @SafeVarargs}.
 *
 * @param points the points in the order they should appear in the history; only the last may have a null token
 * @return a history constructed from the resulting token and ballot arrays
 */
@SafeVarargs
private static PaxosRepairHistory h(Pair<Token, Ballot>... points)
{
    int count = points.length;
    // guard the index so that an empty argument list does not throw ArrayIndexOutOfBoundsException
    boolean explicitTail = count > 0 && points[count - 1].left == null;
    int bounded = explicitTail ? count - 1 : count;

    Token[] tokens = new Token[bounded];
    Ballot[] ballots = new Ballot[bounded + 1];
    for (int i = 0 ; i < bounded ; ++i)
    {
        tokens[i] = points[i].left;
        ballots[i] = points[i].right;
    }
    // the final ballot has no token of its own
    ballots[bounded] = explicitTail ? points[bounded].right : none();
    return new PaxosRepairHistory(tokens, ballots);
}
// Sanity check (evaluated only when assertions are enabled): tokens and ballots produced by the
// shorthand factories compare equal by value, which the assertEquals-based tests rely on.
static
{
    assert Objects.equals(t(100), t(100));
    assert Objects.equals(b(111), b(111));
}
/**
 * Fluent helper that accumulates a {@link PaxosRepairHistory} through successive
 * {@code PaxosRepairHistory.add} calls, starting from {@code PaxosRepairHistory.EMPTY}.
 */
private static class Builder
{
    /** The history accumulated so far. */
    PaxosRepairHistory history = PaxosRepairHistory.EMPTY;

    /** Adds {@code ballot} for every one of {@code ranges} to the current history and returns this builder. */
    Builder add(Ballot ballot, Range<Token>... ranges)
    {
        ArrayList<Range<Token>> added = Lists.newArrayList(ranges);
        history = PaxosRepairHistory.add(history, added, ballot);
        return this;
    }

    /** Resets the accumulated history back to {@code PaxosRepairHistory.EMPTY} and returns this builder. */
    Builder clear()
    {
        history = PaxosRepairHistory.EMPTY;
        return this;
    }
}
/** Creates a new {@link Builder} whose history starts out empty. */
static Builder builder()
{
    final Builder fresh = new Builder();
    return fresh;
}
/**
 * Asserts that {@code history} survives two round trips unchanged: first through its tuple-buffer-list
 * form, then through {@code SystemKeyspace} save/load under keyspace {@code "test"} and a table name
 * that is unique per call.
 */
private static void checkSystemTableIO(PaxosRepairHistory history)
{
    // round trip through the tuple buffer representation
    PaxosRepairHistory decoded = PaxosRepairHistory.fromTupleBufferList(history.toTupleBufferList());
    Assert.assertEquals(history, decoded);

    // round trip through the system keyspace
    final String keyspace = "test";
    final String table = "test" + tableNum.getAndIncrement();
    SystemKeyspace.savePaxosRepairHistory(keyspace, table, history, false);
    Assert.assertEquals(history, SystemKeyspace.loadPaxosRepairHistory(keyspace, table));
}
/**
 * One-time class setup: delegates to {@code SchemaLoader.prepareServer()}.
 */
@BeforeClass
public static void init() throws Exception
{
SchemaLoader.prepareServer();
}
/**
 * Exercises {@code PaxosRepairHistory.add} with disjoint, nested and partially overlapping ranges.
 * The expected histories show that within an overlap the higher ballot is kept, whichever was added first.
 */
@Test
public void testAdd()
{
    // one ballot applied to two disjoint ranges
    PaxosRepairHistory expected = h(pt(10, none()), pt(20, 5), pt(30, none()), pt(40, 5));
    PaxosRepairHistory actual = builder().add(b(5), r(10, 20), r(30, 40)).history;
    Assert.assertEquals(expected, actual);

    // lookups around (10, 20]: the left bound is excluded, the right bound included
    Assert.assertEquals(none(), actual.ballotForToken(t(0)));
    Assert.assertEquals(none(), actual.ballotForToken(t(10)));
    Assert.assertEquals(b(5), actual.ballotForToken(t(11)));
    Assert.assertEquals(b(5), actual.ballotForToken(t(20)));
    Assert.assertEquals(none(), actual.ballotForToken(t(21)));

    // different ballots on two disjoint ranges
    expected = h(pt(10, none()), pt(20, 5), pt(30, none()), pt(40, 6));
    actual = builder().add(b(5), r(10, 20)).add(b(6), r(30, 40)).history;
    Assert.assertEquals(expected, actual);

    // a higher ballot added inside a wider, lower-ballot range
    expected = h(pt(10, none()), pt(20, 5), pt(30, 6), pt(40, 5));
    actual = builder().add(b(5), r(10, 40)).add(b(6), r(20, 30)).history;
    Assert.assertEquals(expected, actual);

    // a lower ballot added afterwards does not replace the higher one where they overlap
    expected = h(pt(10, none()), pt(20, 6), pt(30, 5));
    actual = builder().add(b(6), r(10, 20)).add(b(5), r(15, 30)).history;
    Assert.assertEquals(expected, actual);

    // a higher ballot added afterwards replaces the lower one where they overlap
    expected = h(pt(10, none()), pt(20, 5), pt(30, 6));
    actual = builder().add(b(5), r(10, 25)).add(b(6), r(20, 30)).history;
    Assert.assertEquals(expected, actual);
}
/**
 * Exercises {@code PaxosRepairHistory.trim}: only the parts of the history inside the requested ranges
 * keep their ballot; everything else reads as {@code none()}.
 */
@Test
public void testTrim()
{
    // several ranges cut out of one wide entry
    PaxosRepairHistory expected = h(pt(10, none()), pt(20, 5), pt(30, none()), pt(40, 5), pt(50, none()), pt(60, 5));
    PaxosRepairHistory source = h(pt(0, none()), pt(70, 5));
    Assert.assertEquals(expected, trim(source, Lists.newArrayList(r(10, 20), r(30, 40), r(50, 60))));

    // requested range extends past the right end of the entry
    expected = h(pt(10, none()), pt(20, 5));
    source = h(pt(0, none()), pt(20, 5));
    Assert.assertEquals(expected, trim(source, Lists.newArrayList(r(10, 30))));

    // requested range starts before the left end of the entry
    expected = h(pt(10, none()), pt(20, 5));
    source = h(pt(10, none()), pt(30, 5));
    Assert.assertEquals(expected, trim(source, Lists.newArrayList(r(0, 20))));
}
/**
 * Adding a ballot over (MIN_TOKEN, MIN_TOKEN] must collapse to a history with no tokens and a single ballot.
 */
@Test
public void testFullRange()
{
    PaxosRepairHistory expected = h(pt(null, 5));
    PaxosRepairHistory fullRing = builder().add(b(5), r(MIN_TOKEN, MIN_TOKEN)).history;
    Assert.assertEquals(expected, fullRing);

    // every token, including the minimum, resolves to the one ballot
    Assert.assertEquals(b(5), fullRing.ballotForToken(MIN_TOKEN));
    Assert.assertEquals(b(5), fullRing.ballotForToken(t(0)));
}
/**
 * Adding a ballot over the wrap-around range (100, -100] must cover both ends of the ring
 * and leave (-100, 100] at {@code none()}.
 */
@Test
public void testWrapAroundRange()
{
    PaxosRepairHistory expected = h(pt(-100, 5), pt(100, none()), pt(null, 5));
    PaxosRepairHistory wrapped = builder().add(b(5), r(100, -100)).history;
    Assert.assertEquals(expected, wrapped);

    // low end of the ring, up to and including -100
    Assert.assertEquals(b(5), wrapped.ballotForToken(MIN_TOKEN));
    Assert.assertEquals(b(5), wrapped.ballotForToken(t(-101)));
    Assert.assertEquals(b(5), wrapped.ballotForToken(t(-100)));

    // the uncovered middle, up to and including 100
    Assert.assertEquals(none(), wrapped.ballotForToken(t(-99)));
    Assert.assertEquals(none(), wrapped.ballotForToken(t(0)));
    Assert.assertEquals(none(), wrapped.ballotForToken(t(100)));

    // high end of the ring
    Assert.assertEquals(b(5), wrapped.ballotForToken(t(101)));
}
/** Converts raw {@code long} values into an array of {@link LongToken}s, preserving order. */
private static Token[] tks(long ... tks)
{
    Token[] converted = new Token[tks.length];
    for (int i = 0 ; i < tks.length ; ++i)
        converted[i] = new LongToken(tks[i]);
    return converted;
}
/** Parses each string with {@code Ballot.fromString} and returns the ballots in the same order. */
private static Ballot[] uuids(String ... uuids)
{
    Ballot[] parsed = new Ballot[uuids.length];
    for (int i = 0 ; i < uuids.length ; ++i)
        parsed[i] = Ballot.fromString(uuids[i]);
    return parsed;
}
/**
 * Regression case with fixed tokens and ballots: after trimming to a single range, a token that lies
 * outside that range must resolve to {@code none()}.
 */
@Test
public void testRegression()
{
    PaxosRepairHistory history = new PaxosRepairHistory(
        tks(-9223372036854775807L, -3952873730080618203L, -1317624576693539401L, 1317624576693539401L, 6588122883467697005L),
        uuids("1382954c-1dd2-11b2-8fb2-f45d70d6d6d8", "138260a4-1dd2-11b2-abb2-c13c36b179e1", "1382951a-1dd2-11b2-1dd8-b7e242b38dbe", "138294fc-1dd2-11b2-83c4-43fb3a552386", "13829510-1dd2-11b2-f353-381f2ed963fa", "1382954c-1dd2-11b2-8fb2-f45d70d6d6d8"));
    Collection<Range<Token>> keep = Collections.singleton(new Range<>(new LongToken(-1317624576693539401L), new LongToken(1317624576693539401L)));

    PaxosRepairHistory trimmed = trim(history, keep);

    // this token is below the left bound of the retained range
    Assert.assertEquals(none(), trimmed.ballotForToken(new LongToken(-4208619967696141037L)));
}
/**
 * Two histories covering the full ring must not compare equal when one holds {@code none()} and the
 * other a freshly generated ballot.
 */
@Test
public void testInequality()
{
    Range<Token> fullRing = new Range<>(Murmur3Partitioner.MINIMUM, Murmur3Partitioner.MINIMUM);
    Collection<Range<Token>> ranges = Collections.singleton(fullRing);

    PaxosRepairHistory withNone = PaxosRepairHistory.add(PaxosRepairHistory.EMPTY, ranges, none());
    PaxosRepairHistory withBallot = PaxosRepairHistory.add(PaxosRepairHistory.EMPTY, ranges, nextBallot(NONE));

    Assert.assertNotEquals(withNone, withBallot);
}
/**
 * Runs the randomised trim test for every combination of addition count, per-range coverage and
 * minimum-token chance, 1000 random seeds each, on a fixed-size thread pool.
 * <p>
 * The pool is shut down in a {@code finally} block so its worker threads are released even when
 * submitting or waiting on the tasks throws.
 */
@Test
public void testRandomTrims()
{
    ExecutorService executor = Executors.newFixedThreadPool(FBUtilities.getAvailableProcessors());
    try
    {
        List<Future<?>> results = new ArrayList<>();
        int count = 1000;
        for (int numberOfAdditions : new int[] { 1, 10, 100 })
        {
            for (float maxCoveragePerRange : new float[] { 0.01f, 0.1f, 0.5f })
            {
                for (float chanceOfMinToken : new float[] { 0.01f, 0.1f })
                {
                    results.addAll(testRandomTrims(executor, count, numberOfAdditions, 3, maxCoveragePerRange, chanceOfMinToken));
                }
            }
        }
        FBUtilities.waitOnFutures(results);
    }
    finally
    {
        // orderly shutdown only: already-submitted tasks are not interrupted
        executor.shutdown();
    }
}
/**
 * Submits {@code tests} randomised trim runs to {@code executor}, each with its own random seed,
 * and returns the futures of the submitted tasks.
 */
private List<Future<?>> testRandomTrims(ExecutorService executor, int tests, int numberOfAdditions, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, float chanceOfMinToken)
{
    List<Future<?>> submitted = new ArrayList<>();
    ThreadLocalRandom.current()
                     .longs(tests)
                     .forEach(seed -> submitted.add(executor.submit(() -> testRandomTrims(seed, numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinToken))));
    return submitted;
}
/**
 * Runs one randomised trim/merge round trip for the given seed.
 * <p>
 * A random history is built, the ring is cut into contiguous ranges that are distributed over one or more
 * groups, and the history is trimmed to each group. Every trimmed history is compared with the original at
 * tokens on and next to each range boundary. All trimmed histories are then merged back together and the
 * result must equal the original. Finally the original history is passed to {@code checkSystemTableIO}.
 *
 * @param seed seed of the {@link Random} that drives every random choice in this run (logged at the start)
 * @param numberOfAdditions forwarded to {@code RandomPaxosRepairHistory.build}
 * @param maxNumberOfRangesPerAddition forwarded to {@code RandomPaxosRepairHistory.build}
 * @param maxCoveragePerRange forwarded to {@code RandomPaxosRepairHistory.build}
 * @param chanceOfMinToken forwarded to {@code RandomPaxosRepairHistory.build}
 */
private void testRandomTrims(long seed, int numberOfAdditions, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, float chanceOfMinToken)
{
Random random = new Random(seed);
logger.info("Seed {} ({}, {}, {}, {})", seed, numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinToken);
PaxosRepairHistory history = RandomPaxosRepairHistory.build(random, numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinToken);
// generate a random list of ranges that cover the whole ring
// 16 distinct random boundary tokens; when nextBoolean() is true the first is replaced by Long.MIN_VALUE
// (presumably so that a boundary sometimes falls on the minimum token)
long[] tokens = random.longs(16).distinct().toArray();
if (random.nextBoolean())
tokens[0] = Long.MIN_VALUE;
Arrays.sort(tokens);
// between 1 and (tokens.length - 1) / 2 groups; each group receives a random subset of the ranges
List<List<Range<Token>>> ranges = IntStream.range(0, tokens.length <= 3 ? 1 : 1 + random.nextInt((tokens.length - 1) / 2))
.mapToObj(ignore -> new ArrayList<Range<Token>>())
.collect(Collectors.toList());
// each pair of consecutive tokens forms one range, assigned to a randomly chosen group
for (int i = 1 ; i < tokens.length ; ++i)
ranges.get(random.nextInt(ranges.size())).add(new Range<>(new LongToken(tokens[i - 1]), new LongToken(tokens[i])));
// the wrap-around range from the last token back to the first closes the ring
ranges.get(random.nextInt(ranges.size())).add(new Range<>(new LongToken(tokens[tokens.length - 1]), new LongToken(tokens[0])));
List<PaxosRepairHistory> splits = new ArrayList<>();
for (List<Range<Token>> rs : ranges)
{
PaxosRepairHistory trimmed = PaxosRepairHistory.trim(history, rs);
splits.add(trimmed);
// a group may have received no ranges; its trimmed history still takes part in the merge below
if (rs.isEmpty())
continue;
// prev starts as the last range of the group, so the first range is compared against it
Range<Token> prev = rs.get(rs.size() - 1);
for (Range<Token> range : rs)
{
if (prev.right.equals(range.left))
{
// this range starts where the previous one ends: the trimmed history must match the
// original just below and at the shared boundary
Assert.assertEquals(history.ballotForToken(((LongToken)range.left).decreaseSlightly()), trimmed.ballotForToken(((LongToken)range.left).decreaseSlightly()));
Assert.assertEquals(history.ballotForToken(range.left), trimmed.ballotForToken(range.left));
}
else
{
// there is a gap before this range: the trimmed history must report none() at range.left and
// just after prev.right (each check is skipped when that boundary is the minimum token)
if (!range.left.isMinimum())
Assert.assertEquals(none(), trimmed.ballotForToken(range.left));
if (!prev.right.isMinimum())
Assert.assertEquals(none(), trimmed.ballotForToken(prev.right.increaseSlightly()));
}
// just inside the left bound
Assert.assertEquals(history.ballotForToken(range.left.increaseSlightly()), trimmed.ballotForToken(range.left.increaseSlightly()));
// just below the right bound, unless that token would be range.left itself (left + 1 == right)
if (!range.left.increaseSlightly().equals(range.right))
Assert.assertEquals(history.ballotForToken(((LongToken)range.right).decreaseSlightly()), trimmed.ballotForToken(((LongToken)range.right).decreaseSlightly()));
// at the right bound; when the range ends at the minimum token, Long.MAX_VALUE is checked instead
if (range.right.isMinimum())
Assert.assertEquals(history.ballotForToken(new LongToken(Long.MAX_VALUE)), trimmed.ballotForToken(new LongToken(Long.MAX_VALUE)));
else
Assert.assertEquals(history.ballotForToken(range.right), trimmed.ballotForToken(range.right));
prev = range;
}
}
// merging every trimmed piece back together must reproduce the original history
PaxosRepairHistory merged = PaxosRepairHistory.EMPTY;
for (PaxosRepairHistory split : splits)
merged = PaxosRepairHistory.merge(merged, split);
Assert.assertEquals(history, merged);
checkSystemTableIO(history);
}
/**
 * Runs the randomised add/merge test for every combination of addition count, per-range coverage and
 * minimum-token chance, 1000 random seeds each with 3 merges, on a fixed-size thread pool.
 * <p>
 * The pool is shut down in a {@code finally} block so its worker threads are released even when
 * submitting or waiting on the tasks throws.
 */
@Test
public void testRandomAdds()
{
    ExecutorService executor = Executors.newFixedThreadPool(FBUtilities.getAvailableProcessors());
    try
    {
        List<Future<?>> results = new ArrayList<>();
        int count = 1000;
        for (int numberOfAdditions : new int[] { 1, 10, 100 })
        {
            for (float maxCoveragePerRange : new float[] { 0.01f, 0.1f, 0.5f })
            {
                for (float chanceOfMinToken : new float[] { 0.01f, 0.1f })
                {
                    results.addAll(testRandomAdds(executor, count, 3, numberOfAdditions, 3, maxCoveragePerRange, chanceOfMinToken));
                }
            }
        }
        FBUtilities.waitOnFutures(results);
    }
    finally
    {
        // orderly shutdown only: already-submitted tasks are not interrupted
        executor.shutdown();
    }
}
/**
 * Submits {@code tests} randomised add/merge runs to {@code executor}, each with its own random seed,
 * and returns the futures of the submitted tasks.
 */
private List<Future<?>> testRandomAdds(ExecutorService executor, int tests, int numberOfMerges, int numberOfAdditions, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, float chanceOfMinToken)
{
    List<Future<?>> submitted = new ArrayList<>();
    ThreadLocalRandom.current()
                     .longs(tests)
                     .forEach(seed -> submitted.add(executor.submit(() -> testRandomAdds(seed, numberOfMerges, numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinToken))));
    return submitted;
}
/**
 * Runs one randomised add/merge comparison for the given seed.
 * <p>
 * Builds {@code numberOfMerges} random histories, each paired with a canonical {@code TreeMap} model, merges
 * them one by one, round-trips the merged history through its tuple-buffer form, and then checks that the
 * history and the model agree at every model boundary (and the tokens either side of it) and at 1000
 * random tokens. The formatted parameter string is logged and used as the message of every assertion.
 */
private void testRandomAdds(long seed, int numberOfMerges, int numberOfAdditions, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, float chanceOfMinToken)
{
    final Random rng = new Random(seed);
    final String id = String.format("%d, %d, %d, %d, %f, %f", seed, numberOfMerges, numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinToken);
    logger.info(id);

    // build the independent inputs
    List<RandomWithCanonical> inputs = new ArrayList<>();
    for (int i = 0 ; i < numberOfMerges ; ++i)
    {
        RandomWithCanonical input = new RandomWithCanonical();
        input.addRandom(rng, numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinToken);
        inputs.add(input);
    }

    // fold them together, then round-trip the merged history
    RandomWithCanonical combined = new RandomWithCanonical();
    for (RandomWithCanonical input : inputs)
        combined = combined.merge(input);
    combined.serdeser();

    // compare at every boundary of the canonical model, and one token either side
    for (Token boundary : combined.canonical.keySet())
    {
        LongToken at = (LongToken) boundary;
        Assert.assertEquals(id, combined.ballotForToken(at.decreaseSlightly()), combined.test.ballotForToken(at.decreaseSlightly()));
        Assert.assertEquals(id, combined.ballotForToken(at), combined.test.ballotForToken(boundary));
        Assert.assertEquals(id, combined.ballotForToken(at.increaseSlightly()), combined.test.ballotForToken(boundary.increaseSlightly()));
    }

    // check some random
    for (int i = 0 ; i < 1000 ; ++i)
    {
        LongToken sample = new LongToken(rng.nextLong());
        Assert.assertEquals(id, combined.ballotForToken(sample), combined.test.ballotForToken(sample));
    }
}
/**
 * Wraps a {@link PaxosRepairHistory} under test and knows how to grow it with randomly generated
 * (ranges, ballot) additions.
 */
static class RandomPaxosRepairHistory
{
    /** The history under test; replaced on every add or merge. */
    PaxosRepairHistory test = PaxosRepairHistory.EMPTY;

    /** Adds {@code ballot} for the given ranges to the history under test. */
    void add(Collection<Range<Token>> ranges, Ballot ballot)
    {
        PaxosRepairHistory updated = PaxosRepairHistory.add(test, ranges, ballot);
        test = updated;
    }

    /** Merges the other wrapper's history into this one. */
    void merge(RandomPaxosRepairHistory other)
    {
        PaxosRepairHistory combined = PaxosRepairHistory.merge(test, other.test);
        test = combined;
    }

    /**
     * Adds one random ballot over a random set of ranges.
     * The range count is 1 when {@code maxRangeCount} is 1, otherwise {@code 1 + nextInt(maxRangeCount - 1)}.
     * The ranges are sorted and de-overlapped before being added.
     */
    void addOneRandom(Random random, int maxRangeCount, float maxCoverage, float minChance)
    {
        int rangeCount = 1;
        if (maxRangeCount != 1)
            rangeCount += random.nextInt(maxRangeCount - 1);

        Ballot ballot = atUnixMicros(random.nextInt(Integer.MAX_VALUE), NONE);

        List<Range<Token>> ranges = new ArrayList<>();
        for (int i = 0 ; i < rangeCount ; ++i)
            ranges.add(randomRange(random, maxCoverage, minChance));

        ranges.sort(Range::compareTo);
        add(deoverlap(ranges), ballot);
    }

    /**
     * Draws one random range. Its length is scaled by {@code maxCoverage} and is at least 1 whenever the
     * scaled value truncates to zero. With chance {@code minChance} the range is anchored at the minimum
     * token (as its left or right bound, chosen at random); otherwise it starts at a random token.
     */
    private static Range<Token> randomRange(Random random, float maxCoverage, float minChance)
    {
        long scaled = (long) (2 * random.nextDouble() * maxCoverage * Long.MAX_VALUE);
        long length = scaled == 0 ? 1 : scaled;

        if (random.nextFloat() <= minChance)
        {
            if (random.nextBoolean())
                return new Range<>(Murmur3Partitioner.MINIMUM, new LongToken(Long.MIN_VALUE + length));
            return new Range<>(new LongToken(Long.MAX_VALUE - length), Murmur3Partitioner.MINIMUM);
        }
        else
        {
            long start = random.nextLong();
            return new Range<>(new LongToken(start), new LongToken(start + length));
        }
    }

    /** Performs {@code count} random additions via {@link #addOneRandom}. */
    void addRandom(Random random, int count, int maxNumberOfRangesPerAddition, float maxCoveragePerAddition, float minTokenChance)
    {
        for (int remaining = count ; remaining > 0 ; --remaining)
            addOneRandom(random, maxNumberOfRangesPerAddition, maxCoveragePerAddition, minTokenChance);
    }

    /** Builds a history from {@code count} random additions, starting from an empty history. */
    static PaxosRepairHistory build(Random random, int count, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, float chanceOfMinToken)
    {
        RandomPaxosRepairHistory generator = new RandomPaxosRepairHistory();
        generator.addRandom(random, count, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinToken);
        return generator.test;
    }
}
static class RandomWithCanonical extends RandomPaxosRepairHistory
{
NavigableMap<Token, Ballot> canonical = new TreeMap<>();
{
canonical.put(Murmur3Partitioner.MINIMUM, none());
}
Ballot ballotForToken(LongToken token)
{
return canonical
.floorEntry(token.token == Long.MIN_VALUE ? token : token.decreaseSlightly())
.getValue();
}
RandomWithCanonical merge(RandomWithCanonical other)
{
RandomWithCanonical result = new RandomWithCanonical();
result.test = PaxosRepairHistory.merge(test, other.test);
result.canonical = new TreeMap<>();
result.canonical.putAll(canonical);
for (Map.Entry<Token, Ballot> entry : other.canonical.entrySet())
{
Token left = entry.getKey();
Token right = other.canonical.higherKey(left);
if (right == null) right = Murmur3Partitioner.MINIMUM;
result.addCanonical(new Range<>(left, right), entry.getValue());
}
return result;
}
void serdeser()
{
PaxosRepairHistory tmp = PaxosRepairHistory.fromTupleBufferList(test.toTupleBufferList());
Assert.assertEquals(test, tmp);
test = tmp;
}
void add(Collection<Range<Token>> addRanges, Ballot ballot)
{
super.add(addRanges, ballot);
for (Range<Token> range : addRanges)
addCanonical(range, ballot);
}
void addCanonical(Range<Token> range, Ballot ballot)
{
canonical.put(range.left, canonical.floorEntry(range.left).getValue());
if (!range.right.isMinimum())
canonical.put(range.right, canonical.floorEntry(range.right).getValue());
for (Range<Token> r : range.unwrap())
{
(r.right.isMinimum()
? canonical.subMap(r.left, true, new LongToken(Long.MAX_VALUE), true)
: canonical.subMap(r.left, true, r.right, false)
).entrySet().forEach(e -> e.setValue(latest(e.getValue(), ballot)));
}
}
}
}