/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby.epinephelinae;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.segment.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* A streaming grouper which can aggregate sorted inputs. This grouper can aggregate while its iterator is being
* consumed. The aggregation thread and the iterating thread can be different.
*
* This grouper is backed by an off-heap circular array. The reading thread is able to read data from an array slot
* only when aggregation for the grouping key corresponding to that slot is finished. Since the reading and writing
* threads cannot access the same array slot at the same time, they can read/write data without contention.
*
* This class uses a spinlock to wait for at least one slot to become available when the array is empty or full.
* If the array is empty, the reading thread waits until the aggregation for an array slot is finished. If the array is
* full, the writing thread waits for the reading thread to read at least one aggregate from the array.
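*
* A minimal usage sketch, assuming an already-constructed grouper, a sorted input, and a separate reader thread
* (readerExec, sortedKeys, and handle() below are illustrative names, not part of this class):
*
* <pre>{@code
* grouper.init();
*
* // Reading thread: consumes sorted, fully aggregated entries while aggregation is still in progress.
* readerExec.submit(() -> {
*   try (CloseableIterator<Entry<KeyType>> iterator = grouper.iterator()) {
*     while (iterator.hasNext()) {
*       handle(iterator.next());
*     }
*   }
*   return null;
* });
*
* // Writing thread: inputs must be sorted by the grouping key.
* for (KeyType key : sortedKeys) {
*   grouper.aggregate(key);
* }
* grouper.finish(); // signal that no more inputs will be added
* }</pre>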
*/
public class StreamingMergeSortedGrouper<KeyType> implements Grouper<KeyType>
{
private static final Logger LOG = new Logger(StreamingMergeSortedGrouper.class);
private static final long DEFAULT_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(5); // default timeout for spinlock
// Threshold time for spinlocks in increaseWriteIndex() and increaseReadIndex(). The waiting thread calls
// Thread.yield() after this threshold time elapses.
private static final long SPIN_FOR_TIMEOUT_THRESHOLD_NS = 1000L;
private final Supplier<ByteBuffer> bufferSupplier;
private final KeySerde<KeyType> keySerde;
private final BufferAggregator[] aggregators;
private final int[] aggregatorOffsets;
private final int keySize;
private final int recordSize; // size of (key + all aggregates)
// Timeout for the current query.
// The query must fail with a timeout exception if System.nanoTime() >= queryTimeoutAtNs. This is used in the
// spinlocks to prevent the writing thread from being blocked if the iterator of this grouper is not consumed due to
// some failure, which could otherwise leave the whole system stalled.
private final long queryTimeoutAtNs;
private final boolean hasQueryTimeout;
// Below variables are initialized when init() is called.
private ByteBuffer buffer;
private int maxNumSlots;
private boolean initialized;
/**
* Indicates whether this grouper has consumed the last input. The writing thread must set this value to true by
* calling {@link #finish()} when it's done. This variable is always set by the writing thread and read by the
* reading thread.
*/
private volatile boolean finished;
/**
* Current write index of the array. This points to the array slot where the aggregation is currently performed. Its
* initial value is -1, which means no data have been written yet. Since it's assumed that the input is sorted by the
* grouping key, this variable is moved to the next slot whenever a new grouping key is found. Once it reaches the
* last slot of the array, it moves to the first slot.
*
* This is always moved ahead of {@link #nextReadIndex}. If the array is full, this variable
* cannot be moved until {@link #nextReadIndex} is moved. See {@link #increaseWriteIndex()} for more details. This
* variable is always incremented by the writing thread and read by both the writing and the reading threads.
*/
private volatile int curWriteIndex;
/**
* Next read index of the array. This points to the array slot which the reading thread will read next. Its initial
* value is -1, which means no data have been read yet. This variable can point to an array slot only when the aggregation
* for that slot is finished. Once it reaches the last slot of the array, it moves to the first slot.
*
* This always follows {@link #curWriteIndex}. If the array is empty, this variable cannot be moved until the
* aggregation for at least one grouping key is finished, which in turn moves {@link #curWriteIndex}. See
* {@link #iterator()} for more details. This variable is always incremented by the reading thread and read by both
* the writing and the reading threads.
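*
* For example, with maxNumSlots = 3, both indexes cycle through the slots 0 -> 1 -> 2 -> 0 -> ..., and the reading
* thread advances to a slot only after {@link #curWriteIndex} has moved past it or {@link #finished} is set.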
*/
private volatile int nextReadIndex;
/**
* Returns the minimum buffer capacity required for this grouper. This grouper keeps track of read/write indexes,
* and they cannot point to the same array slot at the same time. Since the read/write indexes move circularly, one
* extra slot is needed in addition to the read/write slots. Finally, the required minimum buffer capacity is
* 3 * record size.
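*
* For example, under the illustrative assumption of an 8-byte key and two aggregators of 8 bytes each, the record
* size is 8 + 8 + 8 = 24 bytes, so the required minimum capacity is 3 * 24 = 72 bytes.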
*
* @return required minimum buffer capacity
*/
public static <KeyType> int requiredBufferCapacity(
KeySerde<KeyType> keySerde,
AggregatorFactory[] aggregatorFactories
)
{
int recordSize = keySerde.keySize();
for (AggregatorFactory aggregatorFactory : aggregatorFactories) {
recordSize += aggregatorFactory.getMaxIntermediateSizeWithNulls();
}
return recordSize * 3;
}
StreamingMergeSortedGrouper(
final Supplier<ByteBuffer> bufferSupplier,
final KeySerde<KeyType> keySerde,
final ColumnSelectorFactory columnSelectorFactory,
final AggregatorFactory[] aggregatorFactories,
final long queryTimeoutAtMs
)
{
this.bufferSupplier = bufferSupplier;
this.keySerde = keySerde;
this.aggregators = new BufferAggregator[aggregatorFactories.length];
this.aggregatorOffsets = new int[aggregatorFactories.length];
this.keySize = keySerde.keySize();
int offset = keySize;
for (int i = 0; i < aggregatorFactories.length; i++) {
aggregators[i] = aggregatorFactories[i].factorizeBuffered(columnSelectorFactory);
aggregatorOffsets[i] = offset;
offset += aggregatorFactories[i].getMaxIntermediateSizeWithNulls();
}
this.recordSize = offset;
// queryTimeoutAtMs comes from System.currentTimeMillis(), but we should use System.nanoTime() to check timeout in
// this class. See increaseWriteIndex() and increaseReadIndex().
this.hasQueryTimeout = queryTimeoutAtMs != QueryContexts.NO_TIMEOUT;
final long timeoutNs = hasQueryTimeout ?
TimeUnit.MILLISECONDS.toNanos(queryTimeoutAtMs - System.currentTimeMillis()) :
QueryContexts.NO_TIMEOUT;
this.queryTimeoutAtNs = System.nanoTime() + timeoutNs;
}
@Override
public void init()
{
if (!initialized) {
buffer = bufferSupplier.get();
maxNumSlots = buffer.capacity() / recordSize;
Preconditions.checkState(
maxNumSlots > 2,
"Buffer[%s] should be large enough to store at least three records[%s]",
buffer.capacity(),
recordSize
);
reset();
initialized = true;
}
}
@Override
public boolean isInitialized()
{
return initialized;
}
@Override
public AggregateResult aggregate(KeyType key, int notUsed)
{
return aggregate(key);
}
@Override
public AggregateResult aggregate(KeyType key)
{
try {
final ByteBuffer keyBuffer = keySerde.toByteBuffer(key);
if (keyBuffer.remaining() != keySize) {
throw new IAE(
"keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!",
keyBuffer.remaining(),
keySize
);
}
final int prevRecordOffset = curWriteIndex * recordSize;
if (curWriteIndex == -1 || !keyEquals(keyBuffer, buffer, prevRecordOffset)) {
// Initialize a new slot for the new key. If the array is full, this potentially blocks until at least one slot
// becomes available.
initNewSlot(keyBuffer);
}
final int curRecordOffset = curWriteIndex * recordSize;
for (int i = 0; i < aggregatorOffsets.length; i++) {
aggregators[i].aggregate(buffer, curRecordOffset + aggregatorOffsets[i]);
}
return AggregateResult.ok();
}
catch (RuntimeException e) {
finished = true;
throw e;
}
}
/**
* Checks whether the two keys contained in the given buffers are the same.
*
* @param curKeyBuffer the buffer for the given key from {@link #aggregate(Object)}
* @param buffer the whole array buffer
* @param bufferOffset the key offset of the buffer
*
* @return true if the two keys are the same.
*/
private boolean keyEquals(ByteBuffer curKeyBuffer, ByteBuffer buffer, int bufferOffset)
{
// Since this method is called frequently (once per input row), comparison performance matters.
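// Compare in three stages: 8 bytes at a time while a full long fits, then a single 4-byte chunk if one fits,
// and finally any remaining bytes one by one.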
int i = 0;
for (; i + Long.BYTES <= keySize; i += Long.BYTES) {
if (curKeyBuffer.getLong(i) != buffer.getLong(bufferOffset + i)) {
return false;
}
}
if (i + Integer.BYTES <= keySize) {
// This block can run at most once because the leading bytes were already compared with getLong() above.
if (curKeyBuffer.getInt(i) != buffer.getInt(bufferOffset + i)) {
return false;
}
i += Integer.BYTES;
}
for (; i < keySize; i++) {
if (curKeyBuffer.get(i) != buffer.get(bufferOffset + i)) {
return false;
}
}
return true;
}
/**
* Initializes a new slot for a new grouping key. If the array is full, this potentially blocks until at least one
* slot becomes available.
*/
private void initNewSlot(ByteBuffer newKey)
{
// Wait if the array is full and increase curWriteIndex
increaseWriteIndex();
final int recordOffset = recordSize * curWriteIndex;
buffer.position(recordOffset);
buffer.put(newKey);
for (int i = 0; i < aggregators.length; i++) {
aggregators[i].init(buffer, recordOffset + aggregatorOffsets[i]);
}
}
/**
* Wait for {@link #nextReadIndex} to be moved if necessary and move {@link #curWriteIndex}.
*/
private void increaseWriteIndex()
{
final long startAtNs = System.nanoTime();
final long queryTimeoutAtNs = getQueryTimeoutAtNs(startAtNs);
final long spinTimeoutAtNs = startAtNs + SPIN_FOR_TIMEOUT_THRESHOLD_NS;
long timeoutNs = queryTimeoutAtNs - startAtNs;
long spinTimeoutNs = SPIN_FOR_TIMEOUT_THRESHOLD_NS;
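// timeoutNs is the remaining time before the query must fail; spinTimeoutNs is the remaining time to keep
// busy-spinning before starting to call Thread.yield().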
// In the below, we check whether the array is full and, if so, wait for at least one slot to become available.
//
// nextReadIndex is a volatile variable, so the while loop below keeps re-reading it until the change made by the
// reading thread becomes visible. See the following links.
// * http://docs.oracle.com/javase/specs/jls/se7/html/jls-8.html#jls-8.3.1.4
// * http://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.4.5
// * https://stackoverflow.com/questions/11761552/detailed-semantics-of-volatile-regarding-timeliness-of-visibility
if (curWriteIndex == maxNumSlots - 1) {
// We additionally check that nextReadIndex is -1 here because the writing thread should wait for the reading
// thread to start reading only when the writing thread tries to overwrite the first slot for the first time.
// The below condition is checked in a while loop instead of using a lock to avoid frequent thread park.
while ((nextReadIndex == -1 || nextReadIndex == 0) && !Thread.currentThread().isInterrupted()) {
if (timeoutNs <= 0L) {
throw new RuntimeException(new TimeoutException());
}
// Don't call Thread.yield() right away; busy-spin for SPIN_FOR_TIMEOUT_THRESHOLD_NS first
if (spinTimeoutNs <= 0L) {
Thread.yield();
}
long now = System.nanoTime();
timeoutNs = queryTimeoutAtNs - now;
spinTimeoutNs = spinTimeoutAtNs - now;
}
// Changes on nextReadIndex happens-before changing curWriteIndex.
curWriteIndex = 0;
} else {
final int nextWriteIndex = curWriteIndex + 1;
// The below condition is checked in a while loop instead of using a lock to avoid frequent thread park.
while ((nextWriteIndex == nextReadIndex) && !Thread.currentThread().isInterrupted()) {
if (timeoutNs <= 0L) {
throw new RuntimeException(new TimeoutException());
}
// Don't call Thread.yield() right away; busy-spin for SPIN_FOR_TIMEOUT_THRESHOLD_NS first
if (spinTimeoutNs <= 0L) {
Thread.yield();
}
long now = System.nanoTime();
timeoutNs = queryTimeoutAtNs - now;
spinTimeoutNs = spinTimeoutAtNs - now;
}
// Changes on nextReadIndex happens-before changing curWriteIndex.
curWriteIndex = nextWriteIndex;
}
}
@Override
public void reset()
{
curWriteIndex = -1;
nextReadIndex = -1;
finished = false;
}
@Override
public void close()
{
for (BufferAggregator aggregator : aggregators) {
try {
aggregator.close();
}
catch (Exception e) {
LOG.warn(e, "Could not close aggregator [%s], skipping.", aggregator);
}
}
}
/**
* Signals that no more inputs will be added. Must be called after {@link #aggregate(Object)} is called for the last input.
*/
public void finish()
{
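// Move curWriteIndex one slot past the last written record so that the reading thread can read it; see
// remaining() and increaseReadIndexTo() in iterator().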
increaseWriteIndex();
// Once finished is set, curWriteIndex must not be changed. This guarantees that the number of remaining items in
// the array always decreases as the reading thread proceeds. See hasNext() and remaining() below.
finished = true;
}
/**
* Returns a sorted iterator. This method can be called safely while writing, and the iterating thread and the
* writing thread can be different. The result iterator always returns sorted results. This method should be called
* only once per grouper.
*
* @return a sorted iterator
*/
public CloseableIterator<Entry<KeyType>> iterator()
{
if (!initialized) {
throw new ISE("Grouper should be initialized first");
}
return new CloseableIterator<Entry<KeyType>>()
{
{
// Wait for some data to be ready and initialize nextReadIndex.
increaseReadIndexTo(0);
}
@Override
public boolean hasNext()
{
// If setting finished happens-before the below check, curWriteIndex isn't changed anymore and thus remaining()
// can be computed safely because nextReadIndex is changed only by the reading thread.
// Otherwise, hasNext() always returns true.
//
// The below line can be executed between increasing curWriteIndex and setting finished in
// StreamingMergeSortedGrouper.finish(), but it is also a valid case because there should be at least one slot
// which is not read yet before finished is set.
return !finished || remaining() > 0;
}
/**
* Calculate the number of remaining items in the array. Must be called only when
* {@link StreamingMergeSortedGrouper#finished} is true.
*
* @return the number of remaining items
*/
private int remaining()
{
if (curWriteIndex >= nextReadIndex) {
return curWriteIndex - nextReadIndex;
} else {
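// curWriteIndex has wrapped around: count the slots from nextReadIndex to the end of the array plus those from
// the start of the array up to curWriteIndex.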
return (maxNumSlots - nextReadIndex) + curWriteIndex;
}
}
@Override
public Entry<KeyType> next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
// Here, nextReadIndex should be valid which means:
// - a valid array index which should be >= 0 and < maxNumSlots
// - an index of the array slot where the aggregation for the corresponding grouping key is done
// - an index of the array slot which is not read yet
final int recordOffset = recordSize * nextReadIndex;
final KeyType key = keySerde.fromByteBuffer(buffer, recordOffset);
final Object[] values = new Object[aggregators.length];
for (int i = 0; i < aggregators.length; i++) {
values[i] = aggregators[i].get(buffer, recordOffset + aggregatorOffsets[i]);
}
final int targetIndex = nextReadIndex == maxNumSlots - 1 ? 0 : nextReadIndex + 1;
// Wait if the array is empty until at least one slot becomes available for read, and then increase
// nextReadIndex.
increaseReadIndexTo(targetIndex);
return new Entry<>(key, values);
}
/**
* Wait for {@link StreamingMergeSortedGrouper#curWriteIndex} to be moved if necessary and move
* {@link StreamingMergeSortedGrouper#nextReadIndex}.
*
* @param target the target index {@link StreamingMergeSortedGrouper#nextReadIndex} will move to
*/
private void increaseReadIndexTo(int target)
{
// Check whether the array is empty and, if so, wait for at least one slot to become available.
//
// curWriteIndex is a volatile variable, so the while loop below keeps re-reading it until the change made by the
// writing thread becomes visible. See the following links.
// * http://docs.oracle.com/javase/specs/jls/se7/html/jls-8.html#jls-8.3.1.4
// * http://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.4.5
// * https://stackoverflow.com/questions/11761552/detailed-semantics-of-volatile-regarding-timeliness-of-visibility
final long startAtNs = System.nanoTime();
final long queryTimeoutAtNs = getQueryTimeoutAtNs(startAtNs);
final long spinTimeoutAtNs = startAtNs + SPIN_FOR_TIMEOUT_THRESHOLD_NS;
long timeoutNs = queryTimeoutAtNs - startAtNs;
long spinTimeoutNs = SPIN_FOR_TIMEOUT_THRESHOLD_NS;
// The below condition is checked in a while loop instead of using a lock to avoid frequent thread park.
while ((curWriteIndex == -1 || target == curWriteIndex) &&
!finished && !Thread.currentThread().isInterrupted()) {
if (timeoutNs <= 0L) {
throw new RuntimeException(new TimeoutException());
}
// Don't call Thread.yield() right away; busy-spin for SPIN_FOR_TIMEOUT_THRESHOLD_NS first
if (spinTimeoutNs <= 0L) {
Thread.yield();
}
long now = System.nanoTime();
timeoutNs = queryTimeoutAtNs - now;
spinTimeoutNs = spinTimeoutAtNs - now;
}
// Changes on curWriteIndex happens-before changing nextReadIndex.
nextReadIndex = target;
}
@Override
public void close()
{
// do nothing
}
};
}
private long getQueryTimeoutAtNs(long startAtNs)
{
return hasQueryTimeout ? queryTimeoutAtNs : startAtNs + DEFAULT_TIMEOUT_NS;
}
/**
* Returns a sorted iterator. This method can be called safely while writing, and the iterating thread and the
* writing thread can be different. The result iterator always returns sorted results. This method should be called
* only once per grouper.
*
* @param sorted not used
*
* @return a sorted iterator
*/
@Override
public CloseableIterator<Entry<KeyType>> iterator(boolean sorted)
{
return iterator();
}
}