blob: a92bfefcca35f41c9e4c2c0841287d755efb04ba [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package accord.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.RoutingKey;
import accord.impl.IntKey;
import accord.local.Node;
import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.RoutingKeys;
import accord.primitives.Timestamp;
import org.opentest4j.AssertionFailedError;
import static java.lang.Integer.MAX_VALUE;
import static java.lang.Integer.MIN_VALUE;
// TODO (desired): test start inclusive ranges
public class ReducingRangeMapTest
static final Logger logger = LoggerFactory.getLogger(ReducingRangeMapTest.class);
static final ReducingRangeMap<Timestamp> EMPTY = new ReducingRangeMap<>();
static final RoutingKey MINIMUM_EXCL = new IntKey.Routing(MIN_VALUE);
static final RoutingKey MAXIMUM_EXCL = new IntKey.Routing(MAX_VALUE);
static boolean END_INCLUSIVE = false;
private static RoutingKey rk(int t)
return new IntKey.Routing(t);
private static RoutingKey rk(Random random)
int rk = random.nextInt();
if (random.nextBoolean()) rk = -rk;
if (rk == MAX_VALUE) --rk;
if (rk == MIN_VALUE) ++rk;
return new IntKey.Routing(rk);
private static Timestamp none()
return null;
private static Timestamp ts(int b)
return Timestamp.fromValues(1, b, 0, new Node.Id(1));
private static Range r(RoutingKey l, RoutingKey r)
return END_INCLUSIVE ? new Range.EndInclusive(l, r) : new Range.StartInclusive(l, r);
private static RoutingKey incr(RoutingKey rk)
return new IntKey.Routing(((IntKey.Routing)rk).key + 1);
private static RoutingKey decr(RoutingKey rk)
return new IntKey.Routing(((IntKey.Routing)rk).key - 1);
private static Range r(int l, int r)
return r(rk(l), rk(r));
private static Pair<RoutingKey, Timestamp> pt(int t, int b)
return Pair.create(rk(t), ts(b));
private static Pair<RoutingKey, Timestamp> pt(int t, Timestamp b)
return Pair.create(rk(t), b);
private static Pair<RoutingKey, Timestamp> pt(RoutingKey t, int b)
return Pair.create(t, ts(b));
private static ReducingRangeMap<Timestamp> h(Pair<RoutingKey, Timestamp>... points)
Invariants.checkState(points[0].right == none());
int length = points.length;
RoutingKey[] routingKeys = new RoutingKey[length];
Timestamp[] timestamps = new Timestamp[length - 1];
for (int i = 1 ; i < length ; ++i)
routingKeys[i - 1] = points[i - 1].left;
timestamps[i - 1] = points[i].right;
routingKeys[length - 1] = points[length - 1].left;
return new ReducingRangeMap<>(true, routingKeys, timestamps);
assert rk(100).equals(rk(100));
assert ts(111).equals(ts(111));
private static class Builder
ReducingRangeMap<Timestamp> history = EMPTY;
Builder add(Timestamp timestamp, Range... ranges)
history = ReducingRangeMap.add(history, Ranges.of(ranges), timestamp);
return this;
Builder clear()
history = EMPTY;
return this;
static Builder builder()
return new Builder();
public void testOne()
testRandomAdds(8532037884171168001L, 3, 1, 3, 0.100000f, 0.100000f);
public void testRandomAdds() throws ExecutionException, InterruptedException
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<ListenableFuture<Void>> results = new ArrayList<>();
int count = 100000;
for (int numberOfAdditions : new int[] { 1, 10, 100 })
for (float maxCoveragePerRange : new float[] { 0.01f, 0.1f, 0.5f })
for (float chanceOfMinRoutingKey : new float[] { 0.01f, 0.1f })
results.addAll(testRandomAdds(executor, count, 3, numberOfAdditions, 3, maxCoveragePerRange, chanceOfMinRoutingKey));
private List<ListenableFuture<Void>> testRandomAdds(ExecutorService executor, int tests, int numberOfMerges, int numberOfAdditions, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, float chanceOfMinRoutingKey)
return ThreadLocalRandom.current()
.mapToObj(seed -> {
SettableFuture<Void> promise = SettableFuture.create();
executor.execute(() -> {
testRandomAdds(seed, numberOfMerges, numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinRoutingKey);
catch (Throwable t)
return promise;
private void testRandomAdds(long seed, int numberOfMerges, int numberOfAdditions, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, float chanceOfMinRoutingKey)
String id = String.format("%d, %d, %d, %d, %f, %f", seed, numberOfMerges, numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinRoutingKey);
Random random = new Random(seed);;
List<RandomWithCanonical> merge = new ArrayList<>();
while (numberOfMerges-- > 0)
RandomWithCanonical build = new RandomWithCanonical();
build.addRandom(random, numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinRoutingKey);
build.validate(random, id);
RandomWithCanonical check = new RandomWithCanonical();
for (RandomWithCanonical add : merge)
check = check.merge(add);
// check.serdeser();
check.validate(random, id);
catch (Throwable t)
if (!(t instanceof AssertionFailedError))
throw new RuntimeException(id, t);
static class RandomMap
ReducingRangeMap<Timestamp> test = new ReducingRangeMap<>();
void add(Ranges ranges, Timestamp timestamp)
test = ReducingRangeMap.add(test, ranges, timestamp);
void merge(RandomMap other)
test = ReducingRangeMap.merge(test, other.test, Timestamp::max);
void addOneRandom(Random random, int maxRangeCount, float maxCoverage, float minChance)
int count = maxRangeCount == 1 ? 1 : 1 + random.nextInt(maxRangeCount - 1);
Timestamp timestamp = ts(random.nextInt(MAX_VALUE));
List<Range> ranges = new ArrayList<>();
while (count-- > 0)
int length = (int) (2 * random.nextDouble() * maxCoverage * MAX_VALUE);
if (length == 0) length = 1;
Range range;
if (random.nextFloat() <= minChance)
if (random.nextBoolean()) range = r(MIN_VALUE + 1, MIN_VALUE + 1 + length);
else range = r(MAX_VALUE - length - 1, MAX_VALUE - 1);
int start = random.nextInt(MAX_VALUE - length - 1);
range = r(start, start + length);
add(Ranges.of(ranges.toArray(new Range[0])), timestamp);
void addRandom(Random random, int count, int maxNumberOfRangesPerAddition, float maxCoveragePerAddition, float minRoutingKeyChance)
while (count-- > 0)
addOneRandom(random, maxNumberOfRangesPerAddition, maxCoveragePerAddition, minRoutingKeyChance);
static ReducingRangeMap<Timestamp> build(Random random, int count, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, float chanceOfMinRoutingKey)
RandomMap result = new RandomMap();
result.addRandom(random, count, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinRoutingKey);
return result.test;
static class RandomWithCanonical extends RandomMap
// confusingly, we use lower bounds here since we copied over from C*
NavigableMap<RoutingKey, Timestamp> canonical = new TreeMap<>();
canonical.put(MINIMUM_EXCL, none());
canonical.put(MAXIMUM_EXCL, none());
Timestamp get(RoutingKey rk)
return canonical.ceilingEntry(rk).getValue();
RandomWithCanonical merge(RandomWithCanonical other)
RandomWithCanonical result = new RandomWithCanonical();
result.test = ReducingRangeMap.merge(test, other.test, Timestamp::max);
result.canonical = new TreeMap<>();
RoutingKey prev = null;
for (Map.Entry<RoutingKey, Timestamp> entry : other.canonical.entrySet())
if (prev != null) result.addCanonical(r(prev, entry.getKey()), entry.getValue());
prev = entry.getKey();
return result;
// void serdeser()
// {
// ReduceRangeMap<RoutingKey, Timestamp> tmp = ReduceRangeMap.fromTupleBufferList(test.toTupleBufferList());
// Assertions.assertEquals(test, tmp);
// test = tmp;
// }
void add(Ranges addRanges, Timestamp timestamp)
super.add(addRanges, timestamp);
for (Range range : addRanges)
addCanonical(range, timestamp);
void addOneRandom(Random random, int maxRangeCount, float maxCoverage, float minChance)
super.addOneRandom(random, maxRangeCount, maxCoverage, minChance);
// validate(new Random(), "");
void addCanonical(Range range, Timestamp timestamp)
canonical.put(range.start(), canonical.ceilingEntry(range.start()).getValue());
canonical.put(range.end(), canonical.ceilingEntry(range.end()).getValue());
canonical.subMap(range.start(), !END_INCLUSIVE, range.end(), END_INCLUSIVE)
.entrySet().forEach(e -> e.setValue(Timestamp.nonNullOrMax(e.getValue(), timestamp)));
void validate(Random random, String id)
for (RoutingKey rk : canonical.keySet())
Assertions.assertEquals(get(decr(rk)), test.get(decr(rk)), id);
Assertions.assertEquals(get(rk), test.get(rk), id);
Assertions.assertEquals(get(incr(rk)), test.get(incr(rk)), id);
// check some random
int remaining = 1000;
while (remaining-- > 0)
RoutingKey routingKey = rk(random);
Assertions.assertEquals(get(routingKey), test.get(routingKey), id);
// validate foldl
int remaining = 100;
while (remaining-- > 0)
int count = 1 + random.nextInt(20);
RoutingKeys keys;
Ranges ranges;
RoutingKey[] tmp = new RoutingKey[count];
for (int i = 0 ; i < tmp.length ; ++i)
tmp[i] = rk(random);
keys = RoutingKeys.of(tmp);
Range[] tmp2 = new Range[(keys.size() + 1) / 2];
int i = 0, c = 0;
if (keys.size() % 2 == 1 && random.nextBoolean())
tmp2[c++] = r(MINIMUM_EXCL, keys.get(i++));
while (i + 1 < keys.size())
tmp2[c++] = r(keys.get(i), keys.get(i+1));
i += 2;
if (i < keys.size())
tmp2[c++] = r(keys.get(i++), MAXIMUM_EXCL);
ranges = Ranges.of(tmp2);
List<Timestamp> foldl = test.foldl(keys, (timestamp, timestamps) -> {
if (timestamps.isEmpty() || !timestamps.get(timestamps.size() - 1).equals(timestamp))
return timestamps;
}, new ArrayList<>(), ignore -> false);
List<Timestamp> canonFoldl = new ArrayList<>();
for (RoutingKey key : keys)
Timestamp next = get(key);
if (next == null)
if (canonFoldl.isEmpty() || !canonFoldl.get(canonFoldl.size() - 1).equals(next))
Assertions.assertEquals(canonFoldl, foldl, id);
foldl = test.foldl(ranges, (timestamp, timestamps) -> {
if (timestamps.isEmpty() || !timestamps.get(timestamps.size() - 1).equals(timestamp))
return timestamps;
}, new ArrayList<>(), ignore -> false);
for (Range range : ranges)
RoutingKey start = END_INCLUSIVE ? canonical.higherKey(range.start()) : canonical.ceilingKey(range.start());
RoutingKey end = END_INCLUSIVE ? canonical.ceilingKey(range.end()) : canonical.higherKey(range.end());
for (Timestamp next : canonical.subMap(start, true, end, true).values())
if (next == null)
if (canonFoldl.isEmpty() || !canonFoldl.get(canonFoldl.size() - 1).equals(next))
Assertions.assertEquals(canonFoldl, foldl, id);