blob: a92bfefcca35f41c9e4c2c0841287d755efb04ba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.RoutingKey;
import accord.impl.IntKey;
import accord.local.Node;
import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.RoutingKeys;
import accord.primitives.Timestamp;
import org.opentest4j.AssertionFailedError;
import static java.lang.Integer.MAX_VALUE;
import static java.lang.Integer.MIN_VALUE;
// TODO (desired): test start inclusive ranges
public class ReducingRangeMapTest
{
static final Logger logger = LoggerFactory.getLogger(ReducingRangeMapTest.class);
static final ReducingRangeMap<Timestamp> EMPTY = new ReducingRangeMap<>();
static final RoutingKey MINIMUM_EXCL = new IntKey.Routing(MIN_VALUE);
static final RoutingKey MAXIMUM_EXCL = new IntKey.Routing(MAX_VALUE);
static boolean END_INCLUSIVE = false;
private static RoutingKey rk(int t)
{
return new IntKey.Routing(t);
}
private static RoutingKey rk(Random random)
{
int rk = random.nextInt();
if (random.nextBoolean()) rk = -rk;
if (rk == MAX_VALUE) --rk;
if (rk == MIN_VALUE) ++rk;
return new IntKey.Routing(rk);
}
private static Timestamp none()
{
return null;
}
private static Timestamp ts(int b)
{
return Timestamp.fromValues(1, b, 0, new Node.Id(1));
}
private static Range r(RoutingKey l, RoutingKey r)
{
return END_INCLUSIVE ? new Range.EndInclusive(l, r) : new Range.StartInclusive(l, r);
}
private static RoutingKey incr(RoutingKey rk)
{
return new IntKey.Routing(((IntKey.Routing)rk).key + 1);
}
private static RoutingKey decr(RoutingKey rk)
{
return new IntKey.Routing(((IntKey.Routing)rk).key - 1);
}
private static Range r(int l, int r)
{
return r(rk(l), rk(r));
}
private static Pair<RoutingKey, Timestamp> pt(int t, int b)
{
return Pair.create(rk(t), ts(b));
}
private static Pair<RoutingKey, Timestamp> pt(int t, Timestamp b)
{
return Pair.create(rk(t), b);
}
private static Pair<RoutingKey, Timestamp> pt(RoutingKey t, int b)
{
return Pair.create(t, ts(b));
}
private static ReducingRangeMap<Timestamp> h(Pair<RoutingKey, Timestamp>... points)
{
Invariants.checkState(points[0].right == none());
int length = points.length;
RoutingKey[] routingKeys = new RoutingKey[length];
Timestamp[] timestamps = new Timestamp[length - 1];
for (int i = 1 ; i < length ; ++i)
{
routingKeys[i - 1] = points[i - 1].left;
timestamps[i - 1] = points[i].right;
}
routingKeys[length - 1] = points[length - 1].left;
return new ReducingRangeMap<>(true, routingKeys, timestamps);
}
static
{
assert rk(100).equals(rk(100));
assert ts(111).equals(ts(111));
}
private static class Builder
{
ReducingRangeMap<Timestamp> history = EMPTY;
Builder add(Timestamp timestamp, Range... ranges)
{
history = ReducingRangeMap.add(history, Ranges.of(ranges), timestamp);
return this;
}
Builder clear()
{
history = EMPTY;
return this;
}
}
static Builder builder()
{
return new Builder();
}
@Test
public void testOne()
{
testRandomAdds(8532037884171168001L, 3, 1, 3, 0.100000f, 0.100000f);
}
@Test
public void testRandomAdds() throws ExecutionException, InterruptedException
{
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<ListenableFuture<Void>> results = new ArrayList<>();
int count = 100000;
for (int numberOfAdditions : new int[] { 1, 10, 100 })
{
for (float maxCoveragePerRange : new float[] { 0.01f, 0.1f, 0.5f })
{
for (float chanceOfMinRoutingKey : new float[] { 0.01f, 0.1f })
{
results.addAll(testRandomAdds(executor, count, 3, numberOfAdditions, 3, maxCoveragePerRange, chanceOfMinRoutingKey));
}
}
}
Futures.allAsList(results).get();
executor.shutdown();
}
private List<ListenableFuture<Void>> testRandomAdds(ExecutorService executor, int tests, int numberOfMerges, int numberOfAdditions, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, float chanceOfMinRoutingKey)
{
return ThreadLocalRandom.current()
.longs(tests)
.mapToObj(seed -> {
SettableFuture<Void> promise = SettableFuture.create();
executor.execute(() -> {
try
{
testRandomAdds(seed, numberOfMerges, numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinRoutingKey);
promise.set(null);
}
catch (Throwable t)
{
promise.setException(t);
}
});
return promise;
})
.collect(Collectors.toList());
}
private void testRandomAdds(long seed, int numberOfMerges, int numberOfAdditions, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, float chanceOfMinRoutingKey)
{
String id = String.format("%d, %d, %d, %d, %f, %f", seed, numberOfMerges, numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinRoutingKey);
try
{
Random random = new Random(seed);
logger.info(id);
List<RandomWithCanonical> merge = new ArrayList<>();
while (numberOfMerges-- > 0)
{
RandomWithCanonical build = new RandomWithCanonical();
build.addRandom(random, numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinRoutingKey);
build.validate(random, id);
merge.add(build);
}
RandomWithCanonical check = new RandomWithCanonical();
for (RandomWithCanonical add : merge)
check = check.merge(add);
// check.serdeser();
check.validate(random, id);
}
catch (Throwable t)
{
if (!(t instanceof AssertionFailedError))
throw new RuntimeException(id, t);
}
}
static class RandomMap
{
ReducingRangeMap<Timestamp> test = new ReducingRangeMap<>();
void add(Ranges ranges, Timestamp timestamp)
{
test = ReducingRangeMap.add(test, ranges, timestamp);
}
void merge(RandomMap other)
{
test = ReducingRangeMap.merge(test, other.test, Timestamp::max);
}
void addOneRandom(Random random, int maxRangeCount, float maxCoverage, float minChance)
{
int count = maxRangeCount == 1 ? 1 : 1 + random.nextInt(maxRangeCount - 1);
Timestamp timestamp = ts(random.nextInt(MAX_VALUE));
List<Range> ranges = new ArrayList<>();
while (count-- > 0)
{
int length = (int) (2 * random.nextDouble() * maxCoverage * MAX_VALUE);
if (length == 0) length = 1;
Range range;
if (random.nextFloat() <= minChance)
{
if (random.nextBoolean()) range = r(MIN_VALUE + 1, MIN_VALUE + 1 + length);
else range = r(MAX_VALUE - length - 1, MAX_VALUE - 1);
}
else
{
int start = random.nextInt(MAX_VALUE - length - 1);
range = r(start, start + length);
}
ranges.add(range);
}
add(Ranges.of(ranges.toArray(new Range[0])), timestamp);
}
void addRandom(Random random, int count, int maxNumberOfRangesPerAddition, float maxCoveragePerAddition, float minRoutingKeyChance)
{
while (count-- > 0)
addOneRandom(random, maxNumberOfRangesPerAddition, maxCoveragePerAddition, minRoutingKeyChance);
}
static ReducingRangeMap<Timestamp> build(Random random, int count, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, float chanceOfMinRoutingKey)
{
RandomMap result = new RandomMap();
result.addRandom(random, count, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinRoutingKey);
return result.test;
}
}
static class RandomWithCanonical extends RandomMap
{
// confusingly, we use lower bounds here since we copied over from C*
NavigableMap<RoutingKey, Timestamp> canonical = new TreeMap<>();
{
canonical.put(MINIMUM_EXCL, none());
canonical.put(MAXIMUM_EXCL, none());
}
Timestamp get(RoutingKey rk)
{
return canonical.ceilingEntry(rk).getValue();
}
RandomWithCanonical merge(RandomWithCanonical other)
{
RandomWithCanonical result = new RandomWithCanonical();
result.test = ReducingRangeMap.merge(test, other.test, Timestamp::max);
result.canonical = new TreeMap<>();
result.canonical.putAll(canonical);
RoutingKey prev = null;
for (Map.Entry<RoutingKey, Timestamp> entry : other.canonical.entrySet())
{
if (prev != null) result.addCanonical(r(prev, entry.getKey()), entry.getValue());
prev = entry.getKey();
}
return result;
}
// void serdeser()
// {
// ReduceRangeMap<RoutingKey, Timestamp> tmp = ReduceRangeMap.fromTupleBufferList(test.toTupleBufferList());
// Assertions.assertEquals(test, tmp);
// test = tmp;
// }
@Override
void add(Ranges addRanges, Timestamp timestamp)
{
super.add(addRanges, timestamp);
for (Range range : addRanges)
addCanonical(range, timestamp);
}
@Override
void addOneRandom(Random random, int maxRangeCount, float maxCoverage, float minChance)
{
super.addOneRandom(random, maxRangeCount, maxCoverage, minChance);
// validate(new Random(), "");
}
void addCanonical(Range range, Timestamp timestamp)
{
canonical.put(range.start(), canonical.ceilingEntry(range.start()).getValue());
canonical.put(range.end(), canonical.ceilingEntry(range.end()).getValue());
canonical.subMap(range.start(), !END_INCLUSIVE, range.end(), END_INCLUSIVE)
.entrySet().forEach(e -> e.setValue(Timestamp.nonNullOrMax(e.getValue(), timestamp)));
}
void validate(Random random, String id)
{
for (RoutingKey rk : canonical.keySet())
{
Assertions.assertEquals(get(decr(rk)), test.get(decr(rk)), id);
Assertions.assertEquals(get(rk), test.get(rk), id);
Assertions.assertEquals(get(incr(rk)), test.get(incr(rk)), id);
}
// check some random
{
int remaining = 1000;
while (remaining-- > 0)
{
RoutingKey routingKey = rk(random);
Assertions.assertEquals(get(routingKey), test.get(routingKey), id);
}
}
// validate foldl
{
int remaining = 100;
while (remaining-- > 0)
{
int count = 1 + random.nextInt(20);
RoutingKeys keys;
Ranges ranges;
{
RoutingKey[] tmp = new RoutingKey[count];
for (int i = 0 ; i < tmp.length ; ++i)
tmp[i] = rk(random);
keys = RoutingKeys.of(tmp);
Range[] tmp2 = new Range[(keys.size() + 1) / 2];
int i = 0, c = 0;
if (keys.size() % 2 == 1 && random.nextBoolean())
tmp2[c++] = r(MINIMUM_EXCL, keys.get(i++));
while (i + 1 < keys.size())
{
tmp2[c++] = r(keys.get(i), keys.get(i+1));
i += 2;
}
if (i < keys.size())
tmp2[c++] = r(keys.get(i++), MAXIMUM_EXCL);
ranges = Ranges.of(tmp2);
}
List<Timestamp> foldl = test.foldl(keys, (timestamp, timestamps) -> {
if (timestamps.isEmpty() || !timestamps.get(timestamps.size() - 1).equals(timestamp))
timestamps.add(timestamp);
return timestamps;
}, new ArrayList<>(), ignore -> false);
List<Timestamp> canonFoldl = new ArrayList<>();
for (RoutingKey key : keys)
{
Timestamp next = get(key);
if (next == null)
continue;
if (canonFoldl.isEmpty() || !canonFoldl.get(canonFoldl.size() - 1).equals(next))
canonFoldl.add(next);
}
Assertions.assertEquals(canonFoldl, foldl, id);
foldl = test.foldl(ranges, (timestamp, timestamps) -> {
if (timestamps.isEmpty() || !timestamps.get(timestamps.size() - 1).equals(timestamp))
timestamps.add(timestamp);
return timestamps;
}, new ArrayList<>(), ignore -> false);
canonFoldl.clear();
for (Range range : ranges)
{
RoutingKey start = END_INCLUSIVE ? canonical.higherKey(range.start()) : canonical.ceilingKey(range.start());
RoutingKey end = END_INCLUSIVE ? canonical.ceilingKey(range.end()) : canonical.higherKey(range.end());
for (Timestamp next : canonical.subMap(start, true, end, true).values())
{
if (next == null)
continue;
if (canonFoldl.isEmpty() || !canonFoldl.get(canonFoldl.size() - 1).equals(next))
canonFoldl.add(next);
}
}
Assertions.assertEquals(canonFoldl, foldl, id);
}
}
}
}
}