blob: 599de071dd7a852f18a41a64c3cb011097661dfd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.index.sasi.utils;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.cassandra.io.util.FileUtils;
/**
* Range Union Iterator is used to return sorted stream of elements from multiple RangeIterator instances.
*
* PriorityQueue is used as a sorting mechanism for the ranges: each computeNext() operation polls
* the queue (and pushes back when done) to obtain the range that contains the smallest element, because
* the ranges are sorted by the moving window of range iteration, {@link RangeIterator#getCurrent()}. Once
* retrieved, the smallest element (the return candidate) is merged with equal elements from other ranges,
* because there could be equal elements in adjacent ranges; such ranges are polled only if their
* {@link RangeIterator#getCurrent()} equals the return candidate.
*
* @param <K> The type used to sort ranges.
* @param <D> The container type which is going to be returned by {@link Iterator#next()}.
*/
@SuppressWarnings("resource")
public class RangeUnionIterator<K extends Comparable<K>, D extends CombinedValue<K>> extends RangeIterator<K, D>
{
    /**
     * Source ranges that have not been closed by this iterator yet. The queue (and therefore its
     * ordering) is handed in by the builder; the logic below relies on the head of the queue being
     * the range with the smallest {@link RangeIterator#getCurrent()}.
     */
    private final PriorityQueue<RangeIterator<K, D>> ranges;

    private RangeUnionIterator(Builder.Statistics<K, D> statistics, PriorityQueue<RangeIterator<K, D>> ranges)
    {
        super(statistics);
        this.ranges = ranges;
    }

    /**
     * Produces the next element of the union.
     *
     * The first range in the queue that still has elements supplies the return candidate; the next
     * element of every other range whose {@link RangeIterator#getCurrent()} compares equal to the
     * candidate is consumed and merged into it. Ranges that run out of elements along the way are
     * closed, all other touched ranges are put back into the queue so they get re-prioritized.
     *
     * @return the merged candidate, or {@code endOfData()} once no range has elements left.
     * @throws IllegalStateException if the head of the queue is positioned before the candidate,
     *         i.e. the ordering of the queue has been violated (only reachable when assertions
     *         are disabled, otherwise the assertion fires first).
     */
    public D computeNext()
    {
        // drop (and close) exhausted ranges until one with remaining elements shows up
        RangeIterator<K, D> head = null;
        while (!ranges.isEmpty())
        {
            head = ranges.poll();
            if (head.hasNext())
                break;

            FileUtils.closeQuietly(head);
        }

        // either the queue was empty to begin with or every polled range was exhausted
        if (head == null || !head.hasNext())
            return endOfData();

        D candidate = head.next();

        // ranges which have been advanced and must be re-inserted to restore the queue order
        List<RangeIterator<K, D>> processedRanges = new ArrayList<>();
        keepIfNotExhausted(head, processedRanges);

        while (!ranges.isEmpty())
        {
            // peek here instead of poll is an optimization
            // so we can re-insert less ranges back if candidate
            // is less than head of the current range.
            RangeIterator<K, D> range = ranges.peek();

            int cmp = candidate.get().compareTo(range.getCurrent());
            assert cmp <= 0;

            if (cmp < 0)
                break; // candidate is smaller than next token, return immediately

            // Without this check a violated ordering would leave the loop spinning on the
            // same range forever when assertions are disabled, so fail fast instead.
            if (cmp > 0)
                throw new IllegalStateException("Ranges are out of order: candidate " + candidate.get() +
                                                " is greater than current element " + range.getCurrent() +
                                                " of the next range");

            // take the range out of the queue before advancing it, so its position
            // is never changed while it still sits inside the heap
            ranges.poll();
            candidate.merge(range.next()); // consume and merge

            keepIfNotExhausted(range, processedRanges);
        }

        ranges.addAll(processedRanges);
        return candidate;
    }

    /**
     * Advances every range positioned before {@code nextToken}.
     *
     * Ranges whose maximum is smaller than {@code nextToken} cannot contribute any more and are
     * closed right away; the others are asked to skip to the token and are re-inserted into the
     * queue if they still have elements afterwards, otherwise they are closed as well.
     *
     * @param nextToken the token to skip to.
     */
    protected void performSkipTo(K nextToken)
    {
        List<RangeIterator<K, D>> changedRanges = new ArrayList<>();

        while (!ranges.isEmpty())
        {
            // the head is already at or past the token, and so is everything queued behind it
            if (ranges.peek().getCurrent().compareTo(nextToken) >= 0)
                break;

            RangeIterator<K, D> head = ranges.poll();

            if (head.getMaximum().compareTo(nextToken) < 0)
            {
                FileUtils.closeQuietly(head);
                continue;
            }

            head.skipTo(nextToken);
            keepIfNotExhausted(head, changedRanges);
        }

        ranges.addAll(changedRanges);
    }

    /**
     * Closes (quietly) all of the ranges which are still held in the queue.
     */
    public void close() throws IOException
    {
        ranges.forEach(FileUtils::closeQuietly);
    }

    /**
     * Remembers the given range in {@code survivors} if it has more elements, closes it otherwise.
     *
     * @param range     a range which has been taken out of the queue.
     * @param survivors collects the ranges that have to be put back into the queue.
     */
    private void keepIfNotExhausted(RangeIterator<K, D> range, List<RangeIterator<K, D>> survivors)
    {
        if (range.hasNext())
            survivors.add(range);
        else
            FileUtils.closeQuietly(range);
    }

    /**
     * @return a new, empty union builder.
     */
    public static <K extends Comparable<K>, D extends CombinedValue<K>> Builder<K, D> builder()
    {
        return new Builder<>();
    }

    /**
     * Convenience shortcut which builds a union out of the given ranges.
     *
     * @param tokens the ranges to combine.
     * @return the iterator produced by a fresh {@link Builder} after adding all of {@code tokens}.
     */
    public static <K extends Comparable<K>, D extends CombinedValue<K>> RangeIterator<K, D> build(List<RangeIterator<K, D>> tokens)
    {
        return new Builder<K, D>().add(tokens).build();
    }

    /**
     * Builder for {@link RangeUnionIterator}, registered with the base builder as {@code IteratorType.UNION}.
     */
    public static class Builder<K extends Comparable<K>, D extends CombinedValue<K>> extends RangeIterator.Builder<K, D>
    {
        public Builder()
        {
            super(IteratorType.UNION);
        }

        /**
         * @return the only range itself when exactly one range is present (no need to wrap it),
         *         a {@link RangeUnionIterator} over the builder's ranges otherwise.
         */
        protected RangeIterator<K, D> buildIterator()
        {
            switch (rangeCount())
            {
                case 1:
                    return ranges.poll();

                default:
                    return new RangeUnionIterator<>(statistics, ranges);
            }
        }
    }
}