blob: 7d568fe01ff36e1f57ff3acb3f745d19a78149e7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.iterators.user;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This iterator facilitates document-partitioned indexing. It is an example of extending the
* IntersectingIterator to customize the placement of the term and docID. As with the
* IntersectingIterator, documents are grouped together and indexed into a single row of an Accumulo
* table. This allows a tablet server to perform boolean AND operations on terms in the index. This
* iterator also stores the document contents in a separate column family in the same row so that
* the full document can be returned with each query.
*
* The table structure should have the following form:
*
* row: shardID, colfam: docColf\0doctype, colqual: docID, value: doc
*
* row: shardID, colfam: indexColf, colqual: term\0doctype\0docID\0info, value: (empty)
*
* When you configure this iterator with a set of terms, it will return only the docIDs and docs
* that appear with all of the specified terms. The result will have the following form:
*
* row: shardID, colfam: indexColf, colqual: doctype\0docID\0info, value: doc
*
* This iterator is commonly used with BatchScanner to parallelize the search over all shardIDs.
*/
public class IndexedDocIterator extends IntersectingIterator {
private static final Logger log = LoggerFactory.getLogger(IndexedDocIterator.class);
public static final Text DEFAULT_INDEX_COLF = new Text("i");
public static final Text DEFAULT_DOC_COLF = new Text("e");
private static final String indexFamilyOptionName = "indexFamily";
private static final String docFamilyOptionName = "docFamily";
private Text indexColf = DEFAULT_INDEX_COLF;
private Text docColf = DEFAULT_DOC_COLF;
private Set<ByteSequence> indexColfSet;
private Set<ByteSequence> docColfSet;
private static final byte[] nullByte = {0};
public SortedKeyValueIterator<Key,Value> docSource;
@Override
protected Key buildKey(Text partition, Text term, Text docID) {
Text colq = new Text(term);
colq.append(nullByte, 0, 1);
colq.append(docID.getBytes(), 0, docID.getLength());
colq.append(nullByte, 0, 1);
return new Key(partition, indexColf, colq);
}
@Override
protected Key buildKey(Text partition, Text term) {
Text colq = new Text(term);
return new Key(partition, indexColf, colq);
}
@Override
protected Text getDocID(Key key) {
return parseDocID(key);
}
public static Text parseDocID(Key key) {
Text colq = key.getColumnQualifier();
int firstZeroIndex = colq.find("\0");
if (firstZeroIndex < 0) {
throw new IllegalArgumentException("bad docid: " + key);
}
int secondZeroIndex = colq.find("\0", firstZeroIndex + 1);
if (secondZeroIndex < 0) {
throw new IllegalArgumentException("bad docid: " + key);
}
int thirdZeroIndex = colq.find("\0", secondZeroIndex + 1);
if (thirdZeroIndex < 0) {
throw new IllegalArgumentException("bad docid: " + key);
}
Text docID = new Text();
try {
docID.set(colq.getBytes(), firstZeroIndex + 1, thirdZeroIndex - 1 - firstZeroIndex);
} catch (ArrayIndexOutOfBoundsException e) {
throw new IllegalArgumentException("bad indices for docid: " + key + " " + firstZeroIndex
+ " " + secondZeroIndex + " " + thirdZeroIndex);
}
return docID;
}
@Override
protected Text getTerm(Key key) {
if (indexColf.compareTo(key.getColumnFamily().getBytes(), 0, indexColf.getLength()) < 0) {
// We're past the index column family, so return a term that will sort lexicographically last.
// The last unicode character should suffice
return new Text("\uFFFD");
}
Text colq = key.getColumnQualifier();
int zeroIndex = colq.find("\0");
Text term = new Text();
term.set(colq.getBytes(), 0, zeroIndex);
return term;
}
@Override
public synchronized void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options, IteratorEnvironment env) throws IOException {
super.init(source, options, env);
if (options.containsKey(indexFamilyOptionName))
indexColf = new Text(options.get(indexFamilyOptionName));
if (options.containsKey(docFamilyOptionName))
docColf = new Text(options.get(docFamilyOptionName));
docSource = source.deepCopy(env);
indexColfSet = Collections
.singleton(new ArrayByteSequence(indexColf.getBytes(), 0, indexColf.getLength()));
for (TermSource ts : this.sources) {
ts.seekColfams = indexColfSet;
}
}
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
}
@Override
public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive)
throws IOException {
super.seek(range, null, true);
}
@Override
protected void advanceToIntersection() throws IOException {
super.advanceToIntersection();
if (topKey == null)
return;
if (log.isTraceEnabled())
log.trace("using top key to seek for doc: {}", topKey);
Key docKey = buildDocKey();
docSource.seek(new Range(docKey, true, null, false), docColfSet, true);
log.debug("got doc key: {}", docSource.getTopKey());
if (docSource.hasTop() && docKey.equals(docSource.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL)) {
value = docSource.getTopValue();
}
log.debug("got doc value: {}", value);
}
protected Key buildDocKey() {
if (log.isTraceEnabled())
log.trace("building doc key for {} {}", currentPartition, currentDocID);
int zeroIndex = currentDocID.find("\0");
if (zeroIndex < 0)
throw new IllegalArgumentException("bad current docID");
Text colf = new Text(docColf);
colf.append(nullByte, 0, 1);
colf.append(currentDocID.getBytes(), 0, zeroIndex);
docColfSet = Collections.singleton(new ArrayByteSequence(colf.getBytes(), 0, colf.getLength()));
if (log.isTraceEnabled())
log.trace("{} {}", zeroIndex, currentDocID.getLength());
Text colq = new Text();
colq.set(currentDocID.getBytes(), zeroIndex + 1, currentDocID.getLength() - zeroIndex - 1);
Key k = new Key(currentPartition, colf, colq);
if (log.isTraceEnabled())
log.trace("built doc key for seek: {}", k);
return k;
}
/**
* A convenience method for setting the index column family.
*
* @param is
* IteratorSetting object to configure.
* @param indexColf
* the index column family
*/
public static void setIndexColf(IteratorSetting is, String indexColf) {
is.addOption(indexFamilyOptionName, indexColf);
}
/**
* A convenience method for setting the document column family prefix.
*
* @param is
* IteratorSetting object to configure.
* @param docColfPrefix
* the prefix of the document column family (colf will be of the form
* docColfPrefix\0doctype)
*/
public static void setDocColfPrefix(IteratorSetting is, String docColfPrefix) {
is.addOption(docFamilyOptionName, docColfPrefix);
}
/**
* A convenience method for setting the index column family and document column family prefix.
*
* @param is
* IteratorSetting object to configure.
* @param indexColf
* the index column family
* @param docColfPrefix
* the prefix of the document column family (colf will be of the form
* docColfPrefix\0doctype)
*/
public static void setColfs(IteratorSetting is, String indexColf, String docColfPrefix) {
setIndexColf(is, indexColf);
setDocColfPrefix(is, docColfPrefix);
}
}