blob: 596c4d79a384dea9825ee2f58f178942d06469bb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.utils.streamhist;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import org.junit.Test;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.psjava.util.AssertStatus;
import org.quicktheories.core.Gen;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.quicktheories.QuickTheory.qt;
import static org.quicktheories.generators.SourceDSL.integers;
import static org.quicktheories.generators.SourceDSL.lists;
public class StreamingTombstoneHistogramBuilderTest
{
/**
 * Feeds seven samples into a builder created with (maxBinSize 5, maxSpoolSize 0, delta 1),
 * then checks the resulting bins in iteration order and two {@code sum} queries.
 */
@Test
public void testFunction() throws Exception
{
    StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 0, 1);
    int[] samples = new int[]{ 23, 19, 10, 16, 36, 2, 9, 32, 30, 45 };

    // add only the first 7 points to a histogram of 5 bins
    for (int i = 0; i < 7; i++)
    {
        builder.update(samples[i]);
    }

    // should end up (2,1),(9,2),(17,2),(23,1),(36,1)
    Map<Double, Long> expected1 = new LinkedHashMap<Double, Long>(5);
    expected1.put(2.0, 1L);
    expected1.put(9.0, 2L);
    expected1.put(17.0, 2L);
    expected1.put(23.0, 1L);
    expected1.put(36.0, 1L);

    Iterator<Map.Entry<Double, Long>> expectedItr = expected1.entrySet().iterator();
    TombstoneHistogram hist = builder.build();
    hist.forEach((point, value) ->
                 {
                     Map.Entry<Double, Long> entry = expectedItr.next();
                     assertEquals(entry.getKey(), point, 0.01);
                     assertEquals(entry.getValue().longValue(), value);
                 });
    // The loop above is driven by the histogram, so a histogram with too few bins
    // would otherwise pass without ever looking at the remaining expectations.
    assertFalse("histogram has fewer bins than expected", expectedItr.hasNext());

    // sum test
    assertEquals(3.5, hist.sum(15), 0.01);
    // sum test (b > max(hist))
    assertEquals(7.0, hist.sum(50), 0.01);
}
/**
 * Builds a histogram from seven samples, serializes it, deserializes it again and checks
 * that the deserialized histogram holds the expected bins in iteration order.
 */
@Test
public void testSerDe() throws Exception
{
    StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 0, 1);
    int[] samples = new int[]{ 23, 19, 10, 16, 36, 2, 9 };

    // add 7 points to histogram of 5 bins
    for (int sample : samples)
    {
        builder.update(sample);
    }

    TombstoneHistogram hist = builder.build();
    DataOutputBuffer out = new DataOutputBuffer();
    TombstoneHistogram.serializer.serialize(hist, out);
    byte[] bytes = out.toByteArray();
    TombstoneHistogram deserialized = TombstoneHistogram.serializer.deserialize(new DataInputBuffer(bytes));

    // deserialized histogram should have following values
    Map<Double, Long> expected1 = new LinkedHashMap<Double, Long>(5);
    expected1.put(2.0, 1L);
    expected1.put(9.0, 2L);
    expected1.put(17.0, 2L);
    expected1.put(23.0, 1L);
    expected1.put(36.0, 1L);

    Iterator<Map.Entry<Double, Long>> expectedItr = expected1.entrySet().iterator();
    deserialized.forEach((point, value) ->
                         {
                             Map.Entry<Double, Long> entry = expectedItr.next();
                             assertEquals(entry.getKey(), point, 0.01);
                             assertEquals(entry.getValue().longValue(), value);
                         });
    // The loop above is driven by the histogram, so bins lost during the round trip
    // would otherwise go unnoticed.
    assertFalse("deserialized histogram has fewer bins than expected", expectedItr.hasNext());
}
/**
 * Adds the same point several times, the last time with a count of {@code Integer.MAX_VALUE},
 * and checks the single resulting bin both directly and after a serialization round trip.
 */
@Test
public void testNumericTypes() throws Exception
{
    StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 0, 1);

    for (int repetition = 0; repetition < 3; repetition++)
        builder.update(2);
    // To check that value overflow is handled correctly
    builder.update(2, Integer.MAX_VALUE);

    TombstoneHistogram original = builder.build();
    Map<Integer, Integer> bins = asMap(original);

    assertEquals(1, bins.size());
    assertEquals(Integer.MAX_VALUE, bins.get(2).intValue());

    // Make sure it's working with Serde
    DataOutputBuffer buffer = new DataOutputBuffer();
    TombstoneHistogram.serializer.serialize(original, buffer);
    DataInputBuffer input = new DataInputBuffer(buffer.toByteArray());
    TombstoneHistogram roundTripped = TombstoneHistogram.serializer.deserialize(input);

    bins = asMap(roundTripped);

    assertEquals(1, roundTripped.size());
    assertEquals(Integer.MAX_VALUE, bins.get(2).intValue());
}
/**
 * Pushes more samples than the spool size passed to the builder (10) and checks that the
 * built histogram ends up with exactly 5 bins.
 */
@Test
public void testOverflow() throws Exception
{
    StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 10, 1);
    int[] samples = {
        23, 19, 10, 16, 36, 2, 9, 32, 30, 45, 31,
        32, 32, 33, 34, 35, 70, 78, 80, 90, 100,
        32, 32, 33, 34, 35, 70, 78, 80, 90, 100
    };

    // Hit the spool cap, force it to make bins
    for (int sample : samples)
        builder.update(sample);

    TombstoneHistogram histogram = builder.build();
    assertEquals(5, histogram.size());
}
/**
 * Uses a builder whose third constructor argument (delta) is 60 and checks the number of
 * bins and the counts recorded for points 60 and 120.
 */
@Test
public void testRounding() throws Exception
{
    StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 10, 60);
    int[] samples = new int[]{ 59, 60, 119, 180, 181, 300 }; // 60, 60, 120, 180, 240, 300
    for (int sample : samples)
        builder.update(sample);

    TombstoneHistogram hist = builder.build();
    // Convert once; the map is only read below.
    Map<Integer, Integer> bins = asMap(hist);

    // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
    assertEquals(5, hist.size());
    assertEquals(2, bins.get(60).intValue());
    assertEquals(1, bins.get(120).intValue());
}
/**
 * Updates the builder with the 30 consecutive points just below {@code Integer.MAX_VALUE}.
 * There are no assertions; the test passes as long as no exception is thrown.
 */
@Test
public void testLargeValues() throws Exception
{
    StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 0, 1);
    // Upper bound is exclusive, so the loop variable never wraps around.
    for (int point = Integer.MAX_VALUE - 30; point < Integer.MAX_VALUE; point++)
        builder.update(point);
}
/**
 * Property test: for generated builders, 300 generated keys in [0, Cell.MAX_DELETION_TIME]
 * and 300 generated positive values, every bucket of the built histogram must have a
 * non-negative key and value.
 */
@Test
public void testLargeDeletionTimesAndLargeValuesDontCauseOverflow()
{
    Gen<StreamingTombstoneHistogramBuilder> builders = streamingTombstoneHistogramBuilderGen(1000, 300000, 60);
    Gen<List<Integer>> deletionTimes = lists().of(integers().from(0).upTo(Cell.MAX_DELETION_TIME)).ofSize(300);
    Gen<List<Integer>> counts = lists().of(integers().allPositive()).ofSize(300);

    qt().forAll(builders, deletionTimes, counts)
        .checkAssert(this::updateHistogramAndCheckAllBucketsArePositive);
}
/**
 * Applies each (key, value) pair to the builder, builds the histogram and asserts that no
 * bucket has a negative key or a negative value.
 *
 * @param histogramBuilder builder to update
 * @param keys points to add; indexed in lockstep with {@code values}
 * @param values counts to add for the corresponding key
 */
private void updateHistogramAndCheckAllBucketsArePositive(StreamingTombstoneHistogramBuilder histogramBuilder, List<Integer> keys, List<Integer> values)
{
    int index = 0;
    while (index < keys.size())
    {
        histogramBuilder.update(keys.get(index), values.get(index));
        index++;
    }

    Map<Integer, Integer> buckets = asMap(histogramBuilder.build());
    for (Map.Entry<Integer, Integer> bucket : buckets.entrySet())
    {
        int point = bucket.getKey();
        int count = bucket.getValue();
        assertTrue("Invalid bucket key", point >= 0);
        assertTrue("Invalid bucket value", count >= 0);
    }
}
/**
 * Adds a single point two below {@code Cell.NO_DELETION_TIME} to a builder with delta 3 and
 * checks that the histogram contains exactly one bin, keyed by {@code Cell.MAX_DELETION_TIME},
 * with a count of 1.
 */
@Test
public void testThatPointIsNotMissedBecauseOfRoundingToNoDeletionTime() throws Exception
{
    int pointThatRoundedToNoDeletion = Cell.NO_DELETION_TIME - 2;
    // Sanity check on the fixture itself (only evaluated when JVM assertions are enabled).
    assert pointThatRoundedToNoDeletion + pointThatRoundedToNoDeletion % 3 == Cell.NO_DELETION_TIME : "test data should be valid";

    StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 10, 3);
    builder.update(pointThatRoundedToNoDeletion);

    TombstoneHistogram histogram = builder.build();
    Map<Integer, Integer> integerIntegerMap = asMap(histogram);

    // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
    assertEquals(1, integerIntegerMap.size());
    assertEquals(1, integerIntegerMap.get(Cell.MAX_DELETION_TIME).intValue());
}
/**
 * Checks that constructing a builder with each of the argument combinations below throws,
 * and that the exception message echoes the three arguments.
 */
@Test
public void testInvalidArguments()
{
    // third argument (delta) of zero
    assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(5, 10, 0))
        .hasMessage("Invalid arguments: maxBinSize:5 maxSpoolSize:10 delta:0");

    // negative delta
    assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(5, 10, -1))
        .hasMessage("Invalid arguments: maxBinSize:5 maxSpoolSize:10 delta:-1");

    // negative spool size
    assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(5, -1, 60))
        .hasMessage("Invalid arguments: maxBinSize:5 maxSpoolSize:-1 delta:60");

    // negative bin size
    assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(-1, 10, 60))
        .hasMessage("Invalid arguments: maxBinSize:-1 maxSpoolSize:10 delta:60");

    // bin size of zero
    assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(0, 10, 60))
        .hasMessage("Invalid arguments: maxBinSize:0 maxSpoolSize:10 delta:60");
}
/**
 * Exercises {@code Spool.tryAddOrAccumulate} on a spool created with an argument of 8:
 * new points are added, repeated points have their values accumulated, and every call in
 * this test is expected to return {@code true}.
 */
@Test
public void testSpool()
{
    StreamingTombstoneHistogramBuilder.Spool spool = new StreamingTombstoneHistogramBuilder.Spool(8);

    // first point, then accumulate onto it
    assertTrue(spool.tryAddOrAccumulate(5, 1));
    assertSpool(spool, 5, 1);
    assertTrue(spool.tryAddOrAccumulate(5, 3));
    assertSpool(spool, 5, 4);

    assertTrue(spool.tryAddOrAccumulate(10, 1));
    assertSpool(spool, 5, 4, 10, 1);

    // three more distinct points
    for (int point : new int[]{ 12, 14, 16 })
        assertTrue(spool.tryAddOrAccumulate(point, 1));
    assertSpool(spool,
                5, 4,
                10, 1,
                12, 1,
                14, 1,
                16, 1);

    // and another three, bringing the spool to eight distinct points
    for (int point : new int[]{ 18, 20, 30 })
        assertTrue(spool.tryAddOrAccumulate(point, 1));
    assertSpool(spool,
                5, 4,
                10, 1,
                12, 1,
                14, 1,
                16, 1,
                18, 1,
                20, 1,
                30, 1);

    // accumulate onto points that are already present
    int[][] increments = { { 16, 5 }, { 12, 4 }, { 18, 9 } };
    for (int[] increment : increments)
        assertTrue(spool.tryAddOrAccumulate(increment[0], increment[1]));
    assertSpool(spool,
                5, 4,
                10, 1,
                12, 5,
                14, 1,
                16, 6,
                18, 10,
                20, 1,
                30, 1);

    // a ninth distinct point is still accepted
    assertTrue(spool.tryAddOrAccumulate(99, 5));
}
/**
 * Exercises {@code DataHolder} created with arguments (4, 1): checks {@code size},
 * {@code isFull}, the return value of {@code addValue} (true for a point not yet present,
 * false when an existing point is incremented) and the result of {@code mergeNearestPoints}.
 */
@Test
public void testDataHolder()
{
    StreamingTombstoneHistogramBuilder.DataHolder dataHolder = new StreamingTombstoneHistogramBuilder.DataHolder(4, 1);
    assertFalse(dataHolder.isFull());
    assertEquals(0, dataHolder.size());

    assertTrue(dataHolder.addValue(4, 1));
    assertDataHolder(dataHolder,
                     4, 1);

    assertFalse(dataHolder.addValue(4, 1));
    assertDataHolder(dataHolder,
                     4, 2);

    assertTrue(dataHolder.addValue(7, 1));
    assertDataHolder(dataHolder,
                     4, 2,
                     7, 1);

    assertFalse(dataHolder.addValue(7, 1));
    assertDataHolder(dataHolder,
                     4, 2,
                     7, 2);

    assertTrue(dataHolder.addValue(5, 1));
    assertDataHolder(dataHolder,
                     4, 2,
                     5, 1,
                     7, 2);

    assertFalse(dataHolder.addValue(5, 1));
    assertDataHolder(dataHolder,
                     4, 2,
                     5, 2,
                     7, 2);

    assertTrue(dataHolder.addValue(2, 1));
    assertDataHolder(dataHolder,
                     2, 1,
                     4, 2,
                     5, 2,
                     7, 2);
    assertTrue(dataHolder.isFull());

    // expect to merge [4,2]+[5,2]
    dataHolder.mergeNearestPoints();
    assertDataHolder(dataHolder,
                     2, 1,
                     4, 4,
                     7, 2);

    assertFalse(dataHolder.addValue(2, 1));
    assertDataHolder(dataHolder,
                     2, 2,
                     4, 4,
                     7, 2);

    // 8 is not present yet, so like every other new point above this must report true
    assertTrue(dataHolder.addValue(8, 1));
    assertDataHolder(dataHolder,
                     2, 2,
                     4, 4,
                     7, 2,
                     8, 1);
    assertTrue(dataHolder.isFull());

    // expect to merge [7,2]+[8,1]
    dataHolder.mergeNearestPoints();
    assertDataHolder(dataHolder,
                     2, 2,
                     4, 4,
                     7, 3);
}
/**
 * Asserts that {@code dataHolder} holds exactly as many entries as there are pairs in
 * {@code pointValue}, and that each listed point maps to the listed value.
 *
 * @param dataHolder holder under test
 * @param pointValue flattened pairs: point, expected value, point, expected value, ...
 */
private static void assertDataHolder(StreamingTombstoneHistogramBuilder.DataHolder dataHolder, int... pointValue)
{
    int expectedSize = pointValue.length / 2;
    assertEquals(expectedSize, dataHolder.size());

    int cursor = 0;
    while (cursor < pointValue.length)
    {
        // even slot holds the point, the following odd slot its expected value
        assertEquals(pointValue[cursor + 1], dataHolder.getValue(pointValue[cursor]));
        cursor += 2;
    }
}
/**
 * Compare the contents of {@code spool} with the given collection of key-value pairs in {@code pairs}.
 * Fails if the sizes differ, if the spool yields a key that is not expected (or yields one twice),
 * if a value differs, or if an expected key is never yielded.
 *
 * @param spool spool under test
 * @param pairs flattened pairs: key, expected value, key, expected value, ...
 */
private static void assertSpool(StreamingTombstoneHistogramBuilder.Spool spool, int... pairs)
{
    assertEquals(pairs.length / 2, spool.size);

    Map<Integer, Integer> tests = new HashMap<>();
    for (int i = 0; i < pairs.length; i += 2)
        tests.put(pairs[i], pairs[i + 1]);

    spool.forEach((k, v) -> {
        // remove as we go so that whatever is left afterwards was never seen in the spool
        Integer x = tests.remove(k);
        assertNotNull("key " + k, x);
        assertEquals("value for key " + k, x.intValue(), v);
    });

    // Use the JUnit assertion like the rest of this class so a failure is reported as a test failure
    // and names the keys that were missing.
    assertTrue("expected keys missing from spool: " + tests.keySet(), tests.isEmpty());
}
/**
 * Copies every (point, value) pair yielded by {@code histogram.forEach} into a new {@link HashMap}.
 *
 * @param histogram histogram to read
 * @return a map from point to value
 */
private Map<Integer, Integer> asMap(TombstoneHistogram histogram)
{
    Map<Integer, Integer> pointToValue = new HashMap<>();
    histogram.forEach((point, value) -> pointToValue.put(point, value));
    return pointToValue;
}
/**
 * Generator of builders whose three constructor arguments are drawn from
 * {@code positiveIntegerUpTo(maxBinSize)}, {@code integers().between(0, maxSpoolSize)} and
 * {@code positiveIntegerUpTo(maxRoundSeconds)} respectively.
 */
private Gen<StreamingTombstoneHistogramBuilder> streamingTombstoneHistogramBuilderGen(int maxBinSize, int maxSpoolSize, int maxRoundSeconds)
{
    Gen<Integer> binSizes = positiveIntegerUpTo(maxBinSize);
    Gen<Integer> spoolSizes = integers().between(0, maxSpoolSize);
    Gen<Integer> roundSeconds = positiveIntegerUpTo(maxRoundSeconds);

    return binSizes.zip(spoolSizes, roundSeconds, StreamingTombstoneHistogramBuilder::new);
}
/**
 * Generator of integers produced by {@code integers().between(1, upperBound)}.
 *
 * @param upperBound upper end of the generated range
 */
private Gen<Integer> positiveIntegerUpTo(int upperBound)
{
    final int smallestPositive = 1;
    return integers().between(smallestPositive, upperBound);
}
}