package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.index.codecs.PerDocValues;
import org.apache.lucene.index.values.IndexDocValues;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.MapBackedSet;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/** An IndexReader which reads multiple, parallel indexes. Each index added
* must have the same number of documents, but typically each contains
* different fields. Each document contains the union of the fields of all
* documents with the same document number. When searching, matches for a
* query term are from the first index added that has the field.
*
* <p>This is useful, e.g., with collections that have large fields which
* change rarely and small fields that change more frequently. The smaller
* fields may be re-indexed in a new index and both indexes may be searched
* together.
*
* <p><strong>Warning:</strong> It is up to you to make sure all indexes
* are created and modified the same way. For example, if you add
* documents to one index, you need to add the same documents in the
* same order to the other indexes. <em>Failure to do so will result in
* undefined behavior</em>.
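 *
 * <p>A minimal usage sketch (the {@code Directory} instances and the split of
 * fields between them are illustrative only):
 * <pre>
 *   ParallelReader pr = new ParallelReader();
 *   pr.add(IndexReader.open(largeStableDir));   // rarely changing fields
 *   pr.add(IndexReader.open(smallChangingDir)); // frequently re-indexed fields
 *   IndexSearcher searcher = new IndexSearcher(pr);
 * </pre>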
*/
public class ParallelReader extends IndexReader {
private List<IndexReader> readers = new ArrayList<IndexReader>();
private List<Boolean> decrefOnClose = new ArrayList<Boolean>(); // remember which subreaders to decRef on close
boolean incRefReaders = false;
private SortedMap<String,IndexReader> fieldToReader = new TreeMap<String,IndexReader>();
private Map<IndexReader,Collection<String>> readerToFields = new HashMap<IndexReader,Collection<String>>();
private List<IndexReader> storedFieldReaders = new ArrayList<IndexReader>();
private Map<String,byte[]> normsCache = new HashMap<String,byte[]>();
private final ReaderContext topLevelReaderContext = new AtomicReaderContext(this);
private int maxDoc;
private int numDocs;
private boolean hasDeletions;
private final ParallelFields fields = new ParallelFields();
private final ParallelPerDocs perDocs = new ParallelPerDocs();
/** Construct a ParallelReader.
* <p>Note that all subreaders are closed if this ParallelReader is closed.</p>
*/
public ParallelReader() throws IOException { this(true); }
/** Construct a ParallelReader.
* @param closeSubReaders indicates whether the subreaders should be closed
* when this ParallelReader is closed
*/
public ParallelReader(boolean closeSubReaders) throws IOException {
super();
this.incRefReaders = !closeSubReaders;
readerFinishedListeners = new MapBackedSet<ReaderFinishedListener>(new ConcurrentHashMap<ReaderFinishedListener,Boolean>());
}
/** {@inheritDoc} */
@Override
public String toString() {
final StringBuilder buffer = new StringBuilder("ParallelReader(");
final Iterator<IndexReader> iter = readers.iterator();
if (iter.hasNext()) {
buffer.append(iter.next());
}
while (iter.hasNext()) {
buffer.append(", ").append(iter.next());
}
buffer.append(')');
return buffer.toString();
}
/** Add an IndexReader.
* @throws IOException if there is a low-level IO error
*/
public void add(IndexReader reader) throws IOException {
ensureOpen();
add(reader, false);
}
/** Add an IndexReader whose stored fields will not be returned. This can
* accelerate search when stored fields are only needed from a subset of
* the IndexReaders.
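 *
 * <p>For example (reader variables are illustrative), stored fields can be
 * taken from the first reader only, while the second contributes just its
 * indexed fields:
 * <pre>
 *   pr.add(storedAndIndexedReader);  // stored fields are returned from this reader
 *   pr.add(indexOnlyReader, true);   // stored fields of this reader are ignored
 * </pre>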
*
* @throws IllegalArgumentException if not all indexes contain the same number
* of documents
* @throws IllegalArgumentException if not all indexes have the same value
* of {@link IndexReader#maxDoc()}
* @throws IOException if there is a low-level IO error
*/
public void add(IndexReader reader, boolean ignoreStoredFields)
throws IOException {
ensureOpen();
if (readers.size() == 0) {
this.maxDoc = reader.maxDoc();
this.numDocs = reader.numDocs();
this.hasDeletions = reader.hasDeletions();
}
if (reader.maxDoc() != maxDoc) // check compatibility
throw new IllegalArgumentException
("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
if (reader.numDocs() != numDocs)
throw new IllegalArgumentException
("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());
Collection<String> fields = reader.getFieldNames(IndexReader.FieldOption.ALL);
readerToFields.put(reader, fields);
for (final String field : fields) { // update fieldToReader map
      // NOTE: the first reader that has a given field "wins": its terms and
      // doc values are used for that field, consistent with fieldToReader
      if (fieldToReader.get(field) == null) {
        fieldToReader.put(field, reader);
        this.fields.addField(field, reader);
        this.perDocs.addField(field, reader);
      }
}
if (!ignoreStoredFields)
storedFieldReaders.add(reader); // add to storedFieldReaders
readers.add(reader);
if (incRefReaders) {
reader.incRef();
}
decrefOnClose.add(Boolean.valueOf(incRefReaders));
synchronized(normsCache) {
      normsCache.clear(); // TODO: do we really need to clear the cache for all fields?
}
}
private class ParallelFieldsEnum extends FieldsEnum {
String currentField;
IndexReader currentReader;
Iterator<String> keys;
ParallelFieldsEnum() {
keys = fieldToReader.keySet().iterator();
}
@Override
public String next() throws IOException {
if (keys.hasNext()) {
currentField = keys.next();
currentReader = fieldToReader.get(currentField);
} else {
currentField = null;
currentReader = null;
}
return currentField;
}
@Override
public TermsEnum terms() throws IOException {
assert currentReader != null;
Terms terms = MultiFields.getTerms(currentReader, currentField);
if (terms != null) {
return terms.iterator();
} else {
return TermsEnum.EMPTY;
}
}
}
// Single instance of this, per ParallelReader instance
private class ParallelFields extends Fields {
final HashMap<String,Terms> fields = new HashMap<String,Terms>();
public void addField(String field, IndexReader r) throws IOException {
Fields multiFields = MultiFields.getFields(r);
fields.put(field, multiFields.terms(field));
}
@Override
public FieldsEnum iterator() throws IOException {
return new ParallelFieldsEnum();
}
@Override
public Terms terms(String field) throws IOException {
return fields.get(field);
}
}
@Override
public Bits getLiveDocs() {
ensureOpen();
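    // all subreaders are expected to be modified in lockstep (see class javadoc),
    // so the live docs of the first reader are valid for all of them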
return MultiFields.getLiveDocs(readers.get(0));
}
@Override
public Fields fields() {
ensureOpen();
return fields;
}
@Override
public synchronized Object clone() {
// doReopen calls ensureOpen
try {
return doReopen(true);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
/**
* Tries to reopen the subreaders.
* <br>
 * If one or more subreaders could be re-opened (i.e. subReader.reopen()
* returned a new instance != subReader), then a new ParallelReader instance
* is returned, otherwise this instance is returned.
* <p>
* A re-opened instance might share one or more subreaders with the old
 * instance. Index modification operations result in undefined behavior
 * when performed before the old instance is closed
 * (see {@link IndexReader#reopen()}).
* <p>
* If subreaders are shared, then the reference count of those
* readers is increased to ensure that the subreaders remain open
* until the last referring reader is closed.
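 *
 * <p>A typical re-open pattern looks like this (a sketch; error handling omitted):
 * <pre>
 *   IndexReader reopened = parallelReader.reopen();
 *   if (reopened != parallelReader) {
 *     parallelReader.close();           // release the old instance
 *     parallelReader = (ParallelReader) reopened;
 *   }
 * </pre>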
*
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
@Override
public synchronized IndexReader reopen() throws CorruptIndexException, IOException {
// doReopen calls ensureOpen
return doReopen(false);
}
protected IndexReader doReopen(boolean doClone) throws CorruptIndexException, IOException {
ensureOpen();
boolean reopened = false;
List<IndexReader> newReaders = new ArrayList<IndexReader>();
boolean success = false;
try {
for (final IndexReader oldReader : readers) {
IndexReader newReader = null;
if (doClone) {
newReader = (IndexReader) oldReader.clone();
} else {
newReader = oldReader.reopen();
}
newReaders.add(newReader);
// if at least one of the subreaders was updated we remember that
// and return a new ParallelReader
if (newReader != oldReader) {
reopened = true;
}
}
success = true;
} finally {
if (!success && reopened) {
for (int i = 0; i < newReaders.size(); i++) {
IndexReader r = newReaders.get(i);
if (r != readers.get(i)) {
try {
r.close();
} catch (IOException ignore) {
// keep going - we want to clean up as much as possible
}
}
}
}
}
if (reopened) {
List<Boolean> newDecrefOnClose = new ArrayList<Boolean>();
// TODO: maybe add a special reopen-ctor for norm-copying?
ParallelReader pr = new ParallelReader();
for (int i = 0; i < readers.size(); i++) {
IndexReader oldReader = readers.get(i);
IndexReader newReader = newReaders.get(i);
if (newReader == oldReader) {
newDecrefOnClose.add(Boolean.TRUE);
newReader.incRef();
} else {
// this is a new subreader instance, so on close() we don't
// decRef but close it
newDecrefOnClose.add(Boolean.FALSE);
}
pr.add(newReader, !storedFieldReaders.contains(oldReader));
}
pr.decrefOnClose = newDecrefOnClose;
pr.incRefReaders = incRefReaders;
return pr;
} else {
// No subreader was refreshed
return this;
}
}
@Override
public int numDocs() {
// Don't call ensureOpen() here (it could affect performance)
return numDocs;
}
@Override
public int maxDoc() {
// Don't call ensureOpen() here (it could affect performance)
return maxDoc;
}
@Override
public boolean hasDeletions() {
ensureOpen();
return hasDeletions;
}
// delete in all readers
@Override
protected void doDelete(int n) throws CorruptIndexException, IOException {
for (final IndexReader reader : readers) {
reader.deleteDocument(n);
}
hasDeletions = true;
}
// undeleteAll in all readers
@Override
protected void doUndeleteAll() throws CorruptIndexException, IOException {
for (final IndexReader reader : readers) {
reader.undeleteAll();
}
hasDeletions = false;
}
@Override
public void document(int docID, StoredFieldVisitor visitor) throws CorruptIndexException, IOException {
ensureOpen();
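    // each reader registered for stored fields contributes its stored fields
    // for this document to the visitor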
for (final IndexReader reader: storedFieldReaders) {
reader.document(docID, visitor);
}
}
// get all vectors
@Override
public TermFreqVector[] getTermFreqVectors(int n) throws IOException {
ensureOpen();
ArrayList<TermFreqVector> results = new ArrayList<TermFreqVector>();
for (final Map.Entry<String,IndexReader> e: fieldToReader.entrySet()) {
String field = e.getKey();
IndexReader reader = e.getValue();
TermFreqVector vector = reader.getTermFreqVector(n, field);
if (vector != null)
results.add(vector);
}
return results.toArray(new TermFreqVector[results.size()]);
}
@Override
public TermFreqVector getTermFreqVector(int n, String field)
throws IOException {
ensureOpen();
IndexReader reader = fieldToReader.get(field);
return reader==null ? null : reader.getTermFreqVector(n, field);
}
@Override
public void getTermFreqVector(int docNumber, String field, TermVectorMapper mapper) throws IOException {
ensureOpen();
IndexReader reader = fieldToReader.get(field);
if (reader != null) {
reader.getTermFreqVector(docNumber, field, mapper);
}
}
@Override
public void getTermFreqVector(int docNumber, TermVectorMapper mapper) throws IOException {
ensureOpen();
for (final Map.Entry<String,IndexReader> e : fieldToReader.entrySet()) {
String field = e.getKey();
IndexReader reader = e.getValue();
reader.getTermFreqVector(docNumber, field, mapper);
}
}
@Override
public boolean hasNorms(String field) throws IOException {
ensureOpen();
IndexReader reader = fieldToReader.get(field);
return reader==null ? false : reader.hasNorms(field);
}
@Override
public synchronized byte[] norms(String field) throws IOException {
ensureOpen();
IndexReader reader = fieldToReader.get(field);
if (reader==null)
return null;
byte[] bytes = normsCache.get(field);
if (bytes != null)
return bytes;
if (!hasNorms(field))
return null;
if (normsCache.containsKey(field)) // cached omitNorms, not missing key
return null;
bytes = MultiNorms.norms(reader, field);
normsCache.put(field, bytes);
return bytes;
}
@Override
protected void doSetNorm(int n, String field, byte value)
throws CorruptIndexException, IOException {
IndexReader reader = fieldToReader.get(field);
if (reader!=null) {
synchronized(normsCache) {
normsCache.remove(field);
}
reader.doSetNorm(n, field, value);
}
}
@Override
public int docFreq(Term term) throws IOException {
ensureOpen();
IndexReader reader = fieldToReader.get(term.field());
return reader==null ? 0 : reader.docFreq(term);
}
@Override
public int docFreq(String field, BytesRef term) throws IOException {
ensureOpen();
IndexReader reader = fieldToReader.get(field);
return reader == null? 0 : reader.docFreq(field, term);
}
/**
* Checks recursively if all subreaders are up to date.
*/
@Override
public boolean isCurrent() throws CorruptIndexException, IOException {
ensureOpen();
for (final IndexReader reader : readers) {
if (!reader.isCurrent()) {
return false;
}
}
// all subreaders are up to date
return true;
}
/**
 * Checks recursively if all subindexes are optimized.
*/
@Override
public boolean isOptimized() {
ensureOpen();
for (final IndexReader reader : readers) {
if (!reader.isOptimized()) {
return false;
}
}
// all subindexes are optimized
return true;
}
/** Not implemented.
* @throws UnsupportedOperationException
*/
@Override
public long getVersion() {
throw new UnsupportedOperationException("ParallelReader does not support this method.");
}
// for testing
IndexReader[] getSubReaders() {
return readers.toArray(new IndexReader[readers.size()]);
}
@Override
protected void doCommit(Map<String,String> commitUserData) throws IOException {
for (final IndexReader reader : readers)
reader.commit(commitUserData);
}
@Override
protected synchronized void doClose() throws IOException {
for (int i = 0; i < readers.size(); i++) {
if (decrefOnClose.get(i).booleanValue()) {
readers.get(i).decRef();
} else {
readers.get(i).close();
}
}
}
@Override
  public Collection<String> getFieldNames(IndexReader.FieldOption fieldNames) {
ensureOpen();
Set<String> fieldSet = new HashSet<String>();
for (final IndexReader reader : readers) {
Collection<String> names = reader.getFieldNames(fieldNames);
fieldSet.addAll(names);
}
return fieldSet;
}
@Override
public ReaderContext getTopReaderContext() {
ensureOpen();
return topLevelReaderContext;
}
@Override
public void addReaderFinishedListener(ReaderFinishedListener listener) {
super.addReaderFinishedListener(listener);
for (IndexReader reader : readers) {
reader.addReaderFinishedListener(listener);
}
}
@Override
public void removeReaderFinishedListener(ReaderFinishedListener listener) {
super.removeReaderFinishedListener(listener);
for (IndexReader reader : readers) {
reader.removeReaderFinishedListener(listener);
}
}
@Override
public PerDocValues perDocValues() throws IOException {
ensureOpen();
return perDocs;
}
// Single instance of this, per ParallelReader instance
private static final class ParallelPerDocs extends PerDocValues {
final TreeMap<String,IndexDocValues> fields = new TreeMap<String,IndexDocValues>();
void addField(String field, IndexReader r) throws IOException {
PerDocValues perDocs = MultiPerDocValues.getPerDocs(r);
fields.put(field, perDocs.docValues(field));
}
@Override
public void close() throws IOException {
// nothing to do here
}
@Override
public IndexDocValues docValues(String field) throws IOException {
return fields.get(field);
}
@Override
public Collection<String> fields() {
return fields.keySet();
}
}
}