| package org.apache.lucene.index; |
| |
| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import org.apache.lucene.document.Document; |
| import org.apache.lucene.document.FieldSelector; |
| import org.apache.lucene.document.FieldSelectorResult; |
| import org.apache.lucene.document.Fieldable; |
| import org.apache.lucene.util.MapBackedSet; |
| |
| import java.io.IOException; |
| import java.util.*; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| |
| /** An IndexReader which reads multiple, parallel indexes. Each index added |
| * must have the same number of documents, but typically each contains |
| * different fields. Each document contains the union of the fields of all |
| * documents with the same document number. When searching, matches for a |
| * query term are from the first index added that has the field. |
| * |
| * <p>This is useful, e.g., with collections that have large fields which |
| * change rarely and small fields that change more frequently. The smaller |
| * fields may be re-indexed in a new index and both indexes may be searched |
| * together. |
| * |
| * <p><strong>Warning:</strong> It is up to you to make sure all indexes |
| * are created and modified the same way. For example, if you add |
| * documents to one index, you need to add the same documents in the |
| * same order to the other indexes. <em>Failure to do so will result in |
| * undefined behavior</em>. |
| */ |
public class ParallelReader extends IndexReader {
  // Subreaders in the order they were added; every one must agree on
  // maxDoc() and numDocs() (enforced in add()).
  private List<IndexReader> readers = new ArrayList<IndexReader>();
  // Index-parallel to `readers`: TRUE -> decRef() that subreader on close,
  // FALSE -> close() it outright. Remember which subreaders to decRef on close.
  private List<Boolean> decrefOnClose = new ArrayList<Boolean>();
  // When true (i.e. constructed with closeSubReaders == false), add() takes a
  // reference via incRef() instead of assuming ownership; doClose() then
  // releases via decRef() rather than close().
  boolean incRefReaders = false;
  // Maps each field name to the FIRST subreader added that contains it
  // ("first index added wins" — see class javadoc). Sorted so that
  // ParallelTermEnum can walk fields in term order.
  private SortedMap<String,IndexReader> fieldToReader = new TreeMap<String,IndexReader>();
  // All field names of each subreader, cached at add() time so document()
  // can test a FieldSelector without re-querying the subreader.
  private Map<IndexReader,Collection<String>> readerToFields = new HashMap<IndexReader,Collection<String>>();
  // Subreaders whose stored fields are returned from document(); a reader
  // added with ignoreStoredFields == true is excluded from this list.
  private List<IndexReader> storedFieldReaders = new ArrayList<IndexReader>();

  // Cached from the first reader added; all later readers must match these.
  private int maxDoc;
  private int numDocs;
  private boolean hasDeletions;

  /** Construct a ParallelReader.
   * <p>Note that all subreaders are closed if this ParallelReader is closed.</p>
   */
  public ParallelReader() throws IOException { this(true); }

  /** Construct a ParallelReader.
   * @param closeSubReaders indicates whether the subreaders should be closed
   * when this ParallelReader is closed
   */
  public ParallelReader(boolean closeSubReaders) throws IOException {
    super();
    // If we do NOT own the subreaders, we must incRef() them on add() and
    // decRef() (not close()) them on close.
    this.incRefReaders = !closeSubReaders;
    readerFinishedListeners = new MapBackedSet<ReaderFinishedListener>(new ConcurrentHashMap<ReaderFinishedListener,Boolean>());
  }

  /** {@inheritDoc} */
  @Override
  public String toString() {
    // Renders as "ParallelReader(sub1, sub2, ...)".
    final StringBuilder buffer = new StringBuilder("ParallelReader(");
    final Iterator<IndexReader> iter = readers.iterator();
    if (iter.hasNext()) {
      buffer.append(iter.next());
    }
    while (iter.hasNext()) {
      buffer.append(", ").append(iter.next());
    }
    buffer.append(')');
    return buffer.toString();
  }

  /** Add an IndexReader.
   * @throws IOException if there is a low-level IO error
   */
  public void add(IndexReader reader) throws IOException {
    ensureOpen();
    add(reader, false);
  }

  /** Add an IndexReader whose stored fields will not be returned. This can
   * accelerate search when stored fields are only needed from a subset of
   * the IndexReaders.
   *
   * @throws IllegalArgumentException if not all indexes contain the same number
   * of documents
   * @throws IllegalArgumentException if not all indexes have the same value
   * of {@link IndexReader#maxDoc()}
   * @throws IOException if there is a low-level IO error
   */
  public void add(IndexReader reader, boolean ignoreStoredFields)
    throws IOException {

    ensureOpen();
    // The first reader added defines maxDoc/numDocs/hasDeletions; every
    // subsequent reader is checked against these values below.
    if (readers.size() == 0) {
      this.maxDoc = reader.maxDoc();
      this.numDocs = reader.numDocs();
      this.hasDeletions = reader.hasDeletions();
    }

    if (reader.maxDoc() != maxDoc)                             // check compatibility
      throw new IllegalArgumentException
        ("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
    if (reader.numDocs() != numDocs)
      throw new IllegalArgumentException
        ("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());

    Collection<String> fields = reader.getFieldNames(IndexReader.FieldOption.ALL);
    readerToFields.put(reader, fields);
    for (final String field : fields) {                         // update fieldToReader map
      // First reader added that has the field wins; later readers with the
      // same field are ignored for term/norm/vector lookups.
      if (fieldToReader.get(field) == null)
        fieldToReader.put(field, reader);
    }

    if (!ignoreStoredFields)
      storedFieldReaders.add(reader);                           // add to storedFieldReaders
    readers.add(reader);

    if (incRefReaders) {
      // We don't own the reader; take a reference so it stays open until
      // this ParallelReader is closed.
      reader.incRef();
    }
    decrefOnClose.add(Boolean.valueOf(incRefReaders));
  }

  @Override
  public synchronized Object clone() {
    // Cloning is implemented as a forced reopen of every subreader.
    // doReopen calls ensureOpen
    try {
      return doReopen(true);
    } catch (Exception ex) {
      // clone() cannot declare checked exceptions, so wrap them.
      throw new RuntimeException(ex);
    }
  }

  /**
   * Tries to reopen the subreaders.
   * <br>
   * If one or more subreaders could be re-opened (i. e. subReader.reopen()
   * returned a new instance != subReader), then a new ParallelReader instance
   * is returned, otherwise this instance is returned.
   * <p>
   * A re-opened instance might share one or more subreaders with the old
   * instance. Index modification operations result in undefined behavior
   * when performed before the old instance is closed.
   * (see {@link IndexReader#reopen()}).
   * <p>
   * If subreaders are shared, then the reference count of those
   * readers is increased to ensure that the subreaders remain open
   * until the last referring reader is closed.
   *
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
  @Override
  public synchronized IndexReader reopen() throws CorruptIndexException, IOException {
    // doReopen calls ensureOpen
    return doReopen(false);
  }

  /**
   * Shared implementation of {@link #reopen()} and {@link #clone()}.
   *
   * @param doClone if true, every subreader is cloned (always producing new
   *        instances); if false, subreaders are reopened (which may return
   *        the same instance when the subindex is unchanged)
   */
  protected IndexReader doReopen(boolean doClone) throws CorruptIndexException, IOException {
    ensureOpen();

    boolean reopened = false;
    List<IndexReader> newReaders = new ArrayList<IndexReader>();

    boolean success = false;

    try {
      for (final IndexReader oldReader : readers) {
        IndexReader newReader = null;
        if (doClone) {
          newReader = (IndexReader) oldReader.clone();
        } else {
          newReader = oldReader.reopen();
        }
        newReaders.add(newReader);
        // if at least one of the subreaders was updated we remember that
        // and return a new ParallelReader
        if (newReader != oldReader) {
          reopened = true;
        }
      }
      success = true;
    } finally {
      // On failure, close only the genuinely new subreader instances we
      // created so far; shared (unchanged) instances must stay open.
      if (!success && reopened) {
        for (int i = 0; i < newReaders.size(); i++) {
          IndexReader r = newReaders.get(i);
          if (r != readers.get(i)) {
            try {
              r.close();
            } catch (IOException ignore) {
              // keep going - we want to clean up as much as possible
            }
          }
        }
      }
    }

    if (reopened) {
      List<Boolean> newDecrefOnClose = new ArrayList<Boolean>();
      ParallelReader pr = new ParallelReader();
      for (int i = 0; i < readers.size(); i++) {
        IndexReader oldReader = readers.get(i);
        IndexReader newReader = newReaders.get(i);
        if (newReader == oldReader) {
          // Subreader is shared with the old ParallelReader: take an extra
          // reference and decRef (not close) it when the new one closes.
          newDecrefOnClose.add(Boolean.TRUE);
          newReader.incRef();
        } else {
          // this is a new subreader instance, so on close() we don't
          // decRef but close it
          newDecrefOnClose.add(Boolean.FALSE);
        }
        // Preserve the stored-fields opt-out made when oldReader was added.
        pr.add(newReader, !storedFieldReaders.contains(oldReader));
      }
      // Replace the bookkeeping that pr.add() built with the ownership
      // decisions computed above.
      pr.decrefOnClose = newDecrefOnClose;
      pr.incRefReaders = incRefReaders;
      return pr;
    } else {
      // No subreader was refreshed
      return this;
    }
  }


  @Override
  public int numDocs() {
    // Don't call ensureOpen() here (it could affect performance)
    return numDocs;
  }

  @Override
  public int maxDoc() {
    // Don't call ensureOpen() here (it could affect performance)
    return maxDoc;
  }

  @Override
  public boolean hasDeletions() {
    ensureOpen();
    return hasDeletions;
  }

  // check first reader; all subreaders are assumed to share the same
  // deletions (see class javadoc warning)
  @Override
  public boolean isDeleted(int n) {
    // Don't call ensureOpen() here (it could affect performance)
    if (readers.size() > 0)
      return readers.get(0).isDeleted(n);
    return false;
  }

  // delete in all readers
  @Override
  protected void doDelete(int n) throws CorruptIndexException, IOException {
    for (final IndexReader reader : readers) {
      reader.deleteDocument(n);
    }
    hasDeletions = true;
  }

  // undeleteAll in all readers
  @Override
  protected void doUndeleteAll() throws CorruptIndexException, IOException {
    for (final IndexReader reader : readers) {
      reader.undeleteAll();
    }
    hasDeletions = false;
  }

  // append fields from storedFieldReaders; the result is the union of the
  // stored fields of document n across all non-ignored subreaders
  @Override
  public Document document(int n, FieldSelector fieldSelector) throws CorruptIndexException, IOException {
    ensureOpen();
    Document result = new Document();
    for (final IndexReader reader: storedFieldReaders) {

      // Skip a subreader entirely when the selector would NO_LOAD every one
      // of its fields (avoids a needless stored-fields read).
      boolean include = (fieldSelector==null);
      if (!include) {
        Collection<String> fields = readerToFields.get(reader);
        for (final String field : fields)
          if (fieldSelector.accept(field) != FieldSelectorResult.NO_LOAD) {
            include = true;
            break;
          }
      }
      if (include) {
        List<Fieldable> fields = reader.document(n, fieldSelector).getFields();
        for (Fieldable field : fields) {
          result.add(field);
        }
      }
    }
    return result;
  }

  // get all vectors, one per vectored field, each from the subreader that
  // owns that field
  @Override
  public TermFreqVector[] getTermFreqVectors(int n) throws IOException {
    ensureOpen();
    ArrayList<TermFreqVector> results = new ArrayList<TermFreqVector>();
    for (final Map.Entry<String,IndexReader> e: fieldToReader.entrySet()) {

      String field = e.getKey();
      IndexReader reader = e.getValue();
      TermFreqVector vector = reader.getTermFreqVector(n, field);
      if (vector != null)
        results.add(vector);
    }
    return results.toArray(new TermFreqVector[results.size()]);
  }

  @Override
  public TermFreqVector getTermFreqVector(int n, String field)
    throws IOException {
    ensureOpen();
    // Delegate to the subreader that owns this field, if any.
    IndexReader reader = fieldToReader.get(field);
    return reader==null ? null : reader.getTermFreqVector(n, field);
  }


  @Override
  public void getTermFreqVector(int docNumber, String field, TermVectorMapper mapper) throws IOException {
    ensureOpen();
    IndexReader reader = fieldToReader.get(field);
    if (reader != null) {
      reader.getTermFreqVector(docNumber, field, mapper);
    }
  }

  @Override
  public void getTermFreqVector(int docNumber, TermVectorMapper mapper) throws IOException {
    ensureOpen();

    // Feed the mapper from each field's owning subreader in turn.
    for (final Map.Entry<String,IndexReader> e : fieldToReader.entrySet()) {

      String field = e.getKey();
      IndexReader reader = e.getValue();
      reader.getTermFreqVector(docNumber, field, mapper);
    }

  }

  @Override
  public boolean hasNorms(String field) throws IOException {
    ensureOpen();
    IndexReader reader = fieldToReader.get(field);
    return reader==null ? false : reader.hasNorms(field);
  }

  @Override
  public byte[] norms(String field) throws IOException {
    ensureOpen();
    IndexReader reader = fieldToReader.get(field);
    return reader==null ? null : reader.norms(field);
  }

  @Override
  public void norms(String field, byte[] result, int offset)
    throws IOException {
    ensureOpen();
    // Unknown field: leave `result` untouched (same contract as norms(String)
    // returning null).
    IndexReader reader = fieldToReader.get(field);
    if (reader!=null)
      reader.norms(field, result, offset);
  }

  @Override
  protected void doSetNorm(int n, String field, byte value)
    throws CorruptIndexException, IOException {
    IndexReader reader = fieldToReader.get(field);
    if (reader!=null)
      reader.doSetNorm(n, field, value);
  }

  @Override
  public TermEnum terms() throws IOException {
    ensureOpen();
    return new ParallelTermEnum();
  }

  @Override
  public TermEnum terms(Term term) throws IOException {
    ensureOpen();
    return new ParallelTermEnum(term);
  }

  @Override
  public int docFreq(Term term) throws IOException {
    ensureOpen();
    // A term only exists in the subreader that owns its field.
    IndexReader reader = fieldToReader.get(term.field());
    return reader==null ? 0 : reader.docFreq(term);
  }

  @Override
  public TermDocs termDocs(Term term) throws IOException {
    ensureOpen();
    return new ParallelTermDocs(term);
  }

  @Override
  public TermDocs termDocs() throws IOException {
    ensureOpen();
    return new ParallelTermDocs();
  }

  @Override
  public TermPositions termPositions(Term term) throws IOException {
    ensureOpen();
    return new ParallelTermPositions(term);
  }

  @Override
  public TermPositions termPositions() throws IOException {
    ensureOpen();
    return new ParallelTermPositions();
  }

  /**
   * Checks recursively if all subreaders are up to date.
   */
  @Override
  public boolean isCurrent() throws CorruptIndexException, IOException {
    ensureOpen();
    for (final IndexReader reader : readers) {
      if (!reader.isCurrent()) {
        return false;
      }
    }

    // all subreaders are up to date
    return true;
  }

  /**
   * Checks recursively if all subindexes are optimized
   */
  @Override
  public boolean isOptimized() {
    ensureOpen();
    for (final IndexReader reader : readers) {
      if (!reader.isOptimized()) {
        return false;
      }
    }

    // all subindexes are optimized
    return true;
  }


  /** Not implemented.
   * @throws UnsupportedOperationException always; parallel subindexes have no
   * single meaningful version
   */
  @Override
  public long getVersion() {
    throw new UnsupportedOperationException("ParallelReader does not support this method.");
  }

  // for testing
  IndexReader[] getSubReaders() {
    return readers.toArray(new IndexReader[readers.size()]);
  }

  @Override
  protected void doCommit(Map<String,String> commitUserData) throws IOException {
    // NOTE(review): commitUserData is written to every subindex; a failure
    // partway through leaves subindexes committed inconsistently.
    for (final IndexReader reader : readers)
      reader.commit(commitUserData);
  }

  @Override
  protected synchronized void doClose() throws IOException {
    // Release each subreader according to the ownership recorded at add()
    // (or overridden by doReopen() for shared subreaders).
    for (int i = 0; i < readers.size(); i++) {
      if (decrefOnClose.get(i).booleanValue()) {
        readers.get(i).decRef();
      } else {
        readers.get(i).close();
      }
    }
  }

  @Override
  public Collection<String> getFieldNames (IndexReader.FieldOption fieldNames) {
    ensureOpen();
    // Union of field names across all subreaders (duplicates collapse).
    Set<String> fieldSet = new HashSet<String>();
    for (final IndexReader reader : readers) {
      Collection<String> names = reader.getFieldNames(fieldNames);
      fieldSet.addAll(names);
    }
    return fieldSet;
  }

  /** Enumerates all terms across the parallel indexes in field order,
   * pulling each field's terms from the subreader that owns the field. */
  private class ParallelTermEnum extends TermEnum {
    private String field;                       // field currently being enumerated
    private Iterator<String> fieldIterator;     // remaining fields, in sorted order; lazily created
    private TermEnum termEnum;                  // enumerator of the owning subreader, or null when exhausted

    public ParallelTermEnum() throws IOException {
      try {
        field = fieldToReader.firstKey();
      } catch(NoSuchElementException e) {
        // No fields, so keep field == null, termEnum == null
        return;
      }
      if (field != null)
        termEnum = fieldToReader.get(field).terms();
    }

    public ParallelTermEnum(Term term) throws IOException {
      field = term.field();
      IndexReader reader = fieldToReader.get(field);
      if (reader!=null)
        termEnum = reader.terms(term);
    }

    @Override
    public boolean next() throws IOException {
      if (termEnum==null)
        return false;

      // another term in this field?
      // NOTE(review): field names are compared with == here and below;
      // this relies on Term field names being interned — confirm against
      // Term's constructor before changing.
      if (termEnum.next() && termEnum.term().field()==field)
        return true;                            // yes, keep going

      termEnum.close();                         // close old termEnum

      // find the next field with terms, if any
      if (fieldIterator==null) {
        // tailMap(field) includes `field` itself, so skip one entry.
        fieldIterator = fieldToReader.tailMap(field).keySet().iterator();
        fieldIterator.next();                   // Skip field to get next one
      }
      while (fieldIterator.hasNext()) {
        field = fieldIterator.next();
        termEnum = fieldToReader.get(field).terms(new Term(field));
        Term term = termEnum.term();
        if (term!=null && term.field()==field)
          return true;
        else
          termEnum.close();
      }

      return false;                             // no more fields
    }

    @Override
    public Term term() {
      if (termEnum==null)
        return null;

      return termEnum.term();
    }

    @Override
    public int docFreq() {
      if (termEnum==null)
        return 0;

      return termEnum.docFreq();
    }

    @Override
    public void close() throws IOException {
      if (termEnum!=null)
        termEnum.close();
    }
  }

  // wrap a TermDocs in order to support seek(Term); delegates to the TermDocs
  // of the subreader that owns the sought term's field
  private class ParallelTermDocs implements TermDocs {
    protected TermDocs termDocs;                // null until seek()ed, or when the field is unknown

    public ParallelTermDocs() {}
    public ParallelTermDocs(Term term) throws IOException {
      if (term == null)
        // NOTE(review): a null term appears to request the first subreader's
        // all-documents enumerator — confirm against IndexReader.termDocs(Term).
        termDocs = readers.isEmpty() ? null : readers.get(0).termDocs(null);
      else
        seek(term);
    }

    public int doc() { return termDocs.doc(); }
    public int freq() { return termDocs.freq(); }

    public void seek(Term term) throws IOException {
      IndexReader reader = fieldToReader.get(term.field());
      termDocs = reader!=null ? reader.termDocs(term) : null;
    }

    public void seek(TermEnum termEnum) throws IOException {
      seek(termEnum.term());
    }

    public boolean next() throws IOException {
      if (termDocs==null)
        return false;

      return termDocs.next();
    }

    public int read(final int[] docs, final int[] freqs) throws IOException {
      if (termDocs==null)
        return 0;

      return termDocs.read(docs, freqs);
    }

    public boolean skipTo(int target) throws IOException {
      if (termDocs==null)
        return false;

      return termDocs.skipTo(target);
    }

    public void close() throws IOException {
      if (termDocs!=null)
        termDocs.close();
    }

  }

  /** TermDocs variant that also exposes positions/payloads; the wrapped
   * delegate is always a TermPositions because seek() is overridden. */
  private class ParallelTermPositions
    extends ParallelTermDocs implements TermPositions {

    public ParallelTermPositions() {}
    public ParallelTermPositions(Term term) throws IOException { seek(term); }

    @Override
    public void seek(Term term) throws IOException {
      IndexReader reader = fieldToReader.get(term.field());
      termDocs = reader!=null ? reader.termPositions(term) : null;
    }

    public int nextPosition() throws IOException {
      // It is an error to call this if there is no next position, e.g. if termDocs==null
      return ((TermPositions)termDocs).nextPosition();
    }

    public int getPayloadLength() {
      return ((TermPositions)termDocs).getPayloadLength();
    }

    public byte[] getPayload(byte[] data, int offset) throws IOException {
      return ((TermPositions)termDocs).getPayload(data, offset);
    }


    // TODO: Remove warning after API has been finalized
    public boolean isPayloadAvailable() {
      return ((TermPositions) termDocs).isPayloadAvailable();
    }
  }

  @Override
  public void addReaderFinishedListener(ReaderFinishedListener listener) {
    // Register on this reader and propagate to every subreader.
    super.addReaderFinishedListener(listener);
    for (IndexReader reader : readers) {
      reader.addReaderFinishedListener(listener);
    }
  }

  @Override
  public void removeReaderFinishedListener(ReaderFinishedListener listener) {
    super.removeReaderFinishedListener(listener);
    for (IndexReader reader : readers) {
      reader.removeReaderFinishedListener(listener);
    }
  }
}
| |
| |
| |
| |
| |