blob: 0cc530652ef112a434d625ffb803a132458c9b4e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.utils;
import java.io.Closeable;
import java.util.*;
import org.apache.cassandra.utils.AbstractIterator;
/** Merges sorted input iterators which individually contain unique items. */
public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implements IMergeIterator<In, Out>
{
protected final Reducer<In,Out> reducer;
protected final List<? extends Iterator<In>> iterators;
protected MergeIterator(List<? extends Iterator<In>> iters, Reducer<In, Out> reducer)
{
this.iterators = iters;
this.reducer = reducer;
}
public static <In, Out> MergeIterator<In, Out> get(List<? extends Iterator<In>> sources,
Comparator<? super In> comparator,
Reducer<In, Out> reducer)
{
if (sources.size() == 1)
{
return reducer.trivialReduceIsTrivial()
? new TrivialOneToOne<>(sources, reducer)
: new OneToOne<>(sources, reducer);
}
return new ManyToOne<>(sources, comparator, reducer);
}
public Iterable<? extends Iterator<In>> iterators()
{
return iterators;
}
public void close()
{
for (Iterator<In> iterator : this.iterators)
{
try
{
if (iterator instanceof AutoCloseable)
((AutoCloseable)iterator).close();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
reducer.close();
}
/**
* A MergeIterator that consumes multiple input values per output value.
*
* The most straightforward way to implement this is to use a {@code PriorityQueue} of iterators, {@code poll} it to
* find the next item to consume, then {@code add} the iterator back after advancing. This is not very efficient as
* {@code poll} and {@code add} in all cases require at least {@code log(size)} comparisons (usually more than
* {@code 2*log(size)}) per consumed item, even if the input is suitable for fast iteration.
*
* The implementation below makes use of the fact that replacing the top element in a binary heap can be done much
* more efficiently than separately removing it and placing it back, especially in the cases where the top iterator
* is to be used again very soon (e.g. when there are large sections of the output where only a limited number of
* input iterators overlap, which is normally the case in many practically useful situations, e.g. levelled
* compaction). To further improve this particular scenario, we also use a short sorted section at the start of the
* queue.
*
* The heap is laid out as this (for {@code SORTED_SECTION_SIZE == 2}):
* 0
* |
* 1
* |
* 2
* / \
* 3 4
* / \ / \
* 5 6 7 8
* .. .. .. ..
* Where each line is a <= relationship.
*
* In the sorted section we can advance with a single comparison per level, while advancing a level within the heap
* requires two (so that we can find the lighter element to pop up).
* The sorted section adds a constant overhead when data is uniformly distributed among the iterators, but may up
* to halve the iteration time when one iterator is dominant over sections of the merged data (as is the case with
* non-overlapping iterators).
*
* The iterator is further complicated by the need to avoid advancing the input iterators until an output is
* actually requested. To achieve this {@code consume} walks the heap to find equal items without advancing the
* iterators, and {@code advance} moves them and restores the heap structure before any items can be consumed.
*
* To avoid having to do additional comparisons in consume to identify the equal items, we keep track of equality
* between children and their parents in the heap. More precisely, the lines in the diagram above define the
* following relationship:
* parent <= child && (parent == child) == child.equalParent
* We can track, make use of and update the equalParent field without any additional comparisons.
*
* For more formal definitions and proof of correctness, see CASSANDRA-8915.
*/
static final class ManyToOne<In,Out> extends MergeIterator<In,Out>
{
protected final Candidate<In>[] heap;
/** Number of non-exhausted iterators. */
int size;
/**
* Position of the deepest, right-most child that needs advancing before we can start consuming.
* Because advancing changes the values of the items of each iterator, the parent-chain from any position
* in this range that needs advancing is not in correct order. The trees rooted at any position that does
* not need advancing, however, retain their prior-held binary heap property.
*/
int needingAdvance;
/**
* The number of elements to keep in order before the binary heap starts, exclusive of the top heap element.
*/
static final int SORTED_SECTION_SIZE = 4;
public ManyToOne(List<? extends Iterator<In>> iters, Comparator<? super In> comp, Reducer<In, Out> reducer)
{
super(iters, reducer);
@SuppressWarnings("unchecked")
Candidate<In>[] heap = new Candidate[iters.size()];
this.heap = heap;
size = 0;
for (int i = 0; i < iters.size(); i++)
{
Candidate<In> candidate = new Candidate<>(i, iters.get(i), comp);
heap[size++] = candidate;
}
needingAdvance = size;
}
protected final Out computeNext()
{
advance();
return consume();
}
/**
* Advance all iterators that need to be advanced and place them into suitable positions in the heap.
*
* By walking the iterators backwards we know that everything after the point being processed already forms
* correctly ordered subheaps, thus we can build a subheap rooted at the current position by only sinking down
* the newly advanced iterator. Because all parents of a consumed iterator are also consumed there is no way
* that we can process one consumed iterator but skip over its parent.
*
* The procedure is the same as the one used for the initial building of a heap in the heapsort algorithm and
* has a maximum number of comparisons {@code (2 * log(size) + SORTED_SECTION_SIZE / 2)} multiplied by the
* number of iterators whose items were consumed at the previous step, but is also at most linear in the size of
* the heap if the number of consumed elements is high (as it is in the initial heap construction). With non- or
* lightly-overlapping iterators the procedure finishes after just one (resp. a couple of) comparisons.
*/
private void advance()
{
// Turn the set of candidates into a heap.
for (int i = needingAdvance - 1; i >= 0; --i)
{
Candidate<In> candidate = heap[i];
/**
* needingAdvance runs to the maximum index (and deepest-right node) that may need advancing;
* since the equal items that were consumed at-once may occur in sub-heap "veins" of equality,
* not all items above this deepest-right position may have been consumed; these already form
* valid sub-heaps and can be skipped-over entirely
*/
if (candidate.needsAdvance())
replaceAndSink(candidate.advance(), i);
}
}
/**
* Consume all items that sort like the current top of the heap. As we cannot advance the iterators to let
* equivalent items pop up, we walk the heap to find them and mark them as needing advance.
*
* This relies on the equalParent flag to avoid doing any comparisons.
*/
private Out consume()
{
if (size == 0)
return endOfData();
reducer.onKeyChange();
assert !heap[0].equalParent;
reducer.reduce(heap[0].idx, heap[0].consume());
final int size = this.size;
final int sortedSectionSize = Math.min(size, SORTED_SECTION_SIZE);
int i;
consume: {
for (i = 1; i < sortedSectionSize; ++i)
{
if (!heap[i].equalParent)
break consume;
reducer.reduce(heap[i].idx, heap[i].consume());
}
i = Math.max(i, consumeHeap(i) + 1);
}
needingAdvance = i;
return reducer.getReduced();
}
/**
* Recursively consume all items equal to equalItem in the binary subheap rooted at position idx.
*
* @return the largest equal index found in this search.
*/
private int consumeHeap(int idx)
{
if (idx >= size || !heap[idx].equalParent)
return -1;
reducer.reduce(heap[idx].idx, heap[idx].consume());
int nextIdx = (idx << 1) - (SORTED_SECTION_SIZE - 1);
return Math.max(idx, Math.max(consumeHeap(nextIdx), consumeHeap(nextIdx + 1)));
}
/**
* Replace an iterator in the heap with the given position and move it down the heap until it finds its proper
* position, pulling lighter elements up the heap.
*
* Whenever an equality is found between two elements that form a new parent-child relationship, the child's
* equalParent flag is set to true if the elements are equal.
*/
private void replaceAndSink(Candidate<In> candidate, int currIdx)
{
if (candidate == null)
{
// Drop iterator by replacing it with the last one in the heap.
candidate = heap[--size];
heap[size] = null; // not necessary but helpful for debugging
}
// The new element will be top of its heap, at this point there is no parent to be equal to.
candidate.equalParent = false;
final int size = this.size;
final int sortedSectionSize = Math.min(size - 1, SORTED_SECTION_SIZE);
int nextIdx;
// Advance within the sorted section, pulling up items lighter than candidate.
while ((nextIdx = currIdx + 1) <= sortedSectionSize)
{
if (!heap[nextIdx].equalParent) // if we were greater then an (or were the) equal parent, we are >= the child
{
int cmp = candidate.compareTo(heap[nextIdx]);
if (cmp <= 0)
{
heap[nextIdx].equalParent = cmp == 0;
heap[currIdx] = candidate;
return;
}
}
heap[currIdx] = heap[nextIdx];
currIdx = nextIdx;
}
// If size <= SORTED_SECTION_SIZE, nextIdx below will be no less than size,
// because currIdx == sortedSectionSize == size - 1 and nextIdx becomes
// (size - 1) * 2) - (size - 1 - 1) == size.
// Advance in the binary heap, pulling up the lighter element from the two at each level.
while ((nextIdx = (currIdx * 2) - (sortedSectionSize - 1)) + 1 < size)
{
if (!heap[nextIdx].equalParent)
{
if (!heap[nextIdx + 1].equalParent)
{
// pick the smallest of the two children
int siblingCmp = heap[nextIdx + 1].compareTo(heap[nextIdx]);
if (siblingCmp < 0)
++nextIdx;
// if we're smaller than this, we are done, and must only restore the heap and equalParent properties
int cmp = candidate.compareTo(heap[nextIdx]);
if (cmp <= 0)
{
if (cmp == 0)
{
heap[nextIdx].equalParent = true;
if (siblingCmp == 0) // siblingCmp == 0 => nextIdx is the left child
heap[nextIdx + 1].equalParent = true;
}
heap[currIdx] = candidate;
return;
}
if (siblingCmp == 0)
{
// siblingCmp == 0 => nextIdx is still the left child
// if the two siblings were equal, and we are inserting something greater, we will
// pull up the left one; this means the right gets an equalParent
heap[nextIdx + 1].equalParent = true;
}
}
else
++nextIdx; // descend down the path where we found the equal child
}
heap[currIdx] = heap[nextIdx];
currIdx = nextIdx;
}
// our loop guard ensures there are always two siblings to process; typically when we exit the loop we will
// be well past the end of the heap and this next condition will match...
if (nextIdx >= size)
{
heap[currIdx] = candidate;
return;
}
// ... but sometimes we will have one last child to compare against, that has no siblings
if (!heap[nextIdx].equalParent)
{
int cmp = candidate.compareTo(heap[nextIdx]);
if (cmp <= 0)
{
heap[nextIdx].equalParent = cmp == 0;
heap[currIdx] = candidate;
return;
}
}
heap[currIdx] = heap[nextIdx];
heap[nextIdx] = candidate;
}
}
// Holds and is comparable by the head item of an iterator it owns
protected static final class Candidate<In> implements Comparable<Candidate<In>>
{
private final Iterator<? extends In> iter;
private final Comparator<? super In> comp;
private final int idx;
private In item;
boolean equalParent;
public Candidate(int idx, Iterator<? extends In> iter, Comparator<? super In> comp)
{
this.iter = iter;
this.comp = comp;
this.idx = idx;
}
/** @return this if our iterator had an item, and it is now available, otherwise null */
protected Candidate<In> advance()
{
if (!iter.hasNext())
return null;
item = iter.next();
return this;
}
public int compareTo(Candidate<In> that)
{
assert item != null && that.item != null;
return comp.compare(this.item, that.item);
}
public In consume()
{
In temp = item;
item = null;
assert temp != null;
return temp;
}
public boolean needsAdvance()
{
return item == null;
}
}
/** Accumulator that collects values of type A, and outputs a value of type B. */
public static abstract class Reducer<In,Out>
{
/**
* @return true if Out is the same as In for the case of a single source iterator
*/
public boolean trivialReduceIsTrivial()
{
return false;
}
/**
* combine this object with the previous ones.
* intermediate state is up to your implementation.
*/
public abstract void reduce(int idx, In current);
/** @return The last object computed by reduce */
protected abstract Out getReduced();
/**
* Called at the beginning of each new key, before any reduce is called.
* To be overridden by implementing classes.
*/
protected void onKeyChange() {}
/**
* May be overridden by implementations that require cleaning up after use
*/
public void close() {}
}
private static class OneToOne<In, Out> extends MergeIterator<In, Out>
{
private final Iterator<In> source;
public OneToOne(List<? extends Iterator<In>> sources, Reducer<In, Out> reducer)
{
super(sources, reducer);
source = sources.get(0);
}
protected Out computeNext()
{
if (!source.hasNext())
return endOfData();
reducer.onKeyChange();
reducer.reduce(0, source.next());
return reducer.getReduced();
}
}
private static class TrivialOneToOne<In, Out> extends MergeIterator<In, Out>
{
private final Iterator<In> source;
public TrivialOneToOne(List<? extends Iterator<In>> sources, Reducer<In, Out> reducer)
{
super(sources, reducer);
source = sources.get(0);
}
@SuppressWarnings("unchecked")
protected Out computeNext()
{
if (!source.hasNext())
return endOfData();
return (Out) source.next();
}
}
}