// Source blob: 4334d3b2ed31af42c74ac47c5bd974f6414e0cf0
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.document.FieldSelectorResult;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.util.MapBackedSet;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/** An IndexReader which reads multiple, parallel indexes. Each index added
* must have the same number of documents, but typically each contains
* different fields. Each document contains the union of the fields of all
* documents with the same document number. When searching, matches for a
* query term are from the first index added that has the field.
*
* <p>This is useful, e.g., with collections that have large fields which
* change rarely and small fields that change more frequently. The smaller
* fields may be re-indexed in a new index and both indexes may be searched
* together.
*
* <p><strong>Warning:</strong> It is up to you to make sure all indexes
* are created and modified the same way. For example, if you add
* documents to one index, you need to add the same documents in the
* same order to the other indexes. <em>Failure to do so will result in
* undefined behavior</em>.
*/
public class ParallelReader extends IndexReader {
// Subreaders in the order they were added; index i lines up with decrefOnClose.get(i).
private List<IndexReader> readers = new ArrayList<IndexReader>();
private List<Boolean> decrefOnClose = new ArrayList<Boolean>(); // remember which subreaders to decRef on close
// When true, add() increments the ref count of every subreader it receives
// (the constructor sets this to !closeSubReaders).
boolean incRefReaders = false;
// Field name -> the first added reader that reported the field. Sorted, so
// ParallelTermEnum can walk the fields in order.
private SortedMap<String,IndexReader> fieldToReader = new TreeMap<String,IndexReader>();
// Field names each subreader reported at the time it was added.
private Map<IndexReader,Collection<String>> readerToFields = new HashMap<IndexReader,Collection<String>>();
// Subreaders consulted by document(); readers added with ignoreStoredFields == true are left out.
private List<IndexReader> storedFieldReaders = new ArrayList<IndexReader>();
// Copied from the first reader added; add() rejects later readers whose maxDoc/numDocs differ.
private int maxDoc;
private int numDocs;
// Initialised from the first reader added, then updated by doDelete()/doUndeleteAll().
private boolean hasDeletions;
/**
 * Creates an empty ParallelReader.
 * <p>Subreaders added to this instance are closed when this ParallelReader
 * is closed.</p>
 */
public ParallelReader() throws IOException {
  this(true);
}
/**
 * Creates an empty ParallelReader.
 *
 * @param closeSubReaders if <code>true</code>, subreaders are closed together
 *        with this ParallelReader; if <code>false</code>, each added subreader
 *        is incRef'd on add and only decRef'd on close
 */
public ParallelReader(boolean closeSubReaders) throws IOException {
  super();
  // When we must not close the subreaders we hold a reference to them instead.
  incRefReaders = !closeSubReaders;
  readerFinishedListeners =
      new MapBackedSet<ReaderFinishedListener>(
          new ConcurrentHashMap<ReaderFinishedListener,Boolean>());
}
/**
 * Renders this reader as <code>ParallelReader(sub1, sub2, ...)</code>, listing
 * the subreaders in the order they were added.
 */
@Override
public String toString() {
  final StringBuilder sb = new StringBuilder("ParallelReader(");
  String separator = "";
  for (final IndexReader sub : readers) {
    sb.append(separator).append(sub);
    separator = ", "; // every element after the first is preceded by a comma
  }
  return sb.append(')').toString();
}
/**
 * Adds an IndexReader whose stored fields are included in the documents
 * returned by this reader. Equivalent to <code>add(reader, false)</code>.
 *
 * @throws IOException if there is a low-level IO error
 */
public void add(IndexReader reader) throws IOException {
  ensureOpen();
  final boolean ignoreStoredFields = false;
  add(reader, ignoreStoredFields);
}
/**
 * Adds an IndexReader, optionally excluding its stored fields from the
 * documents this reader returns. Skipping stored fields can accelerate
 * search when they are only needed from a subset of the IndexReaders.
 * <p>
 * The first reader added fixes the expected maxDoc and numDocs; for each
 * field, the first reader that reports it is the one used for that field.
 *
 * @param reader the subreader to add
 * @param ignoreStoredFields if true, the reader is not consulted for stored fields
 * @throws IllegalArgumentException if not all indexes contain the same number
 *         of documents
 * @throws IllegalArgumentException if not all indexes have the same value
 *         of {@link IndexReader#maxDoc()}
 * @throws IOException if there is a low-level IO error
 */
public void add(IndexReader reader, boolean ignoreStoredFields)
    throws IOException {
  ensureOpen();

  if (readers.isEmpty()) {
    // the very first reader defines the reference values
    maxDoc = reader.maxDoc();
    numDocs = reader.numDocs();
    hasDeletions = reader.hasDeletions();
  }

  // compatibility checks against the reference values
  if (maxDoc != reader.maxDoc()) {
    throw new IllegalArgumentException
      ("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
  }
  if (numDocs != reader.numDocs()) {
    throw new IllegalArgumentException
      ("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());
  }

  final Collection<String> names = reader.getFieldNames(IndexReader.FieldOption.ALL);
  readerToFields.put(reader, names);
  for (final String name : names) {
    // first reader to report a field keeps it
    if (!fieldToReader.containsKey(name)) {
      fieldToReader.put(name, reader);
    }
  }

  if (!ignoreStoredFields) {
    storedFieldReaders.add(reader);
  }
  readers.add(reader);

  if (incRefReaders) {
    reader.incRef();
  }
  // remember whether close() has to decRef or close this reader
  decrefOnClose.add(Boolean.valueOf(incRefReaders));
}
/**
 * Returns the result of <code>doReopen(true)</code>, i.e. a reader built from
 * clones of the subreaders. Any exception raised while doing so is rethrown
 * wrapped in a {@link RuntimeException}.
 */
@Override
public synchronized Object clone() {
  try {
    // doReopen performs the ensureOpen() check itself
    final IndexReader copy = doReopen(true);
    return copy;
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
/**
 * Attempts to reopen every subreader.
 * <br>
 * When at least one subreader could be re-opened (i. e. subReader.reopen()
 * returned an instance != subReader), a new ParallelReader is returned;
 * otherwise this instance itself is returned.
 * <p>
 * The re-opened instance may share subreaders with the old one. Performing
 * index modification operations before the old instance is closed results
 * in undefined behavior (see {@link IndexReader#reopen()}).
 * <p>
 * Shared subreaders get their reference count increased so that they stay
 * open until the last reader referring to them is closed.
 *
 * @throws CorruptIndexException if the index is corrupt
 * @throws IOException if there is a low-level IO error
 */
@Override
public synchronized IndexReader reopen() throws CorruptIndexException, IOException {
  // ensureOpen() is invoked inside doReopen
  final boolean cloneSubReaders = false;
  return doReopen(cloneSubReaders);
}
/**
 * Shared implementation behind {@link #clone()} and {@link #reopen()}: clones
 * or reopens every subreader and, if at least one call produced a different
 * instance, wraps the results in a new ParallelReader.
 *
 * @param doClone if true each subreader is cloned, otherwise it is reopened
 * @return a new ParallelReader if any subreader instance changed, otherwise
 *         this instance
 * @throws CorruptIndexException declared because the subreader reopen call may throw it
 * @throws IOException declared because the subreader reopen call may throw it
 */
protected IndexReader doReopen(boolean doClone) throws CorruptIndexException, IOException {
ensureOpen();
boolean reopened = false;
// newReaders.get(i) is the clone/reopen result for readers.get(i)
List<IndexReader> newReaders = new ArrayList<IndexReader>();
boolean success = false;
try {
for (final IndexReader oldReader : readers) {
IndexReader newReader = null;
if (doClone) {
newReader = (IndexReader) oldReader.clone();
} else {
newReader = oldReader.reopen();
}
newReaders.add(newReader);
// if at least one of the subreaders was updated we remember that
// and return a new ParallelReader
if (newReader != oldReader) {
reopened = true;
}
}
success = true;
} finally {
// A subreader failed part-way through: close the new instances created so
// far. Entries identical to the old reader are still owned by this
// instance and are left alone.
if (!success && reopened) {
for (int i = 0; i < newReaders.size(); i++) {
IndexReader r = newReaders.get(i);
if (r != readers.get(i)) {
try {
r.close();
} catch (IOException ignore) {
// keep going - we want to clean up as much as possible
}
}
}
}
}
if (reopened) {
List<Boolean> newDecrefOnClose = new ArrayList<Boolean>();
// The no-arg constructor leaves pr.incRefReaders false, so pr.add() below
// does not incRef; reference counting is handled explicitly in this loop.
ParallelReader pr = new ParallelReader();
for (int i = 0; i < readers.size(); i++) {
IndexReader oldReader = readers.get(i);
IndexReader newReader = newReaders.get(i);
if (newReader == oldReader) {
// shared with this instance: take an extra reference and have pr
// decRef (not close) it later
newDecrefOnClose.add(Boolean.TRUE);
newReader.incRef();
} else {
// this is a new subreader instance, so on close() we don't
// decRef but close it
newDecrefOnClose.add(Boolean.FALSE);
}
// keep the stored-fields setting the old reader was added with
pr.add(newReader, !storedFieldReaders.contains(oldReader));
}
// replace the flags pr.add() recorded with the ones computed above
pr.decrefOnClose = newDecrefOnClose;
// readers added to pr later are treated like readers added to this instance
pr.incRefReaders = incRefReaders;
return pr;
} else {
// No subreader was refreshed
return this;
}
}
/** Returns the numDocs value recorded from the first reader that was added. */
@Override
public int numDocs() {
  // Deliberately no ensureOpen() here (it could affect performance)
  return this.numDocs;
}
/** Returns the maxDoc value recorded from the first reader that was added. */
@Override
public int maxDoc() {
  // Deliberately no ensureOpen() here (it could affect performance)
  return this.maxDoc;
}
/**
 * Returns the deletions flag tracked by this reader: taken from the first
 * reader added and updated by doDelete/doUndeleteAll.
 */
@Override
public boolean hasDeletions() {
  ensureOpen();
  return this.hasDeletions;
}
/**
 * Reports whether document <code>n</code> is deleted by asking the first
 * subreader; returns false when no subreader has been added.
 */
@Override
public boolean isDeleted(int n) {
  // Deliberately no ensureOpen() here (it could affect performance)
  if (readers.isEmpty()) {
    return false;
  }
  final IndexReader first = readers.get(0);
  return first.isDeleted(n);
}
/** Deletes document <code>n</code> in every subreader and marks this reader as having deletions. */
@Override
protected void doDelete(int n) throws CorruptIndexException, IOException {
  final Iterator<IndexReader> subs = readers.iterator();
  while (subs.hasNext()) {
    subs.next().deleteDocument(n);
  }
  hasDeletions = true;
}
/** Calls undeleteAll() on every subreader and clears this reader's deletions flag. */
@Override
protected void doUndeleteAll() throws CorruptIndexException, IOException {
  final Iterator<IndexReader> subs = readers.iterator();
  while (subs.hasNext()) {
    subs.next().undeleteAll();
  }
  hasDeletions = false;
}
/**
 * Builds document <code>n</code> by appending the stored fields returned by
 * each stored-field subreader, in the order the readers were added.
 * <p>
 * With a non-null selector, a subreader is only asked for its document if the
 * selector accepts at least one of that reader's fields with a result other
 * than NO_LOAD.
 *
 * @param n the document number
 * @param fieldSelector selects the fields to load, or null to load all
 */
@Override
public Document document(int n, FieldSelector fieldSelector) throws CorruptIndexException, IOException {
  ensureOpen();
  final Document merged = new Document();
  for (final IndexReader reader : storedFieldReaders) {
    if (fieldSelector != null && !selectsAnyField(reader, fieldSelector)) {
      continue; // nothing wanted from this reader
    }
    final List<Fieldable> stored = reader.document(n, fieldSelector).getFields();
    for (final Fieldable f : stored) {
      merged.add(f);
    }
  }
  return merged;
}

// True as soon as the selector answers something other than NO_LOAD for one of the reader's fields.
private boolean selectsAnyField(IndexReader reader, FieldSelector fieldSelector) {
  for (final String name : readerToFields.get(reader)) {
    if (fieldSelector.accept(name) != FieldSelectorResult.NO_LOAD) {
      return true;
    }
  }
  return false;
}
/**
 * Collects the term vectors of document <code>n</code> for every known field,
 * asking the reader that owns each field and skipping fields for which that
 * reader returns null.
 */
@Override
public TermFreqVector[] getTermFreqVectors(int n) throws IOException {
  ensureOpen();
  final ArrayList<TermFreqVector> found = new ArrayList<TermFreqVector>();
  for (final Map.Entry<String,IndexReader> entry : fieldToReader.entrySet()) {
    final TermFreqVector tfv = entry.getValue().getTermFreqVector(n, entry.getKey());
    if (tfv == null) {
      continue;
    }
    found.add(tfv);
  }
  final TermFreqVector[] out = new TermFreqVector[found.size()];
  return found.toArray(out);
}
/**
 * Returns the term vector of <code>field</code> for document <code>n</code>
 * from the reader that owns the field, or null if no reader has that field.
 */
@Override
public TermFreqVector getTermFreqVector(int n, String field)
    throws IOException {
  ensureOpen();
  final IndexReader owner = fieldToReader.get(field);
  if (owner == null) {
    return null;
  }
  return owner.getTermFreqVector(n, field);
}
/**
 * Feeds the term vector of <code>field</code> for the given document to
 * <code>mapper</code>, using the reader that owns the field. Does nothing if
 * no reader has that field.
 */
@Override
public void getTermFreqVector(int docNumber, String field, TermVectorMapper mapper) throws IOException {
  ensureOpen();
  final IndexReader owner = fieldToReader.get(field);
  if (owner == null) {
    return;
  }
  owner.getTermFreqVector(docNumber, field, mapper);
}
/**
 * Feeds the term vectors of all known fields of the given document to
 * <code>mapper</code>, field by field, each from the reader that owns it.
 */
@Override
public void getTermFreqVector(int docNumber, TermVectorMapper mapper) throws IOException {
  ensureOpen();
  final Iterator<Map.Entry<String,IndexReader>> entries = fieldToReader.entrySet().iterator();
  while (entries.hasNext()) {
    final Map.Entry<String,IndexReader> entry = entries.next();
    final String name = entry.getKey();
    final IndexReader owner = entry.getValue();
    owner.getTermFreqVector(docNumber, name, mapper);
  }
}
/** Returns whether the reader owning <code>field</code> has norms for it; false if no reader has the field. */
@Override
public boolean hasNorms(String field) throws IOException {
  ensureOpen();
  final IndexReader owner = fieldToReader.get(field);
  return owner != null && owner.hasNorms(field);
}
/** Returns the norms of <code>field</code> from the reader that owns it, or null if no reader has the field. */
@Override
public byte[] norms(String field) throws IOException {
  ensureOpen();
  final IndexReader owner = fieldToReader.get(field);
  if (owner == null) {
    return null;
  }
  return owner.norms(field);
}
/**
 * Asks the reader that owns <code>field</code> to write its norms into
 * <code>result</code> at <code>offset</code>. Leaves <code>result</code>
 * untouched if no reader has the field.
 */
@Override
public void norms(String field, byte[] result, int offset)
    throws IOException {
  ensureOpen();
  final IndexReader owner = fieldToReader.get(field);
  if (owner == null) {
    return;
  }
  owner.norms(field, result, offset);
}
/** Forwards the norm update to the reader that owns <code>field</code>; a no-op if no reader has the field. */
@Override
protected void doSetNorm(int n, String field, byte value)
    throws CorruptIndexException, IOException {
  final IndexReader owner = fieldToReader.get(field);
  if (owner == null) {
    return;
  }
  owner.doSetNorm(n, field, value);
}
/** Returns a new ParallelTermEnum created with its no-argument constructor. */
@Override
public TermEnum terms() throws IOException {
  ensureOpen();
  final TermEnum allTerms = new ParallelTermEnum();
  return allTerms;
}
/** Returns a new ParallelTermEnum created for the given term. */
@Override
public TermEnum terms(Term term) throws IOException {
  ensureOpen();
  final TermEnum fromTerm = new ParallelTermEnum(term);
  return fromTerm;
}
/** Returns the document frequency of <code>term</code> from the reader owning its field; 0 if no reader has the field. */
@Override
public int docFreq(Term term) throws IOException {
  ensureOpen();
  final IndexReader owner = fieldToReader.get(term.field());
  if (owner == null) {
    return 0;
  }
  return owner.docFreq(term);
}
/** Returns a new ParallelTermDocs created for the given term. */
@Override
public TermDocs termDocs(Term term) throws IOException {
  ensureOpen();
  final TermDocs docs = new ParallelTermDocs(term);
  return docs;
}
/** Returns a new ParallelTermDocs created with its no-argument constructor. */
@Override
public TermDocs termDocs() throws IOException {
  ensureOpen();
  final TermDocs docs = new ParallelTermDocs();
  return docs;
}
/** Returns a new ParallelTermPositions created for the given term. */
@Override
public TermPositions termPositions(Term term) throws IOException {
  ensureOpen();
  final TermPositions positions = new ParallelTermPositions(term);
  return positions;
}
/** Returns a new ParallelTermPositions created with its no-argument constructor. */
@Override
public TermPositions termPositions() throws IOException {
  ensureOpen();
  final TermPositions positions = new ParallelTermPositions();
  return positions;
}
/**
 * Returns true only if every subreader reports itself as current; stops at
 * the first subreader that does not.
 */
@Override
public boolean isCurrent() throws CorruptIndexException, IOException {
  ensureOpen();
  final Iterator<IndexReader> subs = readers.iterator();
  while (subs.hasNext()) {
    if (!subs.next().isCurrent()) {
      return false; // one stale subreader is enough
    }
  }
  return true;
}
/**
 * Returns true only if every subreader reports itself as optimized; stops at
 * the first subreader that does not.
 */
@Override
public boolean isOptimized() {
  ensureOpen();
  final Iterator<IndexReader> subs = readers.iterator();
  while (subs.hasNext()) {
    if (!subs.next().isOptimized()) {
      return false; // one non-optimized subindex is enough
    }
  }
  return true;
}
/**
 * Unsupported by ParallelReader.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public long getVersion() {
  final String message = "ParallelReader does not support this method.";
  throw new UnsupportedOperationException(message);
}
// for testing: returns a fresh array holding the subreaders in the order they were added
IndexReader[] getSubReaders() {
  final IndexReader[] subs = new IndexReader[readers.size()];
  return readers.toArray(subs);
}
/** Commits every subreader in turn, passing the same <code>commitUserData</code> to each. */
@Override
protected void doCommit(Map<String,String> commitUserData) throws IOException {
  final Iterator<IndexReader> subs = readers.iterator();
  while (subs.hasNext()) {
    subs.next().commit(commitUserData);
  }
}
/**
 * Releases every subreader: readers flagged in <code>decrefOnClose</code> are
 * decRef'd, all others are closed.
 * <p>
 * All subreaders are processed even if one of them fails. The first
 * IOException encountered is rethrown after the loop, so a single failing
 * subreader no longer leaves the remaining ones unreleased.
 *
 * @throws IOException the first IOException raised by a subreader, if any
 */
@Override
protected synchronized void doClose() throws IOException {
  IOException firstFailure = null;
  for (int i = 0; i < readers.size(); i++) {
    final IndexReader sub = readers.get(i);
    try {
      if (decrefOnClose.get(i).booleanValue()) {
        sub.decRef();
      } else {
        sub.close();
      }
    } catch (IOException e) {
      // keep going - release as many subreaders as possible, report the first problem
      if (firstFailure == null) {
        firstFailure = e;
      }
    }
  }
  if (firstFailure != null) {
    throw firstFailure;
  }
}
/**
 * Returns the union of the field names each subreader reports for the given
 * field option.
 */
@Override
public Collection<String> getFieldNames (IndexReader.FieldOption fieldNames) {
  ensureOpen();
  final Set<String> union = new HashSet<String>();
  final Iterator<IndexReader> subs = readers.iterator();
  while (subs.hasNext()) {
    union.addAll(subs.next().getFieldNames(fieldNames));
  }
  return union;
}
/**
 * TermEnum over the fields of the enclosing ParallelReader. Fields are visited
 * in the sorted order of fieldToReader, and the terms of each field come from
 * the reader registered for that field.
 */
private class ParallelTermEnum extends TermEnum {
  // field currently being enumerated
  private String field;
  // created lazily on the first field switch; yields the fields after `field`
  private Iterator<String> fieldIterator;
  // delegate for the current field; null when there is nothing (left) to enumerate
  private TermEnum termEnum;

  /** Starts at the first field of fieldToReader, if there is one. */
  public ParallelTermEnum() throws IOException {
    try {
      field = fieldToReader.firstKey();
    } catch(NoSuchElementException e) {
      // No fields, so keep field == null, termEnum == null
      return;
    }
    if (field != null)
      termEnum = fieldToReader.get(field).terms();
  }

  /** Starts at <code>term</code> in the reader owning its field; empty if no reader has that field. */
  public ParallelTermEnum(Term term) throws IOException {
    field = term.field();
    IndexReader reader = fieldToReader.get(field);
    if (reader!=null)
      termEnum = reader.terms(term);
  }

  /**
   * Advances to the next term, moving on to the next field that has terms
   * once the current field is exhausted.
   *
   * @return false when there are no more terms in any remaining field
   */
  @Override
  public boolean next() throws IOException {
    if (termEnum==null)
      return false;

    // another term in this field?
    // Compare field names by value: identity comparison is only correct when
    // both strings happen to be interned.
    if (termEnum.next() && field.equals(termEnum.term().field()))
      return true;                                // yes, keep going

    termEnum.close();                             // close old termEnum
    termEnum = null;                              // and never touch the closed enum again

    // find the next field with terms, if any
    if (fieldIterator==null) {
      fieldIterator = fieldToReader.tailMap(field).keySet().iterator();
      fieldIterator.next();                       // Skip field to get next one
    }
    while (fieldIterator.hasNext()) {
      field = fieldIterator.next();
      final TermEnum candidate = fieldToReader.get(field).terms(new Term(field));
      final Term term = candidate.term();
      if (term!=null && field.equals(term.field())) {
        termEnum = candidate;
        return true;
      }
      candidate.close();
    }

    return false;                                 // no more fields
  }

  /** Returns the current term of the delegate, or null if there is no delegate. */
  @Override
  public Term term() {
    if (termEnum==null)
      return null;

    return termEnum.term();
  }

  /** Returns the docFreq of the current term, or 0 if there is no delegate. */
  @Override
  public int docFreq() {
    if (termEnum==null)
      return 0;

    return termEnum.docFreq();
  }

  /** Closes the delegate if one is still open. */
  @Override
  public void close() throws IOException {
    if (termEnum!=null)
      termEnum.close();
  }
}
// Wraps the TermDocs of whichever subreader owns the sought term's field, so
// that seek(Term) can switch between subreaders.
private class ParallelTermDocs implements TermDocs {
  // current delegate; null until a seek succeeds or when no reader has the field
  protected TermDocs termDocs;

  public ParallelTermDocs() {}

  public ParallelTermDocs(Term term) throws IOException {
    if (term != null) {
      seek(term);
    } else if (readers.isEmpty()) {
      termDocs = null;
    } else {
      // null term: delegate to the first subreader
      termDocs = readers.get(0).termDocs(null);
    }
  }

  public int doc() {
    return termDocs.doc();
  }

  public int freq() {
    return termDocs.freq();
  }

  public void seek(Term term) throws IOException {
    final IndexReader owner = fieldToReader.get(term.field());
    if (owner == null) {
      termDocs = null;
    } else {
      termDocs = owner.termDocs(term);
    }
  }

  public void seek(TermEnum termEnum) throws IOException {
    seek(termEnum.term());
  }

  public boolean next() throws IOException {
    return termDocs == null ? false : termDocs.next();
  }

  public int read(final int[] docs, final int[] freqs) throws IOException {
    return termDocs == null ? 0 : termDocs.read(docs, freqs);
  }

  public boolean skipTo(int target) throws IOException {
    return termDocs == null ? false : termDocs.skipTo(target);
  }

  public void close() throws IOException {
    if (termDocs == null) {
      return;
    }
    termDocs.close();
  }
}
// TermPositions counterpart of ParallelTermDocs: seek(Term) installs the
// owning reader's TermPositions as the delegate.
private class ParallelTermPositions
    extends ParallelTermDocs implements TermPositions {

  public ParallelTermPositions() {}

  public ParallelTermPositions(Term term) throws IOException {
    seek(term);
  }

  @Override
  public void seek(Term term) throws IOException {
    final IndexReader owner = fieldToReader.get(term.field());
    if (owner == null) {
      termDocs = null;
    } else {
      termDocs = owner.termPositions(term);
    }
  }

  // The delegate viewed as TermPositions.
  private TermPositions positions() {
    return (TermPositions) termDocs;
  }

  public int nextPosition() throws IOException {
    // It is an error to call this if there is no next position, e.g. if termDocs==null
    return positions().nextPosition();
  }

  public int getPayloadLength() {
    return positions().getPayloadLength();
  }

  public byte[] getPayload(byte[] data, int offset) throws IOException {
    return positions().getPayload(data, offset);
  }

  // TODO: Remove warning after API has been finalized
  public boolean isPayloadAvailable() {
    return positions().isPayloadAvailable();
  }
}
/** Registers <code>listener</code> on this reader and on each subreader added so far. */
@Override
public void addReaderFinishedListener(ReaderFinishedListener listener) {
  super.addReaderFinishedListener(listener);
  final Iterator<IndexReader> subs = readers.iterator();
  while (subs.hasNext()) {
    subs.next().addReaderFinishedListener(listener);
  }
}
/** Unregisters <code>listener</code> from this reader and from each current subreader. */
@Override
public void removeReaderFinishedListener(ReaderFinishedListener listener) {
  super.removeReaderFinishedListener(listener);
  final Iterator<IndexReader> subs = readers.iterator();
  while (subs.hasNext()) {
    subs.next().removeReaderFinishedListener(listener);
  }
}
}