blob: 27c199a6852359cc883e56d6d88ebf89a04c82bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.tries;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.cassandra.io.util.Rebufferer;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
/**
* Thread-unsafe reverse value iterator for on-disk tries. Uses the assumptions of {@link Walker}.
* <p>
* The main utility of this class is the {@link #nextPayloadedNode()} method, which lists all nodes that contain a
* payload within the requested bounds. The treatment of the bounds is non-standard (see
* {@link #ReverseValueIterator(Rebufferer, long, ByteComparable, ByteComparable, boolean)}), necessary to properly walk
* tries of prefixes and separators.
*/
@NotThreadSafe
public class ReverseValueIterator<Concrete extends ReverseValueIterator<Concrete>> extends Walker<Concrete>
{
static final int NOT_AT_LIMIT = Integer.MIN_VALUE;
private final ByteSource limit;
private IterationPosition stack;
private long next;
private boolean reportingPrefixes;
static class IterationPosition
{
final long node;
final int limit;
final IterationPosition prev;
int childIndex;
public IterationPosition(long node, int childIndex, int limit, IterationPosition prev)
{
super();
this.node = node;
this.childIndex = childIndex;
this.limit = limit;
this.prev = prev;
}
}
protected ReverseValueIterator(Rebufferer source, long root)
{
super(source, root);
limit = null;
initializeNoRightBound(root, NOT_AT_LIMIT, false);
}
/**
* Constrained iterator. The end position is always treated as inclusive, and we have two possible treatments for
* the start:
* <ul>
* <li> When {@code admitPrefix=false}, exact matches and any prefixes of the start are excluded.
* <li> When {@code admitPrefix=true}, the longest prefix of the start present in the trie is also included,
* provided that there is no entry in the trie between that prefix and the start. An exact match also
* satisfies this and is included.
* </ul>
* This behaviour is shared with the forward counterpart {@link ValueIterator}.
*/
protected ReverseValueIterator(Rebufferer source, long root, ByteComparable start, ByteComparable end, boolean admitPrefix)
{
super(source, root);
limit = start != null ? start.asComparableBytes(BYTE_COMPARABLE_VERSION) : null;
if (end != null)
initializeWithRightBound(root, end.asComparableBytes(BYTE_COMPARABLE_VERSION), admitPrefix, limit != null);
else
initializeNoRightBound(root, limit != null ? limit.next() : NOT_AT_LIMIT, admitPrefix);
}
void initializeWithRightBound(long root, ByteSource endStream, boolean admitPrefix, boolean hasLimit)
{
IterationPosition prev = null;
boolean atLimit = hasLimit;
int childIndex;
int limitByte;
reportingPrefixes = admitPrefix;
// Follow end position while we still have a prefix, stacking path.
go(root);
while (true)
{
int s = endStream.next();
childIndex = search(s);
limitByte = NOT_AT_LIMIT;
if (atLimit)
{
limitByte = limit.next();
if (s > limitByte)
atLimit = false;
}
if (childIndex < 0)
break;
prev = new IterationPosition(position, childIndex, limitByte, prev);
go(transition(childIndex)); // childIndex is positive, this transition must exist
}
// Advancing now gives us first match.
childIndex = -1 - childIndex;
stack = new IterationPosition(position, childIndex, limitByte, prev);
next = advanceNode();
}
private void initializeNoRightBound(long root, int limitByte, boolean admitPrefix)
{
go(root);
stack = new IterationPosition(root, -1 - search(256), limitByte, null);
next = advanceNode();
reportingPrefixes = admitPrefix;
}
/**
* Returns the position of the next node with payload contained in the iterated span.
*/
protected long nextPayloadedNode()
{
long toReturn = next;
if (next != -1)
next = advanceNode();
return toReturn;
}
long advanceNode()
{
if (stack == null)
return -1;
long child;
int transitionByte;
go(stack.node);
while (true)
{
// advance position in node
int childIdx = stack.childIndex - 1;
boolean beyondLimit = true;
if (childIdx >= 0)
{
transitionByte = transitionByte(childIdx);
beyondLimit = transitionByte < stack.limit;
if (beyondLimit)
{
assert stack.limit >= 0; // we are at a limit position (not in a node that's completely within the span)
reportingPrefixes = false; // there exists a smaller child than limit, no longer should report prefixes
}
}
else
transitionByte = Integer.MIN_VALUE;
if (beyondLimit)
{
// ascend to parent, remove from stack
IterationPosition stackTop = stack;
stack = stack.prev;
// Report payloads on the way up
// unless we are at limit and there has been a smaller child
if (payloadFlags() != 0)
{
// If we are fully inside the covered space, report.
// Note that on the exact match of the limit, stackTop.limit would be END_OF_STREAM.
// This comparison rejects the exact match; if we wanted to include it, we could test < 0 instead.
if (stackTop.limit == NOT_AT_LIMIT)
return stackTop.node;
else if (reportingPrefixes)
{
reportingPrefixes = false; // if we are at limit position only report one prefix, the closest
return stackTop.node;
}
// else skip this payload
}
if (stack == null) // exhausted whole trie
return NONE;
go(stack.node);
continue;
}
child = transition(childIdx);
if (child != NONE)
{
go(child);
stack.childIndex = childIdx;
// descend, stack up position
int l = NOT_AT_LIMIT;
if (transitionByte == stack.limit)
l = limit.next();
stack = new IterationPosition(child, transitionRange(), l, stack);
}
else
{
stack.childIndex = childIdx;
}
}
}
}