blob: 5120ece6a956e93fb373e96c328b11fe38116f01 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.AggregatorFactory;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Objects;
/**
 * Merges several "sorted" {@link RowIterator}s (see the javadoc of {@link MergingRowIterator} for the definition of
 * "sorted") the same way as {@link MergingRowIterator} does, and additionally collapses every group of "equal" points
 * (points having the same time and dimension values) into a single point. Metric values of the collapsed points are
 * combined using {@link AggregateCombiner}s created from the provided metric aggregator factories.
 */
final class RowCombiningTimeAndDimsIterator implements TimeAndDimsIterator
{
  /** Marker stored in {@link #minCurrentlyCombinedRowNumByOriginalIteratorIndex} for "no row recorded yet". */
  private static final int MIN_CURRENTLY_COMBINED_ROW_NUM_UNSET_VALUE = -1;

  private final MergingRowIterator mergingIterator;

  /**
   * Marked pointers of the original iterators, indexed by original iterator index. One of them serves as
   * {@link #currentTimeAndDimsPointer} (and is therefore what {@link #getPointer()} returns) for as long as the current
   * "equivalence class" consists of a single point. This is an optimization. The alternative would be to start
   * "combining" immediately for every point and always expose one of {@link
   * #combinedTimeAndDimsPointersByOriginalIteratorIndex}; using the marked pointers directly avoids moving data from
   * the pointers of the original iterators into the combined pointers on each iteration.
   *
   * @see #soleCurrentPointSourceOriginalIteratorIndex
   */
  private final TimeAndDimsPointer[] markedRowPointersOfOriginalIterators;

  /** One combiner per metric; shared by all {@link #combinedTimeAndDimsPointersByOriginalIteratorIndex}. */
  private final AggregateCombiner[] combinedMetricSelectors;

  private final List<String> combinedMetricNames;

  /**
   * There are as many "combined time and dims pointers" as there were original iterators. The pointer at a given index
   * is composed of the time and dimension selectors of the original iterator with that index and of the shared metric
   * selectors ({@link #combinedMetricSelectors}). Having them all prepared upfront makes iteration allocation-free and
   * reduces the number of field writes during iteration.
   */
  private final TimeAndDimsPointer[] combinedTimeAndDimsPointersByOriginalIteratorIndex;

  /**
   * The set bits of this bitset are the indexes of the original iterators that participate in the current combination
   * of all points from the current "equivalence class", i. e. the combination that results in the current {@link
   * #getPointer()} point.
   *
   * If fewer than 64 iterators are combined, this field could be replaced with a single primitive long. This
   * optimization could be done in the future.
   */
  private final BitSet indexesOfCurrentlyCombinedOriginalIterators = new BitSet();

  /**
   * Together with {@link #maxCurrentlyCombinedRowNumByOriginalIteratorIndex}, this array designates the "row num
   * range" within each original iterator, the points from which are currently combined (see {@link
   * #indexesOfCurrentlyCombinedOriginalIterators}). A single row number per iterator would have been enough if the
   * original iterators were guaranteed to contain no duplicate rows themselves, but they are not.
   */
  private final int[] minCurrentlyCombinedRowNumByOriginalIteratorIndex;
  private final int[] maxCurrentlyCombinedRowNumByOriginalIteratorIndex;

  @Nullable
  private TimeAndDimsPointer currentTimeAndDimsPointer;

  /**
   * A value >= 0 means that nothing has been combined yet at the current point: {@link #currentTimeAndDimsPointer} is
   * one of {@link #markedRowPointersOfOriginalIterators}, and this field holds the index of the original iterator that
   * is the source of the sole (uncombined) point.
   *
   * A value less than 0 means that some combining has already happened at the current point, and {@link
   * #currentTimeAndDimsPointer} is one of {@link #combinedTimeAndDimsPointersByOriginalIteratorIndex}.
   */
  private int soleCurrentPointSourceOriginalIteratorIndex;

  /** The first row of the next point to be returned, or null if there are no more rows. */
  @Nullable
  private RowPointer nextRowPointer;

  /**
   * @param originalIterators the iterators to merge and combine
   * @param metricAggs        aggregator factories, one per metric, used to create the metric combiners
   * @param metricNames       metric names, passed to the combined pointers along with the metric combiners
   */
  RowCombiningTimeAndDimsIterator(
      List<TransformableRowIterator> originalIterators,
      AggregatorFactory[] metricAggs,
      List<String> metricNames
  )
  {
    final int numCombinedIterators = originalIterators.size();
    mergingIterator = new MergingRowIterator(originalIterators);

    // Slots of original iterators for which mergingIterator reports null are left null.
    markedRowPointersOfOriginalIterators = new TimeAndDimsPointer[numCombinedIterators];
    for (int iteratorIndex = 0; iteratorIndex < numCombinedIterators; iteratorIndex++) {
      final TransformableRowIterator originalIterator = mergingIterator.getOriginalIterator(iteratorIndex);
      if (originalIterator != null) {
        markedRowPointersOfOriginalIterators[iteratorIndex] = originalIterator.getMarkedPointer();
      }
    }

    combinedMetricSelectors = new AggregateCombiner[metricAggs.length];
    for (int metricIndex = 0; metricIndex < metricAggs.length; metricIndex++) {
      combinedMetricSelectors[metricIndex] = metricAggs[metricIndex].makeNullableAggregateCombiner();
    }
    combinedMetricNames = metricNames;

    // Each combined pointer reuses time and dimensions of the corresponding marked pointer, but exposes the shared
    // metric combiners instead of the original metric selectors.
    combinedTimeAndDimsPointersByOriginalIteratorIndex = new TimeAndDimsPointer[numCombinedIterators];
    for (int iteratorIndex = 0; iteratorIndex < numCombinedIterators; iteratorIndex++) {
      final TimeAndDimsPointer markedRowPointer = markedRowPointersOfOriginalIterators[iteratorIndex];
      if (markedRowPointer == null) {
        continue;
      }
      combinedTimeAndDimsPointersByOriginalIteratorIndex[iteratorIndex] = new TimeAndDimsPointer(
          markedRowPointer.timestampSelector,
          markedRowPointer.dimensionSelectors,
          markedRowPointer.getDimensionHandlers(),
          combinedMetricSelectors,
          combinedMetricNames
      );
    }

    minCurrentlyCombinedRowNumByOriginalIteratorIndex = new int[numCombinedIterators];
    Arrays.fill(minCurrentlyCombinedRowNumByOriginalIteratorIndex, MIN_CURRENTLY_COMBINED_ROW_NUM_UNSET_VALUE);
    maxCurrentlyCombinedRowNumByOriginalIteratorIndex = new int[numCombinedIterators];

    // Pre-fetch the first row, so that the first call to moveToNext() has something to start from.
    if (mergingIterator.moveToNext()) {
      nextRowPointer = mergingIterator.getPointer();
    }
  }

  /**
   * Forgets which rows (of which original iterators, and which row nums within them) were combined on the previous
   * step.
   */
  private void clearCombinedRowsInfo()
  {
    final BitSet combinedIndexes = indexesOfCurrentlyCombinedOriginalIterators;
    int iteratorIndex = combinedIndexes.nextSetBit(0);
    while (iteratorIndex >= 0) {
      minCurrentlyCombinedRowNumByOriginalIteratorIndex[iteratorIndex] = MIN_CURRENTLY_COMBINED_ROW_NUM_UNSET_VALUE;
      iteratorIndex = combinedIndexes.nextSetBit(iteratorIndex + 1);
    }
    combinedIndexes.clear();
  }

  /**
   * Warning: this method and {@link #startNewTimeAndDims} are short, but their logic is very convoluted and hard to
   * understand. Trying to understand it by stepping through with a debugger could be especially confusing.
   */
  @Override
  public boolean moveToNext()
  {
    clearCombinedRowsInfo();

    final RowPointer firstRowOfNewPoint = nextRowPointer;
    if (firstRowOfNewPoint == null) {
      currentTimeAndDimsPointer = null;
      return false;
    }

    // The pointer used here was obtained from mergingIterator.getPointer() (see [*] below, or the constructor).
    // mergingIterator.moveToNext() could already have returned false during the previous call to this method; this
    // code implicitly relies on the pointer still being valid after that.
    startNewTimeAndDims(firstRowOfNewPoint);
    nextRowPointer = null;

    // [1] -- see comment in startNewTimeAndDims()
    mergingIterator.mark();
    // [2] -- see comment in startNewTimeAndDims()
    while (mergingIterator.moveToNext()) {
      if (!mergingIterator.hasTimeAndDimsChangedSinceMark()) {
        // Still in the same "equivalence class": fold this row into the current point and keep going.
        combineToCurrentTimeAndDims(mergingIterator.getPointer());
        continue;
      }
      // The row belongs to the next point; remember it for the next call to this method.
      nextRowPointer = mergingIterator.getPointer(); // [*]
      return true;
    }

    // No more rows left in mergingIterator. nextRowPointer is still null at this point, so the next call to this
    // method returns false.
    return true;
  }

  /**
   * Begins a new point at the given row. Note that {@link #currentTimeAndDimsPointer} is assigned one of {@link
   * #markedRowPointersOfOriginalIterators} rather than one of {@link
   * #combinedTimeAndDimsPointersByOriginalIteratorIndex}; see the javadoc of the former field for the reasons.
   */
  private void startNewTimeAndDims(RowPointer rowPointer)
  {
    final int sourceIteratorIndex = rowPointer.getIndexNum();

    // Note: when this assignment is performed, markedRowPointersOfOriginalIterators[sourceIteratorIndex] doesn't yet
    // contain the actual current dimension and metric values. This method is called from moveToNext(), which later
    // calls mergingIterator.mark() [1] and then mergingIterator.moveToNext() [2]. The latter makes the
    // MergingRowIterator.moveToNext() implementation (see its code) call mark() on the current head iterator, and only
    // after that markedRowPointersOfOriginalIterators[sourceIteratorIndex] has the correct values. So by the time
    // moveToNext() exits, and before getPointer() could be called by the user of this class, the values are correct.
    currentTimeAndDimsPointer = markedRowPointersOfOriginalIterators[sourceIteratorIndex];
    soleCurrentPointSourceOriginalIteratorIndex = sourceIteratorIndex;

    indexesOfCurrentlyCombinedOriginalIterators.set(sourceIteratorIndex);
    final int firstRowNum = rowPointer.getRowNum();
    minCurrentlyCombinedRowNumByOriginalIteratorIndex[sourceIteratorIndex] = firstRowNum;
    maxCurrentlyCombinedRowNumByOriginalIteratorIndex[sourceIteratorIndex] = firstRowNum;
  }

  /**
   * Folds the given row into the current point and records the row's original iterator index and row num as taking
   * part in the current combination.
   */
  private void combineToCurrentTimeAndDims(RowPointer rowPointer)
  {
    final int soleSourceIteratorIndex = soleCurrentPointSourceOriginalIteratorIndex;
    if (soleSourceIteratorIndex >= 0) {
      // This is the first combine at the current point.
      switchToCombinedPointer(soleSourceIteratorIndex);
    }

    final int sourceIteratorIndex = rowPointer.getIndexNum();
    indexesOfCurrentlyCombinedOriginalIterators.set(sourceIteratorIndex);

    final int combinedRowNum = rowPointer.getRowNum();
    if (minCurrentlyCombinedRowNumByOriginalIteratorIndex[sourceIteratorIndex] < 0) {
      // First row from this original iterator within the current combination.
      minCurrentlyCombinedRowNumByOriginalIteratorIndex[sourceIteratorIndex] = combinedRowNum;
    }
    maxCurrentlyCombinedRowNumByOriginalIteratorIndex[sourceIteratorIndex] = combinedRowNum;

    foldMetrics(rowPointer);
  }

  /**
   * Seeds the metric combiners with the metric values of the sole, so far uncombined, point and makes the combined
   * pointer of the same original iterator the current pointer.
   */
  private void switchToCombinedPointer(int soleSourceIteratorIndex)
  {
    final TimeAndDimsPointer uncombinedPointer = currentTimeAndDimsPointer;
    assert uncombinedPointer != null;
    resetCombinedMetrics(uncombinedPointer);
    currentTimeAndDimsPointer = combinedTimeAndDimsPointersByOriginalIteratorIndex[soleSourceIteratorIndex];
    soleCurrentPointSourceOriginalIteratorIndex = -1;
  }

  /** Resets every metric combiner to the corresponding metric selector of the given pointer. */
  private void resetCombinedMetrics(TimeAndDimsPointer currentRowPointer)
  {
    final int numMetrics = combinedMetricSelectors.length;
    for (int m = 0; m < numMetrics; m++) {
      combinedMetricSelectors[m].reset(currentRowPointer.getMetricSelector(m));
    }
  }

  /** Folds every metric selector of the given row into the corresponding metric combiner. */
  private void foldMetrics(RowPointer rowPointer)
  {
    final int numMetrics = combinedMetricSelectors.length;
    for (int m = 0; m < numMetrics; m++) {
      combinedMetricSelectors[m].fold(rowPointer.getMetricSelector(m));
    }
  }

  /**
   * Returns the pointer to the current (possibly combined) point.
   *
   * @throws NullPointerException if there is no current point
   */
  @Override
  public TimeAndDimsPointer getPointer()
  {
    final TimeAndDimsPointer pointer = currentTimeAndDimsPointer;
    return Objects.requireNonNull(pointer);
  }

  /**
   * Returns the next index, starting from (and including) {@code fromIndex}, of an original iterator (indexes are the
   * positions in the List provided to the constructor of RowCombiningTimeAndDimsIterator) that was the source of one
   * or more points combined to produce the current {@link #getPointer()} point, or a negative value if there is no
   * such index.
   *
   * Should be used a-la {@link BitSet} iteration:
   * <pre>{@code
   * for (int originalIteratorIndex = nextCurrentlyCombinedOriginalIteratorIndex(0);
   *     originalIteratorIndex >= 0;
   *     originalIteratorIndex = nextCurrentlyCombinedOriginalIteratorIndex(originalIteratorIndex + 1)) {
   *   ...
   * }
   * }</pre>
   */
  int nextCurrentlyCombinedOriginalIteratorIndex(int fromIndex)
  {
    return indexesOfCurrentlyCombinedOriginalIterators.nextSetBit(fromIndex);
  }

  /**
   * Returns the lower bound of the currently combined "row num range" of the given original iterator. See the Javadoc
   * of {@link #minCurrentlyCombinedRowNumByOriginalIteratorIndex} for explanation.
   */
  int getMinCurrentlyCombinedRowNumByOriginalIteratorIndex(int originalIteratorIndex)
  {
    return minCurrentlyCombinedRowNumByOriginalIteratorIndex[originalIteratorIndex];
  }

  /**
   * Returns the upper bound of the currently combined "row num range" of the given original iterator. See the Javadoc
   * of {@link #minCurrentlyCombinedRowNumByOriginalIteratorIndex} for explanation.
   */
  int getMaxCurrentlyCombinedRowNumByOriginalIteratorIndex(int originalIteratorIndex)
  {
    return maxCurrentlyCombinedRowNumByOriginalIteratorIndex[originalIteratorIndex];
  }

  /** Closes the underlying merging iterator. */
  @Override
  public void close()
  {
    mergingIterator.close();
  }
}