/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.utils.streamhist;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.math.IntMath;
import org.apache.cassandra.db.rows.Cell;
/**
* Histogram that can be constructed from a stream of data.
*
* The histogram is used, for example, to retrieve the number of droppable tombstones via
* {@link org.apache.cassandra.io.sstable.format.SSTableReader#getDroppableTombstonesBefore(int)}.
* <p>
* When an sstable is written (or streamed), this histogram-builder receives the "local deletion timestamp"
* as an {@code int} via {@link #update(int)}. Negative values are not supported.
* <p>
* Algorithm: the histogram is represented as a collection of {point, weight} pairs. When a new point <i>p</i> with weight <i>m</i> is added:
* <ol>
* <li>If point <i>p</i> is already exists in collection, add <i>m</i> to recorded value of point <i>p</i> </li>
* <li>If there is no point <i>p</i> in the collection, add point <i>p</i> with weight <i>m</i> </li>
* <li>If point was added and collection size became larger than maxBinSize:</li>
* </ol>
*
* <ol type="a">
* <li>Find nearest points <i>p1</i> and <i>p2</i> in the collection </li>
* <li>Replace these two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2)</i></li>
* </ol>
*
* <p>
* There are some optimizations to make the histogram builder faster:
* <ol>
* <li>Spool: big map that saves from excessively merging of small bin. This map can contains up to maxSpoolSize points and accumulate weight from same points.
* For example, if spoolSize=100, binSize=10 and there are only 50 different points. it will be only 40 merges regardless how many points will be added.</li>
* <li>Spool is organized as open-addressing primitive hash map where odd elements are points and event elements are values.
* Spool can not resize => when number of collisions became bigger than threshold or size became large that <i>array_size/2</i> Spool is drained to bin</li>
* <li>Bin is organized as sorted arrays. It reduces garbage collection pressure and allows to find elements in log(binSize) time via binary search</li>
* <li>To use existing Arrays.binarySearch <i></>{point, values}</i> in bin pairs is packed in one long</li>
* </ol>
* <p>
* The original algorithm is taken from the following paper:
* Yael Ben-Haim and Elad Tom-Tov, "A Streaming Parallel Decision Tree Algorithm" (2010)
* http://jmlr.csail.mit.edu/papers/volume11/ben-haim10a/ben-haim10a.pdf
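* <p>
* A minimal usage sketch (the numeric arguments are illustrative only, not recommended defaults):
* <pre>{@code
* StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(100, 100000, 60);
* builder.update(1500000000);     // one tombstone with this local deletion time
* builder.update(1500000060, 5);  // five tombstones sharing the same local deletion time
* TombstoneHistogram histogram = builder.build();
* }</pre>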
*/
public class StreamingTombstoneHistogramBuilder
{
// Buffer of point-value pairs
private final DataHolder bin;
// Keep a second, larger buffer to spool data in, before finalizing it into `bin`
private Spool spool;
// voluntarily give up resolution for speed
private final int roundSeconds;
public StreamingTombstoneHistogramBuilder(int maxBinSize, int maxSpoolSize, int roundSeconds)
{
assert maxBinSize > 0 && maxSpoolSize >= 0 && roundSeconds > 0: "Invalid arguments: maxBinSize:" + maxBinSize + " maxSpoolSize:" + maxSpoolSize + " delta:" + roundSeconds;
this.roundSeconds = roundSeconds;
this.bin = new DataHolder(maxBinSize + 1, roundSeconds);
this.spool = new Spool(maxSpoolSize);
}
/**
* Adds new point to this histogram with a default value of 1.
*
* @param point the point to be added
*/
public void update(int point)
{
update(point, 1);
}
/**
* Adds new point {@param point} with value {@param value} to this histogram.
*/
public void update(int point, int value)
{
assert spool != null: "update is being called after releaseBuffers. This could be functionally okay, but this assertion is a canary to alert about unintended use before it is necessary.";
point = ceilKey(point, roundSeconds);
if (spool.capacity > 0)
{
if (!spool.tryAddOrAccumulate(point, value))
{
flushHistogram();
final boolean success = spool.tryAddOrAccumulate(point, value);
assert success : "Can not add value to spool"; // after spool flushing we should always be able to insert new value
}
}
else
{
flushValue(point, value);
}
}
/**
* Drain the temporary spool into the final bins
*/
public void flushHistogram()
{
Spool spool = this.spool;
if (spool != null)
{
spool.forEach(this::flushValue);
spool.clear();
}
}
/**
* Releases the inner spool buffers. The histogram remains readable and writable, but with reduced performance.
* Not intended for use before finalization.
*/
public void releaseBuffers()
{
flushHistogram();
spool = null;
}
private void flushValue(int key, int spoolValue)
{
bin.addValue(key, spoolValue);
if (bin.isFull())
{
bin.mergeNearestPoints();
}
}
/**
* Creates a 'finished' snapshot of the current state of the histogram, but leaves this builder instance
* open for subsequent additions. Basically, this allows us to have some degree of sanity with respect to
* sstable early open.
*/
public TombstoneHistogram build()
{
flushHistogram();
return new TombstoneHistogram(bin);
}
/**
* An ordered collection of histogram buckets; each entry represents a (point, value) pair.
* Once the collection is full, the two closest buckets are merged using a weighted approach, see {@link #mergeNearestPoints()}.
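* <p>
* Each (point, value) pair is packed into a single {@code long}, with the point in the upper 32 bits and the value
* in the lower 32 bits, e.g. {@code wrap(5, 3) == (5L << 32) | 3}; empty slots hold {@code EMPTY}.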
*/
static class DataHolder
{
private static final long EMPTY = Long.MAX_VALUE;
private final long[] data;
private final int roundSeconds;
DataHolder(int maxCapacity, int roundSeconds)
{
data = new long[maxCapacity];
Arrays.fill(data, EMPTY);
this.roundSeconds = roundSeconds;
}
DataHolder(DataHolder holder)
{
data = Arrays.copyOf(holder.data, holder.data.length);
roundSeconds = holder.roundSeconds;
}
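/**
 * Returns the recorded value for {@code point}, or a negative not-found sentinel ({@code -1} or {@code -2})
 * if the point is not present.
 */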
@VisibleForTesting
int getValue(int point)
{
long key = wrap(point, 0);
int index = Arrays.binarySearch(data, key);
if (index < 0)
index = -index - 1;
if (index >= data.length)
return -1; // not-found sentinel
if (unwrapPoint(data[index]) != point)
return -2; // not-found sentinel
return unwrapValue(data[index]);
}
/**
* Adds value {@code delta} to the point {@code point}.
*
* @return {@code true} if inserted, {@code false} if accumulated
*/
boolean addValue(int point, int delta)
{
long key = wrap(point, 0);
int index = Arrays.binarySearch(data, key);
if (index < 0)
{
index = -index - 1;
assert (index < data.length) : "No more space in array";
if (unwrapPoint(data[index]) != point) // a different point occupies this slot: shift the array and insert
{
assert (data[data.length - 1] == EMPTY) : "No more space in array";
System.arraycopy(data, index, data, index + 1, data.length - index - 1);
data[index] = wrap(point, delta);
return true;
}
else
{
data[index] = wrap(point, (long) unwrapValue(data[index]) + delta);
}
}
else
{
data[index] = wrap(point, (long) unwrapValue(data[index]) + delta);
}
return false;
}
/**
* Finds the nearest points <i>p1</i> and <i>p2</i> in the collection and
* replaces these two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(m1+m2)</i>.
*/
@VisibleForTesting
void mergeNearestPoints()
{
assert isFull() : "DataHolder must be full in order to merge two points";
final int[] smallestDifference = findPointPairWithSmallestDistance();
final int point1 = smallestDifference[0];
final int point2 = smallestDifference[1];
long key = wrap(point1, 0);
int index = Arrays.binarySearch(data, key);
if (index < 0)
{
index = -index - 1;
assert (index < data.length) : "Not found in array";
assert (unwrapPoint(data[index]) == point1) : "Not found in array";
}
long value1 = unwrapValue(data[index]);
long value2 = unwrapValue(data[index + 1]);
assert (unwrapPoint(data[index + 1]) == point2) : "point2 should follow point1";
long sum = value1 + value2;
//let's evaluate in long values to handle overflow in multiplication
int newPoint = saturatingCastToInt((point1 * value1 + point2 * value2) / sum);
newPoint = ceilKey(newPoint, roundSeconds);
data[index] = wrap(newPoint, saturatingCastToInt(sum));
System.arraycopy(data, index + 2, data, index + 1, data.length - index - 2);
data[data.length - 1] = EMPTY;
}
private int[] findPointPairWithSmallestDistance()
{
assert isFull(): "The DataHolder must be full in order to find the closest pair of points";
int point1 = 0;
int point2 = Integer.MAX_VALUE;
for (int i = 0; i < data.length - 1; i++)
{
int pointA = unwrapPoint(data[i]);
int pointB = unwrapPoint(data[i + 1]);
assert pointB > pointA : "DataHolder not sorted, p2(" + pointB +") < p1(" + pointA + ") for " + this;
if (point2 - point1 > pointB - pointA)
{
point1 = pointA;
point2 = pointB;
}
}
return new int[]{point1, point2};
}
private int[] unwrap(long key)
{
final int point = unwrapPoint(key);
final int value = unwrapValue(key);
return new int[]{ point, value };
}
private int unwrapPoint(long key)
{
return (int) (key >> 32);
}
private int unwrapValue(long key)
{
return (int) (key & 0xFF_FF_FF_FFL);
}
private long wrap(int point, long value)
{
assert point >= 0 : "Invalid argument: point:" + point;
return (((long) point) << 32) | saturatingCastToInt(value);
}
public String toString()
{
return Arrays.stream(data).filter(x -> x != EMPTY).mapToObj(this::unwrap).map(Arrays::toString).collect(Collectors.joining());
}
public boolean isFull()
{
return data[data.length - 1] != EMPTY;
}
public <E extends Exception> void forEach(HistogramDataConsumer<E> histogramDataConsumer) throws E
{
for (long datum : data)
{
if (datum == EMPTY)
{
break;
}
histogramDataConsumer.consume(unwrapPoint(datum), unwrapValue(datum));
}
}
public int size()
{
int[] accumulator = new int[1];
forEach((point, value) -> accumulator[0]++);
return accumulator[0];
}
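/**
 * Estimates the number of points that are smaller than or equal to {@code b} (the "sum" procedure from the
 * Ben-Haim / Tom-Tov paper referenced in the class javadoc): buckets entirely below {@code b} contribute
 * their full weight, and the pair of buckets straddling {@code b} contributes a linearly interpolated share.
 */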
public double sum(int b)
{
double sum = 0;
for (int i = 0; i < data.length; i++)
{
long pointAndValue = data[i];
if (pointAndValue == EMPTY)
{
break;
}
final int point = unwrapPoint(pointAndValue);
final int value = unwrapValue(pointAndValue);
if (point > b)
{
if (i == 0)
{ // no prev point
return 0;
}
else
{
final int prevPoint = unwrapPoint(data[i - 1]);
final int prevValue = unwrapValue(data[i - 1]);
// calculate estimated count mb for point b
double weight = (b - prevPoint) / (double) (point - prevPoint);
double mb = prevValue + (value - prevValue) * weight;
sum -= prevValue;
sum += (prevValue + mb) * weight / 2;
sum += prevValue / 2.0;
return sum;
}
}
else
{
sum += value;
}
}
return sum;
}
@Override
public int hashCode()
{
return Arrays.hashCode(data);
}
@Override
public boolean equals(Object o)
{
if (!(o instanceof DataHolder))
return false;
final DataHolder other = ((DataHolder) o);
if (this.size() != other.size())
return false;
for (int i = 0; i < size(); i++)
{
if (data[i] != other.data[i])
{
return false;
}
}
return true;
}
}
/**
* This class is a specialized open-addressing hash map that uses primitive ints as keys and values.
* This is an optimization to avoid allocating objects.
* In order for this class to work correctly it should have a power of 2 capacity.
* This last invariant is taken care of during construction.
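* <p>
* A minimal sketch of the accumulating behaviour (values are illustrative):
* <pre>{@code
* Spool spool = new Spool(4);        // requested capacity is rounded up to a power of two
* spool.tryAddOrAccumulate(60, 1);   // inserts {60 -> 1}
* spool.tryAddOrAccumulate(60, 2);   // accumulates the existing entry to {60 -> 3}
* }</pre>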
*/
static class Spool
{
final int[] points;
final int[] values;
final int capacity;
int size;
Spool(int requestedCapacity)
{
if (requestedCapacity < 0)
throw new IllegalArgumentException("Illegal capacity " + requestedCapacity);
this.capacity = getPowerOfTwoCapacity(requestedCapacity);
// x2 because we want no more than two probes on average once _capacity_ entries have been written
points = new int[capacity * 2];
values = new int[capacity * 2];
clear();
}
private int getPowerOfTwoCapacity(int requestedCapacity)
{
//for spool we need power-of-two cells
return requestedCapacity == 0 ? 0 : IntMath.pow(2, IntMath.log2(requestedCapacity, RoundingMode.CEILING));
}
void clear()
{
Arrays.fill(points, -1);
size = 0;
}
boolean tryAddOrAccumulate(int point, int delta)
{
if (size > capacity)
{
return false;
}
final int cell = (capacity - 1) & hash(point);
// We use linear probing; a cluster of 100 occupied cells is large enough to give up and let the caller flush.
for (int attempt = 0; attempt < 100; attempt++)
{
if (tryCell(cell + attempt, point, delta))
return true;
}
return false;
}
private int hash(int i)
{
long largePrime = 948701839L;
return (int) (i * largePrime);
}
<E extends Exception> void forEach(HistogramDataConsumer<E> consumer) throws E
{
for (int i = 0; i < points.length; i++)
{
if (points[i] != -1)
{
consumer.consume(points[i], values[i]);
}
}
}
private boolean tryCell(int cell, int point, int delta)
{
assert cell >= 0 && point >= 0 && delta >= 0 : "Invalid arguments: cell:" + cell + " point:" + point + " delta:" + delta;
cell = cell % points.length;
if (points[cell] == -1)
{
points[cell] = point;
values[cell] = delta;
size++;
return true;
}
if (points[cell] == point)
{
values[cell] = saturatingCastToInt((long) values[cell] + (long) delta);
return true;
}
return false;
}
public String toString()
{
StringBuilder sb = new StringBuilder();
sb.append('[');
for (int i = 0; i < points.length; i++)
{
if (points[i] == -1)
continue;
if (sb.length() > 1)
sb.append(", ");
sb.append('[').append(points[i]).append(',').append(values[i]).append(']');
}
sb.append(']');
return sb.toString();
}
}
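/**
 * Rounds {@code point} up to the next multiple of {@code bucketSize}, saturating at {@code Cell.MAX_DELETION_TIME}.
 */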
private static int ceilKey(int point, int bucketSize)
{
int delta = point % bucketSize;
if (delta == 0)
return point;
return saturatingCastToMaxDeletionTime((long) point + (long) bucketSize - (long) delta);
}
public static int saturatingCastToInt(long value)
{
return (int) (value > Integer.MAX_VALUE ? Integer.MAX_VALUE : value);
}
/**
* Cast to an int with maximum value of {@code Cell.MAX_DELETION_TIME} to avoid representing values that
* aren't a tombstone
*/
public static int saturatingCastToMaxDeletionTime(long value)
{
return (value < 0L || value > Cell.MAX_DELETION_TIME)
? Cell.MAX_DELETION_TIME
: (int) value;
}
}