blob: 47a55e175b44c30dad6b1e9706700adb13ea4c04 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.wikisearch.iterator;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
public class AndIterator implements SortedKeyValueIterator<Key,Value> {
protected static final Logger log = Logger.getLogger(AndIterator.class);
private TermSource[] sources;
private int sourcesCount = 0;
protected Text nullText = new Text();
protected final byte[] emptyByteArray = new byte[0];
private Key topKey = null;
protected Value value = new Value(emptyByteArray);
private Range overallRange;
private Text currentRow = null;
private Text currentTerm = new Text(emptyByteArray);
private Text currentDocID = new Text(emptyByteArray);
private static boolean SEEK_INCLUSIVE = true;
private Text parentEndRow;
/**
* Used in representing a Term that is intersected on.
*/
protected static class TermSource {
public SortedKeyValueIterator<Key,Value> iter;
public Text dataLocation;
public Text term;
public boolean notFlag;
private Collection<ByteSequence> seekColumnFamilies;
private TermSource(TermSource other) {
this(other.iter, other.dataLocation, other.term, other.notFlag);
}
public TermSource(SortedKeyValueIterator<Key,Value> iter, Text dataLocation, Text term) {
this(iter, dataLocation, term, false);
}
public TermSource(SortedKeyValueIterator<Key,Value> iter, Text dataLocation, Text term, boolean notFlag) {
this.iter = iter;
this.dataLocation = dataLocation;
ByteSequence bs = new ArrayByteSequence(dataLocation.getBytes(), 0, dataLocation.getLength());
this.seekColumnFamilies = Collections.singletonList(bs);
this.term = term;
this.notFlag = notFlag;
}
public String getTermString() {
return (this.term == null) ? new String("Iterator") : this.term.toString();
}
}
/*
* | Row | Column Family | Column Qualifier | Value
* | {RowID} | {dataLocation} | {term}\0{dataType}\0{UID} | Empty
*/
protected Text getPartition(Key key) {
return key.getRow();
}
/**
* Returns the given key's dataLocation
*
* @param key
* @return The given key's dataLocation
*/
protected Text getDataLocation(Key key) {
return key.getColumnFamily();
}
/**
* Returns the given key's term
*
* @param key
* @return The given key's term
*/
protected Text getTerm(Key key) {
int idx = 0;
String sKey = key.getColumnQualifier().toString();
idx = sKey.indexOf("\0");
return new Text(sKey.substring(0, idx));
}
/**
* Returns the given key's DocID
*
* @param key
* @return The given key's DocID
*/
protected Text getDocID(Key key) {
int idx = 0;
String sKey = key.getColumnQualifier().toString();
idx = sKey.indexOf("\0");
return new Text(sKey.substring(idx + 1));
}
/**
* Returns the given key's UID
*
* @param key
* @return The given key's UID
*/
protected String getUID(Key key) {
int idx = 0;
String sKey = key.getColumnQualifier().toString();
idx = sKey.indexOf("\0");
return sKey.substring(idx + 1);
}
/**
* Build a key from the given row and dataLocation
*
* @param row
* The desired row
* @param dataLocation
* The desired dataLocation
* @return A Key object built from the given row and dataLocation.
*/
protected Key buildKey(Text row, Text dataLocation) {
return new Key(row, (dataLocation == null) ? nullText : dataLocation);
}
/**
* Build a key from the given row, dataLocation, and term
*
* @param row
* The desired row
* @param dataLocation
* The desired dataLocation
* @param term
* The desired term
* @return A Key object built from the given row, dataLocation, and term.
*/
protected Key buildKey(Text row, Text dataLocation, Text term) {
return new Key(row, (dataLocation == null) ? nullText : dataLocation, (term == null) ? nullText : term);
}
/**
* Return the key that directly follows the given key
*
* @param key
* The key who will be directly before the returned key
* @return The key directly following the given key.
*/
protected Key buildFollowingPartitionKey(Key key) {
return key.followingKey(PartialKey.ROW);
}
/**
* Empty default constructor
*/
public AndIterator() {}
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
return new AndIterator(this, env);
}
public AndIterator(AndIterator other, IteratorEnvironment env) {
if (other.sources != null) {
sourcesCount = other.sourcesCount;
sources = new TermSource[sourcesCount];
for (int i = 0; i < sourcesCount; i++) {
sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].dataLocation, other.sources[i].term);
}
}
}
public Key getTopKey() {
return topKey;
}
public Value getTopValue() {
return value;
}
public boolean hasTop() {
return currentRow != null;
}
/**
* Find the next key in the current TermSource that is at or beyond the cursor (currentRow, currentTerm, currentDocID).
*
* @param sourceID
* The index of the current source in <code>sources</code>
* @return True if the source advanced beyond the cursor
* @throws IOException
*/
private boolean seekOneSource(TermSource ts) throws IOException {
/*
* Within this loop progress must be made in one of the following forms: - currentRow, currentTerm, or curretDocID must be increased - the given source must
* advance its iterator This loop will end when any of the following criteria are met - the iterator for the given source is pointing to the key
* (currentRow, columnFamilies[sourceID], currentTerm, currentDocID) - the given source is out of data and currentRow is set to null - the given source has
* advanced beyond the endRow and currentRow is set to null
*/
// precondition: currentRow is not null
boolean advancedCursor = false;
while (true) {
if (ts.iter.hasTop() == false) {
if (log.isDebugEnabled()) {
log.debug("The current iterator no longer has a top");
}
// If we got to the end of an iterator, found a Match if it's a NOT
if (ts.notFlag) {
break;
}
currentRow = null;
// setting currentRow to null counts as advancing the cursor
return true;
}
// check if we're past the end key
int endCompare = -1;
if (log.isDebugEnabled()) {
log.debug("Current topKey = " + ts.iter.getTopKey());
}
// we should compare the row to the end of the range
if (overallRange.getEndKey() != null) {
if (log.isDebugEnabled()) {
log.debug("II.seekOneSource overallRange.getEndKey() != null");
}
endCompare = overallRange.getEndKey().getRow().compareTo(ts.iter.getTopKey().getRow());
if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
if (log.isDebugEnabled()) {
log.debug("II.seekOneSource at the end of the tablet server");
}
currentRow = null;
// setting currentRow to null counts as advancing the cursor
return true;
}
} else {
if (log.isDebugEnabled()) {
log.debug("II.seekOneSource overallRange.getEndKey() == null");
}
}
// Compare the Row IDs
int partitionCompare = currentRow.compareTo(getPartition(ts.iter.getTopKey()));
if (log.isDebugEnabled()) {
log.debug("Current partition: " + currentRow);
}
// check if this source is already at or beyond currentRow
// if not, then seek to at least the current row
if (partitionCompare > 0) {
if (log.isDebugEnabled()) {
log.debug("Need to seek to the current row");
// seek to at least the currentRow
log.debug("ts.dataLocation = " + ts.dataLocation.getBytes());
log.debug("Term = " + new Text(ts.term + "\0" + currentDocID).getBytes());
}
Key seekKey = buildKey(currentRow, ts.dataLocation, nullText);// new Text(ts.term + "\0" + currentDocID));
if (log.isDebugEnabled()) {
log.debug("Seeking to: " + seekKey);
}
ts.iter.seek(new Range(seekKey, true, null, false), ts.seekColumnFamilies, SEEK_INCLUSIVE);
continue;
}
// check if this source has gone beyond currentRow
// if so, advance currentRow
if (partitionCompare < 0) {
if (log.isDebugEnabled()) {
log.debug("Went too far beyond the currentRow");
}
if (ts.notFlag) {
break;
}
currentRow.set(getPartition(ts.iter.getTopKey()));
currentDocID.set(emptyByteArray);
advancedCursor = true;
continue;
}
// we have verified that the current source is positioned in currentRow
// now we must make sure we're in the right columnFamily in the current row
if (ts.dataLocation != null) {
int dataLocationCompare = ts.dataLocation.compareTo(getDataLocation(ts.iter.getTopKey()));
if (log.isDebugEnabled()) {
log.debug("Comparing dataLocations");
log.debug("dataLocation = " + ts.dataLocation);
log.debug("newDataLocation = " + getDataLocation(ts.iter.getTopKey()));
}
// check if this source is already on the right columnFamily
// if not, then seek forwards to the right columnFamily
if (dataLocationCompare > 0) {
if (log.isDebugEnabled()) {
log.debug("Need to seek to the right dataLocation");
}
Key seekKey = buildKey(currentRow, ts.dataLocation, nullText);// , new Text(ts.term + "\0" + currentDocID));
if (log.isDebugEnabled()) {
log.debug("Seeking to: " + seekKey);
}
ts.iter.seek(new Range(seekKey, true, null, false), ts.seekColumnFamilies, SEEK_INCLUSIVE);
if (!ts.iter.hasTop()) {
currentRow = null;
return true;
}
continue;
}
// check if this source is beyond the right columnFamily
// if so, then seek to the next row
if (dataLocationCompare < 0) {
if (log.isDebugEnabled()) {
log.debug("Went too far beyond the dataLocation");
}
if (endCompare == 0) {
// we're done
currentRow = null;
// setting currentRow to null counts as advancing the cursor
return true;
}
// Seeking beyond the current dataLocation gives a valid negated result
if (ts.notFlag) {
break;
}
Key seekKey = buildFollowingPartitionKey(ts.iter.getTopKey());
if (log.isDebugEnabled()) {
log.debug("Seeking to: " + seekKey);
}
ts.iter.seek(new Range(seekKey, true, null, false), ts.seekColumnFamilies, SEEK_INCLUSIVE);
if (!ts.iter.hasTop()) {
currentRow = null;
return true;
}
continue;
}
}
// Compare the Terms
int termCompare = ts.term.compareTo(getTerm(ts.iter.getTopKey()));
if (log.isDebugEnabled()) {
log.debug("term = " + ts.term);
log.debug("newTerm = " + getTerm(ts.iter.getTopKey()));
}
// We need to seek down farther into the data
if (termCompare > 0) {
if (log.isDebugEnabled()) {
log.debug("Need to seek to the right term");
}
Key seekKey = buildKey(currentRow, ts.dataLocation, new Text(ts.term + "\0"));// new Text(ts.term + "\0" + currentDocID));
if (log.isDebugEnabled()) {
log.debug("Seeking to: " + seekKey);
}
ts.iter.seek(new Range(seekKey, true, null, false), ts.seekColumnFamilies, SEEK_INCLUSIVE);
if (!ts.iter.hasTop()) {
currentRow = null;
return true;
}
// currentTerm = getTerm(ts.iter.getTopKey());
if (log.isDebugEnabled()) {
log.debug("topKey after seeking to correct term: " + ts.iter.getTopKey());
}
continue;
}
// We've jumped out of the current term, set the new term as currentTerm and start looking again
if (termCompare < 0) {
if (log.isDebugEnabled()) {
log.debug("TERM: Need to jump to the next row");
}
if (endCompare == 0) {
currentRow = null;
return true;
}
if (ts.notFlag) {
break;
}
Key seekKey = buildFollowingPartitionKey(ts.iter.getTopKey());
if (log.isDebugEnabled()) {
log.debug("Using this key to find the next key: " + ts.iter.getTopKey());
log.debug("Seeking to: " + seekKey);
}
ts.iter.seek(new Range(seekKey, true, null, false), ts.seekColumnFamilies, SEEK_INCLUSIVE);
if (!ts.iter.hasTop()) {
currentRow = null;
return true;
}
currentTerm = getTerm(ts.iter.getTopKey());
continue;
}
// Compare the DocIDs
Text docid = getDocID(ts.iter.getTopKey());
int docidCompare = currentDocID.compareTo(docid);
if (log.isDebugEnabled()) {
log.debug("Comparing DocIDs");
log.debug("currentDocID = " + currentDocID);
log.debug("docid = " + docid);
}
// The source isn't at the right DOC
if (docidCompare > 0) {
if (log.isDebugEnabled()) {
log.debug("Need to seek to the correct docid");
}
// seek forwards
Key seekKey = buildKey(currentRow, ts.dataLocation, new Text(ts.term + "\0" + currentDocID));
if (log.isDebugEnabled()) {
log.debug("Seeking to: " + seekKey);
}
ts.iter.seek(new Range(seekKey, true, null, false), ts.seekColumnFamilies, SEEK_INCLUSIVE);
continue;
}
// if this source has advanced beyond the current column qualifier then advance currentCQ and return true
if (docidCompare < 0) {
if (ts.notFlag) {
break;
}
if (log.isDebugEnabled()) {
log.debug("We went too far, update the currentDocID to be the location of where were seek'ed to");
}
currentDocID.set(docid);
advancedCursor = true;
break;
}
// Set the term as currentTerm (in case we found this record on the first try)
currentTerm = getTerm(ts.iter.getTopKey());
if (log.isDebugEnabled()) {
log.debug("currentTerm = " + currentTerm);
}
// If we're negated, next() the first TermSource since we guaranteed it was not a NOT term
if (ts.notFlag) {
sources[0].iter.next();
advancedCursor = true;
}
// If we got here, we have a match
break;
}
return advancedCursor;
}
public void next() throws IOException {
if (log.isDebugEnabled()) {
log.debug("In ModifiedIntersectingIterator.next()");
}
if (currentRow == null) {
return;
}
// precondition: the current row is set up and the sources all have the same column qualifier
// while we don't have a match, seek in the source with the smallest column qualifier
sources[0].iter.next();
advanceToIntersection();
if (hasTop()) {
if (overallRange != null && !overallRange.contains(topKey)) {
topKey = null;
}
}
}
protected void advanceToIntersection() throws IOException {
if (log.isDebugEnabled()) {
log.debug("In AndIterator.advanceToIntersection()");
}
boolean cursorChanged = true;
while (cursorChanged) {
// seek all of the sources to at least the highest seen column qualifier in the current row
cursorChanged = false;
for (TermSource ts : sources) {
if (currentRow == null) {
topKey = null;
return;
}
if (seekOneSource(ts)) {
cursorChanged = true;
break;
}
}
}
topKey = buildKey(currentRow, currentTerm, currentDocID);
if (log.isDebugEnabled()) {
log.debug("ModifiedIntersectingIterator: Got a match: " + topKey);
}
}
public static String stringTopKey(SortedKeyValueIterator<Key,Value> iter) {
if (iter.hasTop()) {
return iter.getTopKey().toString();
}
return "";
}
public static final String columnFamiliesOptionName = "columnFamilies";
public static final String termValuesOptionName = "termValues";
public static final String notFlagsOptionName = "notFlags";
/**
* Encode a <code>Text</code> array of all the columns to intersect on
*
* @param columns
* The columns to be encoded
* @return A Base64 encoded string (using a \n delimiter) of all columns to intersect on.
*/
public static String encodeColumns(Text[] columns) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < columns.length; i++) {
sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i]))));
sb.append('\n');
}
return sb.toString();
}
/**
* Encode a <code>Text</code> array of all of the terms to intersect on. The terms should match the columns in a one-to-one manner
*
* @param terms
* The terms to be encoded
* @return A Base64 encoded string (using a \n delimiter) of all terms to intersect on.
*/
public static String encodeTermValues(Text[] terms) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < terms.length; i++) {
sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(terms[i]))));
sb.append('\n');
}
return sb.toString();
}
/**
* Encode an array of <code>booleans</code> denoted which columns are NOT'ed
*
* @param flags
* The array of NOTs
* @return A base64 encoded string of which columns are NOT'ed
*/
public static String encodeBooleans(boolean[] flags) {
byte[] bytes = new byte[flags.length];
for (int i = 0; i < flags.length; i++) {
if (flags[i]) {
bytes[i] = 1;
} else {
bytes[i] = 0;
}
}
return new String(Base64.encodeBase64(bytes));
}
/**
* Decode the encoded columns into a <code>Text</code> array
*
* @param columns
* The Base64 encoded String of the columns
* @return A Text array of the decoded columns
*/
public static Text[] decodeColumns(String columns) {
String[] columnStrings = columns.split("\n");
Text[] columnTexts = new Text[columnStrings.length];
for (int i = 0; i < columnStrings.length; i++) {
columnTexts[i] = new Text(Base64.decodeBase64(columnStrings[i].getBytes()));
}
return columnTexts;
}
/**
* Decode the encoded terms into a <code>Text</code> array
*
* @param terms
* The Base64 encoded String of the terms
* @return A Text array of decoded terms.
*/
public static Text[] decodeTermValues(String terms) {
String[] termStrings = terms.split("\n");
Text[] termTexts = new Text[termStrings.length];
for (int i = 0; i < termStrings.length; i++) {
termTexts[i] = new Text(Base64.decodeBase64(termStrings[i].getBytes()));
}
return termTexts;
}
/**
* Decode the encoded NOT flags into a <code>boolean</code> array
*
* @param flags
* @return A boolean array of decoded NOT flags
*/
public static boolean[] decodeBooleans(String flags) {
// return null of there were no flags
if (flags == null) {
return null;
}
byte[] bytes = Base64.decodeBase64(flags.getBytes());
boolean[] bFlags = new boolean[bytes.length];
for (int i = 0; i < bytes.length; i++) {
if (bytes[i] == 1) {
bFlags[i] = true;
} else {
bFlags[i] = false;
}
}
return bFlags;
}
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
if (log.isDebugEnabled()) {
log.debug("In AndIterator.init()");
}
Text[] dataLocations = decodeColumns(options.get(columnFamiliesOptionName));
Text[] terms = decodeTermValues(options.get(termValuesOptionName));
boolean[] notFlags = decodeBooleans(options.get(notFlagsOptionName));
if (terms.length < 2) {
throw new IllegalArgumentException("AndIterator requires two or more columns families");
}
// Scan the not flags.
// There must be at least one term that isn't negated
// And we are going to re-order such that the first term is not a ! term
if (notFlags == null) {
notFlags = new boolean[terms.length];
for (int i = 0; i < terms.length; i++) {
notFlags[i] = false;
}
}
// Make sure that the first dataLocation/Term is not a NOT by swapping it with a later dataLocation/Term
if (notFlags[0]) {
for (int i = 1; i < notFlags.length; i++) {
if (notFlags[i] == false) {
// Swap the terms
Text swap = new Text(terms[0]);
terms[0].set(terms[i]);
terms[i].set(swap);
// Swap the dataLocations
swap.set(dataLocations[0]);
dataLocations[0].set(dataLocations[i]);
dataLocations[i].set(swap);
// Flip the notFlags
notFlags[0] = false;
notFlags[i] = true;
break;
}
}
if (notFlags[0]) {
throw new IllegalArgumentException("AndIterator requires at least one column family without not");
}
}
// Build up the array of sources that are to be intersected
sources = new TermSource[dataLocations.length];
for (int i = 0; i < dataLocations.length; i++) {
sources[i] = new TermSource(source.deepCopy(env), dataLocations[i], terms[i], notFlags[i]);
}
sourcesCount = dataLocations.length;
}
public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
if (log.isDebugEnabled()) {
log.debug("In AndIterator.seek()");
log.debug("AndIterator.seek Given range => " + range);
}
currentRow = new Text();
currentDocID.set(emptyByteArray);
doSeek(range);
}
private void doSeek(Range range) throws IOException {
overallRange = new Range(range);
if (range.getEndKey() != null && range.getEndKey().getRow() != null) {
this.parentEndRow = range.getEndKey().getRow();
}
// seek each of the sources to the right column family within the row given by key
for (int i = 0; i < sourcesCount; i++) {
Key sourceKey;
Text dataLocation = (sources[i].dataLocation == null) ? nullText : sources[i].dataLocation;
if (range.getStartKey() != null) {
// Build a key with the DocID if one is given
if (range.getStartKey().getColumnFamily() != null) {
sourceKey = buildKey(getPartition(range.getStartKey()), dataLocation,
(sources[i].term == null) ? nullText : new Text(sources[i].term + "\0" + range.getStartKey().getColumnFamily()));
} // Build a key with just the term.
else {
sourceKey = buildKey(getPartition(range.getStartKey()), dataLocation,
(sources[i].term == null) ? nullText : sources[i].term);
}
if (!range.isStartKeyInclusive())
sourceKey = sourceKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL);
sources[i].iter.seek(new Range(sourceKey, true, null, false), sources[i].seekColumnFamilies, SEEK_INCLUSIVE);
} else {
sources[i].iter.seek(range, sources[i].seekColumnFamilies, SEEK_INCLUSIVE);
}
}
advanceToIntersection();
if (hasTop()) {
if (overallRange != null && !overallRange.contains(topKey)) {
topKey = null;
if (log.isDebugEnabled()) {
log.debug("doSeek, topKey is outside of overall range: " + overallRange);
}
}
}
}
public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, Text term, boolean notFlag) {
addSource(source, env, null, term, notFlag);
}
public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, Text dataLocation, Text term, boolean notFlag) {
// Check if we have space for the added Source
if (sources == null) {
sources = new TermSource[1];
} else {
// allocate space for node, and copy current tree.
// TODO: Should we change this to an ArrayList so that we can just add() ?
TermSource[] localSources = new TermSource[sources.length + 1];
int currSource = 0;
for (TermSource myTerm : sources) {
// TODO: Do I need to call new here? or can I just re-use the term?
localSources[currSource] = new TermSource(myTerm);
currSource++;
}
sources = localSources;
}
sources[sourcesCount] = new TermSource(source.deepCopy(env), dataLocation, term, notFlag);
sourcesCount++;
}
public boolean jump(Key jumpKey) throws IOException {
if (log.isDebugEnabled()) {
log.debug("jump: " + jumpKey);
}
// is the jumpKey outside my overall range?
if (parentEndRow != null && parentEndRow.compareTo(jumpKey.getRow()) < 0) {
// can't go there.
if (log.isDebugEnabled()) {
log.debug("jumpRow: " + jumpKey.getRow() + " is greater than my parentEndRow: " + parentEndRow);
}
return false;
}
if (!hasTop()) {
// TODO: will need to add current/last row if you want to measure if
// we don't have topkey because we hit end of tablet.
if (log.isDebugEnabled()) {
log.debug("jump called, but topKey is null, must need to move to next row");
}
return false;
} else {
int comp = this.topKey.getRow().compareTo(jumpKey.getRow());
// compare rows
if (comp > 0) {
if (log.isDebugEnabled()) {
log.debug("jump, our row is ahead of jumpKey.");
log.debug("jumpRow: " + jumpKey.getRow() + " myRow: " + topKey.getRow() + " parentEndRow" + parentEndRow);
}
return hasTop(); // do nothing, we're ahead of jumpKey row
} else if (comp < 0) { // a row behind jump key, need to move forward
if (log.isDebugEnabled()) {
log.debug("II jump, row jump");
}
Key endKey = null;
if (parentEndRow != null) {
endKey = new Key(parentEndRow);
}
Key sKey = new Key(jumpKey.getRow());
Range fake = new Range(sKey, true, endKey, false);
this.seek(fake, null, false);
return hasTop();
} else {
// need to check uid
String myUid = this.topKey.getColumnQualifier().toString();
String jumpUid = getUID(jumpKey);
if (log.isDebugEnabled()) {
if (myUid == null) {
log.debug("myUid is null");
} else {
log.debug("myUid: " + myUid);
}
if (jumpUid == null) {
log.debug("jumpUid is null");
} else {
log.debug("jumpUid: " + jumpUid);
}
}
int ucomp = myUid.compareTo(jumpUid);
if (ucomp < 0) { // need to move all sources forward
if (log.isDebugEnabled()) {
log.debug("jump, uid jump");
}
Text row = jumpKey.getRow();
Range range = new Range(row);
this.currentRow = row;
this.currentDocID = new Text(this.getUID(jumpKey));
doSeek(range);
// make sure it is in the range if we have one.
if (hasTop() && parentEndRow != null && topKey.getRow().compareTo(parentEndRow) > 0) {
topKey = null;
}
if (log.isDebugEnabled() && hasTop()) {
log.debug("jump, topKey is now: " + topKey);
}
return hasTop();
}// else do nothing
if (hasTop() && parentEndRow != null && topKey.getRow().compareTo(parentEndRow) > 0) {
topKey = null;
}
return hasTop();
}
}
}
}