| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.utils; |
| |
| import java.io.Closeable; |
| import java.util.*; |
| |
| import org.apache.cassandra.utils.AbstractIterator; |
| |
| /** Merges sorted input iterators which individually contain unique items. */ |
| public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implements IMergeIterator<In, Out> |
| { |
| protected final Reducer<In,Out> reducer; |
| protected final List<? extends Iterator<In>> iterators; |
| |
| protected MergeIterator(List<? extends Iterator<In>> iters, Reducer<In, Out> reducer) |
| { |
| this.iterators = iters; |
| this.reducer = reducer; |
| } |
| |
| public static <In, Out> MergeIterator<In, Out> get(List<? extends Iterator<In>> sources, |
| Comparator<? super In> comparator, |
| Reducer<In, Out> reducer) |
| { |
| if (sources.size() == 1) |
| { |
| return reducer.trivialReduceIsTrivial() |
| ? new TrivialOneToOne<>(sources, reducer) |
| : new OneToOne<>(sources, reducer); |
| } |
| return new ManyToOne<>(sources, comparator, reducer); |
| } |
| |
| public Iterable<? extends Iterator<In>> iterators() |
| { |
| return iterators; |
| } |
| |
| public void close() |
| { |
| for (Iterator<In> iterator : this.iterators) |
| { |
| try |
| { |
| if (iterator instanceof AutoCloseable) |
| ((AutoCloseable)iterator).close(); |
| } |
| catch (Exception e) |
| { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| reducer.close(); |
| } |
| |
| /** |
| * A MergeIterator that consumes multiple input values per output value. |
| * |
| * The most straightforward way to implement this is to use a {@code PriorityQueue} of iterators, {@code poll} it to |
| * find the next item to consume, then {@code add} the iterator back after advancing. This is not very efficient as |
| * {@code poll} and {@code add} in all cases require at least {@code log(size)} comparisons (usually more than |
| * {@code 2*log(size)}) per consumed item, even if the input is suitable for fast iteration. |
| * |
| * The implementation below makes use of the fact that replacing the top element in a binary heap can be done much |
| * more efficiently than separately removing it and placing it back, especially in the cases where the top iterator |
| * is to be used again very soon (e.g. when there are large sections of the output where only a limited number of |
| * input iterators overlap, which is normally the case in many practically useful situations, e.g. levelled |
| * compaction). To further improve this particular scenario, we also use a short sorted section at the start of the |
| * queue. |
| * |
| * The heap is laid out as this (for {@code SORTED_SECTION_SIZE == 2}): |
| * 0 |
| * | |
| * 1 |
| * | |
| * 2 |
| * / \ |
| * 3 4 |
| * / \ / \ |
| * 5 6 7 8 |
| * .. .. .. .. |
| * Where each line is a <= relationship. |
| * |
| * In the sorted section we can advance with a single comparison per level, while advancing a level within the heap |
| * requires two (so that we can find the lighter element to pop up). |
| * The sorted section adds a constant overhead when data is uniformly distributed among the iterators, but may up |
| * to halve the iteration time when one iterator is dominant over sections of the merged data (as is the case with |
| * non-overlapping iterators). |
| * |
| * The iterator is further complicated by the need to avoid advancing the input iterators until an output is |
| * actually requested. To achieve this {@code consume} walks the heap to find equal items without advancing the |
| * iterators, and {@code advance} moves them and restores the heap structure before any items can be consumed. |
| * |
| * To avoid having to do additional comparisons in consume to identify the equal items, we keep track of equality |
| * between children and their parents in the heap. More precisely, the lines in the diagram above define the |
| * following relationship: |
| * parent <= child && (parent == child) == child.equalParent |
| * We can track, make use of and update the equalParent field without any additional comparisons. |
| * |
| * For more formal definitions and proof of correctness, see CASSANDRA-8915. |
| */ |
| static final class ManyToOne<In,Out> extends MergeIterator<In,Out> |
| { |
| protected final Candidate<In>[] heap; |
| |
| /** Number of non-exhausted iterators. */ |
| int size; |
| |
| /** |
| * Position of the deepest, right-most child that needs advancing before we can start consuming. |
| * Because advancing changes the values of the items of each iterator, the parent-chain from any position |
| * in this range that needs advancing is not in correct order. The trees rooted at any position that does |
| * not need advancing, however, retain their prior-held binary heap property. |
| */ |
| int needingAdvance; |
| |
| /** |
| * The number of elements to keep in order before the binary heap starts, exclusive of the top heap element. |
| */ |
| static final int SORTED_SECTION_SIZE = 4; |
| |
| public ManyToOne(List<? extends Iterator<In>> iters, Comparator<? super In> comp, Reducer<In, Out> reducer) |
| { |
| super(iters, reducer); |
| |
| @SuppressWarnings("unchecked") |
| Candidate<In>[] heap = new Candidate[iters.size()]; |
| this.heap = heap; |
| size = 0; |
| |
| for (int i = 0; i < iters.size(); i++) |
| { |
| Candidate<In> candidate = new Candidate<>(i, iters.get(i), comp); |
| heap[size++] = candidate; |
| } |
| needingAdvance = size; |
| } |
| |
| protected final Out computeNext() |
| { |
| advance(); |
| return consume(); |
| } |
| |
| /** |
| * Advance all iterators that need to be advanced and place them into suitable positions in the heap. |
| * |
| * By walking the iterators backwards we know that everything after the point being processed already forms |
| * correctly ordered subheaps, thus we can build a subheap rooted at the current position by only sinking down |
| * the newly advanced iterator. Because all parents of a consumed iterator are also consumed there is no way |
| * that we can process one consumed iterator but skip over its parent. |
| * |
| * The procedure is the same as the one used for the initial building of a heap in the heapsort algorithm and |
| * has a maximum number of comparisons {@code (2 * log(size) + SORTED_SECTION_SIZE / 2)} multiplied by the |
| * number of iterators whose items were consumed at the previous step, but is also at most linear in the size of |
| * the heap if the number of consumed elements is high (as it is in the initial heap construction). With non- or |
| * lightly-overlapping iterators the procedure finishes after just one (resp. a couple of) comparisons. |
| */ |
| private void advance() |
| { |
| // Turn the set of candidates into a heap. |
| for (int i = needingAdvance - 1; i >= 0; --i) |
| { |
| Candidate<In> candidate = heap[i]; |
| /** |
| * needingAdvance runs to the maximum index (and deepest-right node) that may need advancing; |
| * since the equal items that were consumed at-once may occur in sub-heap "veins" of equality, |
| * not all items above this deepest-right position may have been consumed; these already form |
| * valid sub-heaps and can be skipped-over entirely |
| */ |
| if (candidate.needsAdvance()) |
| replaceAndSink(candidate.advance(), i); |
| } |
| } |
| |
| /** |
| * Consume all items that sort like the current top of the heap. As we cannot advance the iterators to let |
| * equivalent items pop up, we walk the heap to find them and mark them as needing advance. |
| * |
| * This relies on the equalParent flag to avoid doing any comparisons. |
| */ |
| private Out consume() |
| { |
| if (size == 0) |
| return endOfData(); |
| |
| reducer.onKeyChange(); |
| assert !heap[0].equalParent; |
| reducer.reduce(heap[0].idx, heap[0].consume()); |
| final int size = this.size; |
| final int sortedSectionSize = Math.min(size, SORTED_SECTION_SIZE); |
| int i; |
| consume: { |
| for (i = 1; i < sortedSectionSize; ++i) |
| { |
| if (!heap[i].equalParent) |
| break consume; |
| reducer.reduce(heap[i].idx, heap[i].consume()); |
| } |
| i = Math.max(i, consumeHeap(i) + 1); |
| } |
| needingAdvance = i; |
| return reducer.getReduced(); |
| } |
| |
| /** |
| * Recursively consume all items equal to equalItem in the binary subheap rooted at position idx. |
| * |
| * @return the largest equal index found in this search. |
| */ |
| private int consumeHeap(int idx) |
| { |
| if (idx >= size || !heap[idx].equalParent) |
| return -1; |
| |
| reducer.reduce(heap[idx].idx, heap[idx].consume()); |
| int nextIdx = (idx << 1) - (SORTED_SECTION_SIZE - 1); |
| return Math.max(idx, Math.max(consumeHeap(nextIdx), consumeHeap(nextIdx + 1))); |
| } |
| |
| /** |
| * Replace an iterator in the heap with the given position and move it down the heap until it finds its proper |
| * position, pulling lighter elements up the heap. |
| * |
| * Whenever an equality is found between two elements that form a new parent-child relationship, the child's |
| * equalParent flag is set to true if the elements are equal. |
| */ |
| private void replaceAndSink(Candidate<In> candidate, int currIdx) |
| { |
| if (candidate == null) |
| { |
| // Drop iterator by replacing it with the last one in the heap. |
| candidate = heap[--size]; |
| heap[size] = null; // not necessary but helpful for debugging |
| } |
| // The new element will be top of its heap, at this point there is no parent to be equal to. |
| candidate.equalParent = false; |
| |
| final int size = this.size; |
| final int sortedSectionSize = Math.min(size - 1, SORTED_SECTION_SIZE); |
| |
| int nextIdx; |
| |
| // Advance within the sorted section, pulling up items lighter than candidate. |
| while ((nextIdx = currIdx + 1) <= sortedSectionSize) |
| { |
| if (!heap[nextIdx].equalParent) // if we were greater then an (or were the) equal parent, we are >= the child |
| { |
| int cmp = candidate.compareTo(heap[nextIdx]); |
| if (cmp <= 0) |
| { |
| heap[nextIdx].equalParent = cmp == 0; |
| heap[currIdx] = candidate; |
| return; |
| } |
| } |
| |
| heap[currIdx] = heap[nextIdx]; |
| currIdx = nextIdx; |
| } |
| // If size <= SORTED_SECTION_SIZE, nextIdx below will be no less than size, |
| // because currIdx == sortedSectionSize == size - 1 and nextIdx becomes |
| // (size - 1) * 2) - (size - 1 - 1) == size. |
| |
| // Advance in the binary heap, pulling up the lighter element from the two at each level. |
| while ((nextIdx = (currIdx * 2) - (sortedSectionSize - 1)) + 1 < size) |
| { |
| if (!heap[nextIdx].equalParent) |
| { |
| if (!heap[nextIdx + 1].equalParent) |
| { |
| // pick the smallest of the two children |
| int siblingCmp = heap[nextIdx + 1].compareTo(heap[nextIdx]); |
| if (siblingCmp < 0) |
| ++nextIdx; |
| |
| // if we're smaller than this, we are done, and must only restore the heap and equalParent properties |
| int cmp = candidate.compareTo(heap[nextIdx]); |
| if (cmp <= 0) |
| { |
| if (cmp == 0) |
| { |
| heap[nextIdx].equalParent = true; |
| if (siblingCmp == 0) // siblingCmp == 0 => nextIdx is the left child |
| heap[nextIdx + 1].equalParent = true; |
| } |
| |
| heap[currIdx] = candidate; |
| return; |
| } |
| |
| if (siblingCmp == 0) |
| { |
| // siblingCmp == 0 => nextIdx is still the left child |
| // if the two siblings were equal, and we are inserting something greater, we will |
| // pull up the left one; this means the right gets an equalParent |
| heap[nextIdx + 1].equalParent = true; |
| } |
| } |
| else |
| ++nextIdx; // descend down the path where we found the equal child |
| } |
| |
| heap[currIdx] = heap[nextIdx]; |
| currIdx = nextIdx; |
| } |
| |
| // our loop guard ensures there are always two siblings to process; typically when we exit the loop we will |
| // be well past the end of the heap and this next condition will match... |
| if (nextIdx >= size) |
| { |
| heap[currIdx] = candidate; |
| return; |
| } |
| |
| // ... but sometimes we will have one last child to compare against, that has no siblings |
| if (!heap[nextIdx].equalParent) |
| { |
| int cmp = candidate.compareTo(heap[nextIdx]); |
| if (cmp <= 0) |
| { |
| heap[nextIdx].equalParent = cmp == 0; |
| heap[currIdx] = candidate; |
| return; |
| } |
| } |
| |
| heap[currIdx] = heap[nextIdx]; |
| heap[nextIdx] = candidate; |
| } |
| } |
| |
| // Holds and is comparable by the head item of an iterator it owns |
| protected static final class Candidate<In> implements Comparable<Candidate<In>> |
| { |
| private final Iterator<? extends In> iter; |
| private final Comparator<? super In> comp; |
| private final int idx; |
| private In item; |
| boolean equalParent; |
| |
| public Candidate(int idx, Iterator<? extends In> iter, Comparator<? super In> comp) |
| { |
| this.iter = iter; |
| this.comp = comp; |
| this.idx = idx; |
| } |
| |
| /** @return this if our iterator had an item, and it is now available, otherwise null */ |
| protected Candidate<In> advance() |
| { |
| if (!iter.hasNext()) |
| return null; |
| item = iter.next(); |
| return this; |
| } |
| |
| public int compareTo(Candidate<In> that) |
| { |
| assert item != null && that.item != null; |
| return comp.compare(this.item, that.item); |
| } |
| |
| public In consume() |
| { |
| In temp = item; |
| item = null; |
| assert temp != null; |
| return temp; |
| } |
| |
| public boolean needsAdvance() |
| { |
| return item == null; |
| } |
| } |
| |
| /** Accumulator that collects values of type A, and outputs a value of type B. */ |
| public static abstract class Reducer<In,Out> |
| { |
| /** |
| * @return true if Out is the same as In for the case of a single source iterator |
| */ |
| public boolean trivialReduceIsTrivial() |
| { |
| return false; |
| } |
| |
| /** |
| * combine this object with the previous ones. |
| * intermediate state is up to your implementation. |
| */ |
| public abstract void reduce(int idx, In current); |
| |
| /** @return The last object computed by reduce */ |
| protected abstract Out getReduced(); |
| |
| /** |
| * Called at the beginning of each new key, before any reduce is called. |
| * To be overridden by implementing classes. |
| */ |
| protected void onKeyChange() {} |
| |
| /** |
| * May be overridden by implementations that require cleaning up after use |
| */ |
| public void close() {} |
| } |
| |
| private static class OneToOne<In, Out> extends MergeIterator<In, Out> |
| { |
| private final Iterator<In> source; |
| |
| public OneToOne(List<? extends Iterator<In>> sources, Reducer<In, Out> reducer) |
| { |
| super(sources, reducer); |
| source = sources.get(0); |
| } |
| |
| protected Out computeNext() |
| { |
| if (!source.hasNext()) |
| return endOfData(); |
| reducer.onKeyChange(); |
| reducer.reduce(0, source.next()); |
| return reducer.getReduced(); |
| } |
| } |
| |
| private static class TrivialOneToOne<In, Out> extends MergeIterator<In, Out> |
| { |
| private final Iterator<In> source; |
| |
| public TrivialOneToOne(List<? extends Iterator<In>> sources, Reducer<In, Out> reducer) |
| { |
| super(sources, reducer); |
| source = sources.get(0); |
| } |
| |
| @SuppressWarnings("unchecked") |
| protected Out computeNext() |
| { |
| if (!source.hasNext()) |
| return endOfData(); |
| return (Out) source.next(); |
| } |
| } |
| } |