blob: c8bdbaf0a680d8e38c0c21eddcb134d2e90c6896
Index: lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
--- lucene/src/java/org/apache/lucene/index/BufferedDeletes.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/java/org/apache/lucene/index/BufferedDeletes.java Thu Dec 09 10:10:12 2010 -0500
@@ -17,153 +17,415 @@
* limitations under the License.
*/
+import java.io.IOException;
+import java.io.PrintStream;
import java.util.HashMap;
+import java.util.Date;
+import java.util.Map.Entry;
import java.util.Map;
-import java.util.TreeMap;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
-/** Holds buffered deletes, by docID, term or query. We
- * hold two instances of this class: one for the deletes
- * prior to the last flush, the other for deletes after
- * the last flush. This is so if we need to abort
- * (discard all buffered docs) we can also discard the
- * buffered deletes yet keep the deletes done during
- * previously flushed segments. */
+/** Holds a {@link SegmentDeletes} for each segment in the
+ * index. */
+
class BufferedDeletes {
- int numTerms;
- Map<Term,Num> terms;
- Map<Query,Integer> queries = new HashMap<Query,Integer>();
- List<Integer> docIDs = new ArrayList<Integer>();
- long bytesUsed;
- private final boolean doTermSort;
- public BufferedDeletes(boolean doTermSort) {
- this.doTermSort = doTermSort;
- if (doTermSort) {
- terms = new TreeMap<Term,Num>();
+ // Deletes for all flushed/merged segments:
+ private final Map<SegmentInfo,SegmentDeletes> deletesMap = new HashMap<SegmentInfo,SegmentDeletes>();
+
+ // used only by assert
+ private Term lastDeleteTerm;
+
+ private PrintStream infoStream;
+ private final AtomicLong bytesUsed = new AtomicLong();
+ private final AtomicInteger numTerms = new AtomicInteger();
+ private final int messageID;
+
+ public BufferedDeletes(int messageID) {
+ this.messageID = messageID;
+ }
+
+ private synchronized void message(String message) {
+ if (infoStream != null) {
+ infoStream.println("BD " + messageID + " [" + new Date() + "; " + Thread.currentThread().getName() + "]: BD " + message);
+ }
+ }
+
+ public synchronized void setInfoStream(PrintStream infoStream) {
+ this.infoStream = infoStream;
+ }
+
+ public synchronized void pushDeletes(SegmentDeletes newDeletes, SegmentInfo info) {
+ pushDeletes(newDeletes, info, false);
+ }
+
+ // Moves all pending deletes onto the provided segment,
+ // then clears the pending deletes
+ public synchronized void pushDeletes(SegmentDeletes newDeletes, SegmentInfo info, boolean noLimit) {
+ assert newDeletes.any();
+ numTerms.addAndGet(newDeletes.numTermDeletes.get());
+
+ if (!noLimit) {
+ assert !deletesMap.containsKey(info);
+ assert info != null;
+ deletesMap.put(info, newDeletes);
+ bytesUsed.addAndGet(newDeletes.bytesUsed.get());
} else {
- terms = new HashMap<Term,Num>();
+ final SegmentDeletes deletes = getDeletes(info);
+ bytesUsed.addAndGet(-deletes.bytesUsed.get());
+ deletes.update(newDeletes, noLimit);
+ bytesUsed.addAndGet(deletes.bytesUsed.get());
+ }
+ if (infoStream != null) {
+ message("push deletes seg=" + info + " dels=" + getDeletes(info));
+ }
+ assert checkDeleteStats();
+ }
+
+ public synchronized void clear() {
+ deletesMap.clear();
+ numTerms.set(0);
+ bytesUsed.set(0);
+ }
+
+ synchronized boolean any() {
+ return bytesUsed.get() != 0;
+ }
+
+ public int numTerms() {
+ return numTerms.get();
+ }
+
+ public long bytesUsed() {
+ return bytesUsed.get();
+ }
+
+ // IW calls this on finishing a merge. While the merge
+ // was running, it's possible new deletes were pushed onto
+ // our last (and only our last) segment. In this case we
+ // must carry forward those deletes onto the merged
+ // segment.
+ synchronized void commitMerge(MergePolicy.OneMerge merge) {
+ assert checkDeleteStats();
+ if (infoStream != null) {
+ message("commitMerge merge.info=" + merge.info + " merge.segments=" + merge.segments);
+ }
+ final SegmentInfo lastInfo = merge.segments.lastElement();
+ final SegmentDeletes lastDeletes = deletesMap.get(lastInfo);
+ if (lastDeletes != null) {
+ deletesMap.remove(lastInfo);
+ assert !deletesMap.containsKey(merge.info);
+ deletesMap.put(merge.info, lastDeletes);
+ // don't need to update numTerms/bytesUsed since we
+ // are just moving the deletes from one info to
+ // another
+ if (infoStream != null) {
+ message("commitMerge done: new deletions=" + lastDeletes);
+ }
+ } else if (infoStream != null) {
+ message("commitMerge done: no new deletions");
+ }
+ assert !anyDeletes(merge.segments.range(0, merge.segments.size()-1));
+ assert checkDeleteStats();
+ }
+
+ synchronized void clear(SegmentDeletes deletes) {
+ deletes.clear();
+ }
+
+ public synchronized boolean applyDeletes(IndexWriter.ReaderPool readerPool, SegmentInfos segmentInfos, SegmentInfos applyInfos) throws IOException {
+ if (!any()) {
+ return false;
+ }
+ final long t0 = System.currentTimeMillis();
+
+ if (infoStream != null) {
+ message("applyDeletes: applyInfos=" + applyInfos + "; index=" + segmentInfos);
+ }
+
+ assert checkDeleteStats();
+
+ assert applyInfos.size() > 0;
+
+ boolean any = false;
+
+ final SegmentInfo lastApplyInfo = applyInfos.lastElement();
+ final int lastIdx = segmentInfos.indexOf(lastApplyInfo);
+
+ final SegmentInfo firstInfo = applyInfos.firstElement();
+ final int firstIdx = segmentInfos.indexOf(firstInfo);
+
+ // applyInfos must be a slice of segmentInfos
+ assert lastIdx - firstIdx + 1 == applyInfos.size();
+
+ // Iterate over all segment infos backwards, coalescing
+ // deletes along the way. Once we're at or below the last
+ // of the segments to apply to, start applying the deletes.
+ // We traverse up to the first apply info.
+ SegmentDeletes coalescedDeletes = null;
+ boolean hasDeletes = false;
+ for (int segIdx=segmentInfos.size()-1; segIdx >= firstIdx; segIdx--) {
+ final SegmentInfo info = segmentInfos.info(segIdx);
+ final SegmentDeletes deletes = deletesMap.get(info);
+ assert deletes == null || deletes.any();
+
+ if (deletes == null && coalescedDeletes == null) {
+ continue;
+ }
+
+ if (infoStream != null) {
+ message("applyDeletes: seg=" + info + " segment's deletes=[" + (deletes == null ? "null" : deletes) + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "]");
+ }
+
+ hasDeletes |= deletes != null;
+
+ if (segIdx <= lastIdx && hasDeletes) {
+
+ any |= applyDeletes(readerPool, info, coalescedDeletes, deletes);
+
+ if (deletes != null) {
+ // we've applied doc ids, and they're only applied
+ // on the current segment
+ bytesUsed.addAndGet(-deletes.docIDs.size() * SegmentDeletes.BYTES_PER_DEL_DOCID);
+ deletes.clearDocIDs();
+ }
+ }
+
+ // now coalesce at the max limit
+ if (deletes != null) {
+ if (coalescedDeletes == null) {
+ coalescedDeletes = new SegmentDeletes();
+ }
+ // TODO: we could make this single pass (coalesce as
+ // we apply the deletes)
+ coalescedDeletes.update(deletes, true);
+ }
+ }
+
+ // Move all deletes to the segment just before our merge.
+ if (firstIdx > 0) {
+
+ SegmentDeletes mergedDeletes = null;
+ // TODO: we could also make this single pass
+ for (SegmentInfo info : applyInfos) {
+ final SegmentDeletes deletes = deletesMap.get(info);
+ if (deletes != null) {
+ assert deletes.any();
+ if (mergedDeletes == null) {
+ mergedDeletes = getDeletes(segmentInfos.info(firstIdx-1));
+ numTerms.addAndGet(-mergedDeletes.numTermDeletes.get());
+ bytesUsed.addAndGet(-mergedDeletes.bytesUsed.get());
+ }
+
+ mergedDeletes.update(deletes, true);
+ }
+ }
+
+ if (mergedDeletes != null) {
+ numTerms.addAndGet(mergedDeletes.numTermDeletes.get());
+ bytesUsed.addAndGet(mergedDeletes.bytesUsed.get());
+ }
+
+ if (infoStream != null) {
+ if (mergedDeletes != null) {
+ message("applyDeletes: merge all deletes into seg=" + segmentInfos.info(firstIdx-1) + ": " + mergedDeletes);
+ } else {
+ message("applyDeletes: no deletes to merge");
+ }
+ }
+ } else {
+ // We drop the deletes in this case, because we've
+ // applied them to segment infos starting w/ the first
+ // segment. There are no prior segments so there's no
+ // reason to keep them around. When the applyInfos ==
+ // segmentInfos this means all deletes have been
+ // removed:
+ }
+ remove(applyInfos);
+
+ assert checkDeleteStats();
+ assert applyInfos != segmentInfos || !any();
+
+ if (infoStream != null) {
+ message("applyDeletes took " + (System.currentTimeMillis()-t0) + " msec");
+ }
+ return any;
+ }
+
+ private synchronized boolean applyDeletes(IndexWriter.ReaderPool readerPool,
+ SegmentInfo info,
+ SegmentDeletes coalescedDeletes,
+ SegmentDeletes segmentDeletes) throws IOException {
+ assert readerPool.infoIsLive(info);
+
+ assert coalescedDeletes == null || coalescedDeletes.docIDs.size() == 0;
+
+ boolean any = false;
+
+ // Lock order: IW -> BD -> RP
+ SegmentReader reader = readerPool.get(info, false);
+ try {
+ if (coalescedDeletes != null) {
+ any |= applyDeletes(coalescedDeletes, reader);
+ }
+ if (segmentDeletes != null) {
+ any |= applyDeletes(segmentDeletes, reader);
+ }
+ } finally {
+ readerPool.release(reader);
+ }
+ return any;
+ }
+
+ private synchronized boolean applyDeletes(SegmentDeletes deletes, SegmentReader reader) throws IOException {
+ boolean any = false;
+
+ assert checkDeleteTerm(null);
+
+ if (deletes.terms.size() > 0) {
+ Fields fields = reader.fields();
+ if (fields == null) {
+ // This reader has no postings
+ return false;
+ }
+
+ TermsEnum termsEnum = null;
+
+ String currentField = null;
+ DocsEnum docs = null;
+
+ for (Entry<Term,Integer> entry: deletes.terms.entrySet()) {
+ Term term = entry.getKey();
+ // Since we visit terms sorted, we gain performance
+ // by re-using the same TermsEnum and seeking only
+ // forwards
+ if (term.field() != currentField) {
+ assert currentField == null || currentField.compareTo(term.field()) < 0;
+ currentField = term.field();
+ Terms terms = fields.terms(currentField);
+ if (terms != null) {
+ termsEnum = terms.iterator();
+ } else {
+ termsEnum = null;
+ }
+ }
+
+ if (termsEnum == null) {
+ continue;
+ }
+ assert checkDeleteTerm(term);
+
+ if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) {
+ DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs);
+
+ if (docsEnum != null) {
+ docs = docsEnum;
+ final int limit = entry.getValue();
+ while (true) {
+ final int docID = docs.nextDoc();
+ if (docID == DocsEnum.NO_MORE_DOCS || docID >= limit) {
+ break;
+ }
+ reader.deleteDocument(docID);
+ any = true;
+ }
+ }
+ }
+ }
+ }
+
+ // Delete by docID
+ for (Integer docIdInt : deletes.docIDs) {
+ int docID = docIdInt.intValue();
+ reader.deleteDocument(docID);
+ any = true;
+ }
+
+ // Delete by query
+ if (deletes.queries.size() > 0) {
+ IndexSearcher searcher = new IndexSearcher(reader);
+ try {
+ for (Entry<Query, Integer> entry : deletes.queries.entrySet()) {
+ Query query = entry.getKey();
+ int limit = entry.getValue().intValue();
+ Weight weight = query.weight(searcher);
+ Scorer scorer = weight.scorer(reader, true, false);
+ if (scorer != null) {
+ while(true) {
+ int doc = scorer.nextDoc();
+ if (doc >= limit)
+ break;
+ reader.deleteDocument(doc);
+ any = true;
+ }
+ }
+ }
+ } finally {
+ searcher.close();
+ }
+ }
+ return any;
+ }
+
+ public synchronized SegmentDeletes getDeletes(SegmentInfo info) {
+ SegmentDeletes deletes = deletesMap.get(info);
+ if (deletes == null) {
+ deletes = new SegmentDeletes();
+ deletesMap.put(info, deletes);
+ }
+ return deletes;
+ }
+
+ public synchronized void remove(SegmentInfos infos) {
+ assert infos.size() > 0;
+ for (SegmentInfo info : infos) {
+ SegmentDeletes deletes = deletesMap.get(info);
+ if (deletes != null) {
+ bytesUsed.addAndGet(-deletes.bytesUsed.get());
+ assert bytesUsed.get() >= 0: "bytesUsed=" + bytesUsed;
+ numTerms.addAndGet(-deletes.numTermDeletes.get());
+ assert numTerms.get() >= 0: "numTerms=" + numTerms;
+ deletesMap.remove(info);
+ }
}
}
- // Number of documents a delete term applies to.
- final static class Num {
- private int num;
-
- Num(int num) {
- this.num = num;
+ // used only by assert
+ private boolean anyDeletes(SegmentInfos infos) {
+ for(SegmentInfo info : infos) {
+ if (deletesMap.containsKey(info)) {
+ return true;
+ }
}
-
- int getNum() {
- return num;
- }
-
- void setNum(int num) {
- // Only record the new number if it's greater than the
- // current one. This is important because if multiple
- // threads are replacing the same doc at nearly the
- // same time, it's possible that one thread that got a
- // higher docID is scheduled before the other
- // threads.
- if (num > this.num)
- this.num = num;
- }
+ return false;
}
- int size() {
- // We use numTerms not terms.size() intentionally, so
- // that deletes by the same term multiple times "count",
- // ie if you ask to flush every 1000 deletes then even
- // dup'd terms are counted towards that 1000
- return numTerms + queries.size() + docIDs.size();
+ // used only by assert
+ private boolean checkDeleteTerm(Term term) {
+ if (term != null) {
+ assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term;
+ }
+ lastDeleteTerm = term;
+ return true;
}
-
- void update(BufferedDeletes in) {
- numTerms += in.numTerms;
- bytesUsed += in.bytesUsed;
- terms.putAll(in.terms);
- queries.putAll(in.queries);
- docIDs.addAll(in.docIDs);
- in.clear();
+
+ // only for assert
+ private boolean checkDeleteStats() {
+ int numTerms2 = 0;
+ long bytesUsed2 = 0;
+ for(SegmentDeletes deletes : deletesMap.values()) {
+ numTerms2 += deletes.numTermDeletes.get();
+ bytesUsed2 += deletes.bytesUsed.get();
+ }
+ assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get();
+ assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed;
+ return true;
}
-
- void clear() {
- terms.clear();
- queries.clear();
- docIDs.clear();
- numTerms = 0;
- bytesUsed = 0;
- }
-
- void addBytesUsed(long b) {
- bytesUsed += b;
- }
-
- boolean any() {
- return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0;
- }
-
- // Remaps all buffered deletes based on a completed
- // merge
- synchronized void remap(MergeDocIDRemapper mapper,
- SegmentInfos infos,
- int[][] docMaps,
- int[] delCounts,
- MergePolicy.OneMerge merge,
- int mergeDocCount) {
-
- final Map<Term,Num> newDeleteTerms;
-
- // Remap delete-by-term
- if (terms.size() > 0) {
- if (doTermSort) {
- newDeleteTerms = new TreeMap<Term,Num>();
- } else {
- newDeleteTerms = new HashMap<Term,Num>();
- }
- for(Entry<Term,Num> entry : terms.entrySet()) {
- Num num = entry.getValue();
- newDeleteTerms.put(entry.getKey(),
- new Num(mapper.remap(num.getNum())));
- }
- } else
- newDeleteTerms = null;
-
-
- // Remap delete-by-docID
- final List<Integer> newDeleteDocIDs;
-
- if (docIDs.size() > 0) {
- newDeleteDocIDs = new ArrayList<Integer>(docIDs.size());
- for (Integer num : docIDs) {
- newDeleteDocIDs.add(Integer.valueOf(mapper.remap(num.intValue())));
- }
- } else
- newDeleteDocIDs = null;
-
-
- // Remap delete-by-query
- final HashMap<Query,Integer> newDeleteQueries;
-
- if (queries.size() > 0) {
- newDeleteQueries = new HashMap<Query, Integer>(queries.size());
- for(Entry<Query,Integer> entry: queries.entrySet()) {
- Integer num = entry.getValue();
- newDeleteQueries.put(entry.getKey(),
- Integer.valueOf(mapper.remap(num.intValue())));
- }
- } else
- newDeleteQueries = null;
-
- if (newDeleteTerms != null)
- terms = newDeleteTerms;
- if (newDeleteDocIDs != null)
- docIDs = newDeleteDocIDs;
- if (newDeleteQueries != null)
- queries = newDeleteQueries;
- }
-}
\ No newline at end of file
+}
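A minimal, self-contained sketch of the delete-limit bookkeeping that the applyDeletes code above relies on (the class and method names here are illustrative, not part of the patch): each buffered delete records how many documents were buffered when it arrived, and when the delete is later applied it only affects documents whose docID is below that limit, so documents added after the delete survive it.

import java.util.HashMap;
import java.util.Map;

class DeleteLimitSketch {
  // term text -> docIDUpto; the delete applies only to docIDs < docIDUpto
  private final Map<String,Integer> termDeletes = new HashMap<String,Integer>();
  private int numDocsInRAM;

  void addDocument() {
    numDocsInRAM++;                 // the new doc gets docID = numDocsInRAM - 1
  }

  void deleteByTerm(String term) {
    // Record the current doc count; keep only the larger limit, mirroring
    // the old Num.setNum behavior removed above.
    Integer cur = termDeletes.get(term);
    if (cur == null || numDocsInRAM > cur.intValue()) {
      termDeletes.put(term, numDocsInRAM);
    }
  }

  boolean isDeleted(int docID, String term) {
    Integer limit = termDeletes.get(term);
    return limit != null && docID < limit.intValue();
  }

  public static void main(String[] args) {
    DeleteLimitSketch s = new DeleteLimitSketch();
    s.addDocument();                             // docID 0
    s.deleteByTerm("id:1");                      // limit = 1: applies to docID 0 only
    s.addDocument();                             // docID 1, added after the delete
    System.out.println(s.isDeleted(0, "id:1"));  // true
    System.out.println(s.isDeleted(1, "id:1"));  // false
  }
}

This is the same check performed in the term and query loops above (the docID >= limit break), just without the TermsEnum/Scorer plumbing.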
Index: lucene/src/java/org/apache/lucene/index/CompoundFileReader.java
--- lucene/src/java/org/apache/lucene/index/CompoundFileReader.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/java/org/apache/lucene/index/CompoundFileReader.java Thu Dec 09 10:10:12 2010 -0500
@@ -160,7 +160,7 @@
id = IndexFileNames.stripSegmentName(id);
FileEntry entry = entries.get(id);
if (entry == null)
- throw new IOException("No sub-file with id " + id + " found");
+ throw new IOException("No sub-file with id " + id + " found (files: " + entries.keySet() + ")");
return new CSIndexInput(stream, entry.offset, entry.length, readBufferSize);
}
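A tiny illustration (not from the patch; the map contents and ids here are made up) of why the enriched message above helps: when a sub-file id is missing, listing the ids that are present usually makes it obvious whether the compound file is stale or the requested name was stripped incorrectly.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

class MissingEntrySketch {
  public static void main(String[] args) throws IOException {
    Map<String,Long> entries = new HashMap<String,Long>();
    entries.put(".frq", 1024L);
    entries.put(".prx", 2048L);
    String id = ".tis";
    if (!entries.containsKey(id)) {
      // same shape as the new CompoundFileReader message
      throw new IOException("No sub-file with id " + id + " found (files: " + entries.keySet() + ")");
    }
  }
}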
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
--- lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Thu Dec 09 10:10:12 2010 -0500
@@ -23,24 +23,18 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Map;
import java.util.HashSet;
import java.util.List;
-import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
-import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
-import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Similarity;
-import org.apache.lucene.search.Weight;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMFile;
import org.apache.lucene.util.ArrayUtil;
-import org.apache.lucene.util.Constants;
import org.apache.lucene.util.RecyclingByteBlockAllocator;
import org.apache.lucene.util.ThreadInterruptedException;
import org.apache.lucene.util.RamUsageEstimator;
@@ -115,7 +109,6 @@
*/
final class DocumentsWriter {
-
final AtomicLong bytesUsed = new AtomicLong(0);
IndexWriter writer;
Directory directory;
@@ -133,9 +126,6 @@
private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
private final HashMap<Thread,DocumentsWriterThreadState> threadBindings = new HashMap<Thread,DocumentsWriterThreadState>();
- private int pauseThreads; // Non-zero when we need all threads to
- // pause (eg to flush)
- boolean flushPending; // True when a thread has decided to flush
boolean bufferIsFull; // True when it's time to write segment
private boolean aborting; // True if an abort is pending
@@ -151,6 +141,9 @@
List<String> newFiles;
+ // Deletes for our still-in-RAM (to be flushed next) segment
+ private SegmentDeletes pendingDeletes = new SegmentDeletes();
+
static class DocState {
DocumentsWriter docWriter;
Analyzer analyzer;
@@ -276,18 +269,6 @@
final DocConsumer consumer;
- // Deletes done after the last flush; these are discarded
- // on abort
- private BufferedDeletes deletesInRAM = new BufferedDeletes(false);
-
- // Deletes done before the last flush; these are still
- // kept on abort
- private BufferedDeletes deletesFlushed = new BufferedDeletes(true);
-
- // The max number of delete terms that can be buffered before
- // they must be flushed to disk.
- private int maxBufferedDeleteTerms = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
-
// How much RAM we can use before flushing. This is 0 if
// we are flushing by doc count instead.
private long ramBufferSize = (long) (IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024);
@@ -302,28 +283,20 @@
// non-zero we will flush by RAM usage instead.
private int maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS;
- private int flushedDocCount; // How many docs already flushed to index
-
- synchronized void updateFlushedDocCount(int n) {
- flushedDocCount += n;
- }
- synchronized int getFlushedDocCount() {
- return flushedDocCount;
- }
- synchronized void setFlushedDocCount(int n) {
- flushedDocCount = n;
- }
-
private boolean closed;
private final FieldInfos fieldInfos;
- DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain indexingChain, int maxThreadStates, FieldInfos fieldInfos) throws IOException {
+ private final BufferedDeletes bufferedDeletes;
+ private final IndexWriter.FlushControl flushControl;
+
+ DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain indexingChain, int maxThreadStates, FieldInfos fieldInfos, BufferedDeletes bufferedDeletes) throws IOException {
this.directory = directory;
this.writer = writer;
this.similarity = writer.getConfig().getSimilarity();
this.maxThreadStates = maxThreadStates;
- flushedDocCount = writer.maxDoc();
this.fieldInfos = fieldInfos;
+ this.bufferedDeletes = bufferedDeletes;
+ flushControl = writer.flushControl;
consumer = indexingChain.getChain(this);
if (consumer instanceof DocFieldProcessor) {
@@ -331,6 +304,57 @@
}
}
+ // Buffer a specific docID for deletion. Currently only
+ // used when we hit an exception when adding a document
+ synchronized void deleteDocID(int docIDUpto) {
+ pendingDeletes.addDocID(docIDUpto);
+ // NOTE: we do not trigger flush here. This is
+ // potentially a RAM leak, if you have an app that tries
+ // to add docs but every single doc always hits a
+ // non-aborting exception. Allowing a flush here gets
+ // very messy because we are only invoked while handling
+ // an exception; to do this properly we'd have to go off
+ // and flush new deletes from inside that exception
+ // handler, which is risky (it would likely hit some
+ // other confounding exception).
+ }
+
+ boolean deleteQueries(Query... queries) {
+ final boolean doFlush = flushControl.waitUpdate(0, queries.length);
+ synchronized(this) {
+ for (Query query : queries) {
+ pendingDeletes.addQuery(query, numDocsInRAM);
+ }
+ }
+ return doFlush;
+ }
+
+ boolean deleteQuery(Query query) {
+ final boolean doFlush = flushControl.waitUpdate(0, 1);
+ synchronized(this) {
+ pendingDeletes.addQuery(query, numDocsInRAM);
+ }
+ return doFlush;
+ }
+
+ boolean deleteTerms(Term... terms) {
+ final boolean doFlush = flushControl.waitUpdate(0, terms.length);
+ synchronized(this) {
+ for (Term term : terms) {
+ pendingDeletes.addTerm(term, numDocsInRAM);
+ }
+ }
+ return doFlush;
+ }
+
+ boolean deleteTerm(Term term, boolean skipWait) {
+ final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
+ synchronized(this) {
+ pendingDeletes.addTerm(term, numDocsInRAM);
+ }
+ return doFlush;
+ }
+
public FieldInfos getFieldInfos() {
return fieldInfos;
}
@@ -395,12 +419,12 @@
}
/** Get current segment name we are writing. */
- String getSegment() {
+ synchronized String getSegment() {
return segment;
}
/** Returns how many docs are currently buffered in RAM. */
- int getNumDocsInRAM() {
+ synchronized int getNumDocsInRAM() {
return numDocsInRAM;
}
@@ -412,46 +436,86 @@
/** Returns the doc offset into the shared doc store for
* the current buffered docs. */
- int getDocStoreOffset() {
+ synchronized int getDocStoreOffset() {
return docStoreOffset;
}
- /** Closes the current open doc stores an returns the doc
- * store segment name. This returns null if there are *
- * no buffered documents. */
- synchronized String closeDocStore() throws IOException {
+ /** Closes the current open doc stores and sets the
+ * docStoreSegment and docStoreUseCFS on the provided
+ * SegmentInfo. */
+ synchronized void closeDocStore(SegmentWriteState flushState, IndexWriter writer, IndexFileDeleter deleter, SegmentInfo newSegment, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException {
- assert allThreadsIdle();
+ final boolean isSeparate = numDocsInRAM == 0 || !segment.equals(docStoreSegment);
- if (infoStream != null)
- message("closeDocStore: " + openFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);
-
- boolean success = false;
+ assert docStoreSegment != null;
- try {
- initFlushState(true);
- closedFiles.clear();
+ if (infoStream != null) {
+ message("closeDocStore: files=" + openFiles + "; segment=" + docStoreSegment + "; docStoreOffset=" + docStoreOffset + "; numDocsInStore=" + numDocsInStore + "; isSeparate=" + isSeparate);
+ }
- consumer.closeDocStore(flushState);
- assert 0 == openFiles.size();
+ closedFiles.clear();
+ consumer.closeDocStore(flushState);
+ flushState.numDocsInStore = 0;
+ assert 0 == openFiles.size();
- String s = docStoreSegment;
- docStoreSegment = null;
- docStoreOffset = 0;
- numDocsInStore = 0;
- success = true;
- return s;
- } finally {
- if (!success) {
- abort();
+ if (isSeparate) {
+ flushState.flushedFiles.clear();
+
+ if (mergePolicy.useCompoundDocStore(segmentInfos)) {
+
+ final String compoundFileName = IndexFileNames.segmentFileName(docStoreSegment, "", IndexFileNames.COMPOUND_FILE_STORE_EXTENSION);
+
+ if (infoStream != null) {
+ message("closeDocStore: create compound file " + compoundFileName);
+ }
+
+ boolean success = false;
+ try {
+
+ CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName);
+ for (final String file : closedFiles) {
+ cfsWriter.addFile(file);
+ }
+
+ // Perform the merge
+ cfsWriter.close();
+
+ success = true;
+ } finally {
+ if (!success) {
+ deleter.deleteFile(compoundFileName);
+ }
+ }
+
+ // In case the files we just merged into a CFS were
+ // not registered w/ IFD:
+ deleter.deleteNewFiles(closedFiles);
+
+ final int numSegments = segmentInfos.size();
+ for(int i=0;i<numSegments;i++) {
+ SegmentInfo si = segmentInfos.info(i);
+ if (si.getDocStoreOffset() != -1 &&
+ si.getDocStoreSegment().equals(docStoreSegment)) {
+ si.setDocStoreIsCompoundFile(true);
+ }
+ }
+
+ newSegment.setDocStoreIsCompoundFile(true);
+ if (infoStream != null) {
+ message("closeDocStore: after compound file index=" + segmentInfos);
+ }
+
+ writer.checkpoint();
}
}
+
+ docStoreSegment = null;
+ docStoreOffset = 0;
+ numDocsInStore = 0;
}
private Collection<String> abortedFiles; // List of files that were written before last abort()
- private SegmentWriteState flushState;
-
Collection<String> abortedFiles() {
return abortedFiles;
}
@@ -471,11 +535,6 @@
return (List<String>) ((ArrayList<String>) openFiles).clone();
}
- @SuppressWarnings("unchecked")
- synchronized List<String> closedFiles() {
- return (List<String>) ((ArrayList<String>) closedFiles).clone();
- }
-
synchronized void addOpenFile(String name) {
assert !openFiles.contains(name);
openFiles.add(name);
@@ -488,6 +547,9 @@
}
synchronized void setAborting() {
+ if (infoStream != null) {
+ message("setAborting");
+ }
aborting = true;
}
@@ -497,61 +559,62 @@
* discarding any docs added since last flush. */
synchronized void abort() throws IOException {
+ if (infoStream != null) {
+ message("docWriter: abort");
+ }
+
+ boolean success = false;
+
try {
- if (infoStream != null) {
- message("docWriter: now abort");
- }
// Forcefully remove waiting ThreadStates from line
waitQueue.abort();
// Wait for all other threads to finish with
// DocumentsWriter:
- pauseAllThreads();
+ waitIdle();
+
+ if (infoStream != null) {
+ message("docWriter: abort waitIdle done");
+ }
+
+ assert 0 == waitQueue.numWaiting: "waitQueue.numWaiting=" + waitQueue.numWaiting;
+
+ waitQueue.waitingBytes = 0;
try {
+ abortedFiles = openFiles();
+ } catch (Throwable t) {
+ abortedFiles = null;
+ }
- assert 0 == waitQueue.numWaiting;
+ pendingDeletes.clear();
+
+ openFiles.clear();
- waitQueue.waitingBytes = 0;
-
+ for(int i=0;i<threadStates.length;i++)
try {
- abortedFiles = openFiles();
- } catch (Throwable t) {
- abortedFiles = null;
- }
-
- deletesInRAM.clear();
- deletesFlushed.clear();
-
- openFiles.clear();
-
- for(int i=0;i<threadStates.length;i++)
- try {
- threadStates[i].consumer.abort();
- } catch (Throwable t) {
- }
-
- try {
- consumer.abort();
+ threadStates[i].consumer.abort();
} catch (Throwable t) {
}
- docStoreSegment = null;
- numDocsInStore = 0;
- docStoreOffset = 0;
+ try {
+ consumer.abort();
+ } catch (Throwable t) {
+ }
- // Reset all postings data
- doAfterFlush();
+ docStoreSegment = null;
+ numDocsInStore = 0;
+ docStoreOffset = 0;
- } finally {
- resumeAllThreads();
- }
+ // Reset all postings data
+ doAfterFlush();
+ success = true;
} finally {
aborting = false;
notifyAll();
if (infoStream != null) {
- message("docWriter: done abort; abortedFiles=" + abortedFiles);
+ message("docWriter: done abort; abortedFiles=" + abortedFiles + " success=" + success);
}
}
}
@@ -566,32 +629,10 @@
numDocsInRAM = 0;
nextDocID = 0;
bufferIsFull = false;
- flushPending = false;
for(int i=0;i<threadStates.length;i++)
threadStates[i].doAfterFlush();
}
- // Returns true if an abort is in progress
- synchronized boolean pauseAllThreads() {
- pauseThreads++;
- while(!allThreadsIdle()) {
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- }
-
- return aborting;
- }
-
- synchronized void resumeAllThreads() {
- pauseThreads--;
- assert pauseThreads >= 0;
- if (0 == pauseThreads)
- notifyAll();
- }
-
private synchronized boolean allThreadsIdle() {
for(int i=0;i<threadStates.length;i++)
if (!threadStates[i].isIdle)
@@ -600,126 +641,173 @@
}
synchronized boolean anyChanges() {
- return numDocsInRAM != 0 ||
- deletesInRAM.numTerms != 0 ||
- deletesInRAM.docIDs.size() != 0 ||
- deletesInRAM.queries.size() != 0;
+ return numDocsInRAM != 0 || pendingDeletes.any();
}
- synchronized private void initFlushState(boolean onlyDocStore) {
- initSegmentName(onlyDocStore);
- final SegmentCodecs info = SegmentCodecs.build(fieldInfos, writer.codecs);
- flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
- docStoreSegment, numDocsInRAM, numDocsInStore, writer.getConfig().getTermIndexInterval(), info);
+ // for testing
+ public SegmentDeletes getPendingDeletes() {
+ return pendingDeletes;
}
- /** Returns the SegmentCodecs used to flush the last segment */
- SegmentCodecs getSegmentCodecs() {
- return flushState.segmentCodecs;
+ private void pushDeletes(SegmentInfo newSegment, SegmentInfos segmentInfos) {
+ // Lock order: DW -> BD
+ if (pendingDeletes.any()) {
+ if (newSegment != null) {
+ if (infoStream != null) {
+ message("flush: push buffered deletes to newSegment");
+ }
+ bufferedDeletes.pushDeletes(pendingDeletes, newSegment);
+ } else if (segmentInfos.size() > 0) {
+ if (infoStream != null) {
+ message("flush: push buffered deletes to previously flushed segment " + segmentInfos.lastElement());
+ }
+ bufferedDeletes.pushDeletes(pendingDeletes, segmentInfos.lastElement(), true);
+ } else {
+ if (infoStream != null) {
+ message("flush: drop buffered deletes: no segments");
+ }
+ // We can safely discard these deletes: since
+ // there are no segments, the deletions cannot
+ // affect anything.
+ }
+ pendingDeletes = new SegmentDeletes();
+ }
}
-
+
+ public boolean anyDeletions() {
+ return pendingDeletes.any();
+ }
+
/** Flush all pending docs to a new segment */
- synchronized int flush(boolean closeDocStore) throws IOException {
+ // Lock order: IW -> DW
+ synchronized SegmentInfo flush(IndexWriter writer, boolean closeDocStore, IndexFileDeleter deleter, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException {
- assert allThreadsIdle();
+ // We change writer's segmentInfos:
+ assert Thread.holdsLock(writer);
- assert numDocsInRAM > 0;
+ waitIdle();
- assert nextDocID == numDocsInRAM;
- assert waitQueue.numWaiting == 0;
- assert waitQueue.waitingBytes == 0;
+ if (numDocsInRAM == 0 && numDocsInStore == 0) {
+ // nothing to do!
+ if (infoStream != null) {
+ message("flush: no docs; skipping");
+ }
+ // Lock order: IW -> DW -> BD
+ pushDeletes(null, segmentInfos);
+ return null;
+ }
- initFlushState(false);
+ if (aborting) {
+ if (infoStream != null) {
+ message("flush: skip because aborting is set");
+ }
+ return null;
+ }
- docStoreOffset = numDocsInStore;
+ boolean success = false;
- if (infoStream != null)
- message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
-
- boolean success = false;
+ SegmentInfo newSegment;
try {
- if (closeDocStore) {
- assert flushState.docStoreSegmentName != null;
- assert flushState.docStoreSegmentName.equals(flushState.segmentName);
- closeDocStore();
- flushState.numDocsInStore = 0;
+ assert waitQueue.waitingBytes == 0;
+
+ assert docStoreSegment != null || numDocsInRAM == 0: "dss=" + docStoreSegment + " numDocsInRAM=" + numDocsInRAM;
+
+ assert numDocsInStore >= numDocsInRAM: "numDocsInStore=" + numDocsInStore + " numDocsInRAM=" + numDocsInRAM;
+
+ final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
+ docStoreSegment, numDocsInRAM, numDocsInStore, writer.getConfig().getTermIndexInterval(),
+ SegmentCodecs.build(fieldInfos, writer.codecs));
+
+ newSegment = new SegmentInfo(segment, numDocsInRAM, directory, false, -1, null, false, hasProx(), flushState.segmentCodecs);
+
+ if (!closeDocStore || docStoreOffset != 0) {
+ newSegment.setDocStoreSegment(docStoreSegment);
+ newSegment.setDocStoreOffset(docStoreOffset);
}
- Collection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
- for(int i=0;i<threadStates.length;i++)
- threads.add(threadStates[i].consumer);
-
- final long startNumBytesUsed = bytesUsed();
- consumer.flush(threads, flushState);
-
- if (infoStream != null) {
- SegmentInfo si = new SegmentInfo(flushState.segmentName,
- flushState.numDocs, directory, false, -1, flushState.segmentName,
- false, hasProx(), flushState.segmentCodecs);
- final long newSegmentSize = si.sizeInBytes();
- String message = " ramUsed=" + nf.format(startNumBytesUsed/1024./1024.) + " MB" +
- " newFlushedSize=" + newSegmentSize +
- " docs/MB=" + nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) +
- " new/old=" + nf.format(100.0*newSegmentSize/startNumBytesUsed) + "%";
- message(message);
+ if (closeDocStore) {
+ closeDocStore(flushState, writer, deleter, newSegment, mergePolicy, segmentInfos);
}
- flushedDocCount += flushState.numDocs;
+ if (numDocsInRAM > 0) {
- doAfterFlush();
+ assert nextDocID == numDocsInRAM;
+ assert waitQueue.numWaiting == 0;
+ assert waitQueue.waitingBytes == 0;
+
+ if (infoStream != null) {
+ message("flush postings as segment " + segment + " numDocs=" + numDocsInRAM);
+ }
+
+ final Collection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
+ for(int i=0;i<threadStates.length;i++) {
+ threads.add(threadStates[i].consumer);
+ }
+
+ final long startNumBytesUsed = bytesUsed();
+ consumer.flush(threads, flushState);
+
+ if (infoStream != null) {
+ message("flushedFiles=" + flushState.flushedFiles);
+ message("flushed codecs=" + newSegment.getSegmentCodecs());
+ }
+
+ if (mergePolicy.useCompoundFile(segmentInfos, newSegment)) {
+
+ final String cfsFileName = IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
+
+ if (infoStream != null) {
+ message("flush: create compound file \"" + cfsFileName + "\"");
+ }
+
+ CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, cfsFileName);
+ for(String fileName : flushState.flushedFiles) {
+ cfsWriter.addFile(fileName);
+ }
+ cfsWriter.close();
+ deleter.deleteNewFiles(flushState.flushedFiles);
+
+ newSegment.setUseCompoundFile(true);
+ }
+
+ if (infoStream != null) {
+ message("flush: segment=" + newSegment);
+ final long newSegmentSize = newSegment.sizeInBytes();
+ String message = " ramUsed=" + nf.format(startNumBytesUsed/1024./1024.) + " MB" +
+ " newFlushedSize=" + nf.format(newSegmentSize/1024/1024) + " MB" +
+ " docs/MB=" + nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) +
+ " new/old=" + nf.format(100.0*newSegmentSize/startNumBytesUsed) + "%";
+ message(message);
+ }
+
+ } else {
+ if (infoStream != null) {
+ message("skip flushing segment: no docs");
+ }
+ newSegment = null;
+ }
success = true;
-
} finally {
+ notifyAll();
if (!success) {
+ if (segment != null) {
+ deleter.refresh(segment);
+ }
abort();
}
}
- assert waitQueue.waitingBytes == 0;
+ doAfterFlush();
- return flushState.numDocs;
- }
+ // Lock order: IW -> DW -> BD
+ pushDeletes(newSegment, segmentInfos);
- Collection<String> getFlushedFiles() {
- return flushState.flushedFiles;
- }
+ docStoreOffset = numDocsInStore;
- /** Build compound file for the segment we just flushed */
- void createCompoundFile(String segment) throws IOException {
-
- CompoundFileWriter cfsWriter = new CompoundFileWriter(directory,
- IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION));
- for(String fileName : flushState.flushedFiles) {
- cfsWriter.addFile(fileName);
- }
-
- // Perform the merge
- cfsWriter.close();
- }
-
- /** Set flushPending if it is not already set and returns
- * whether it was set. This is used by IndexWriter to
- * trigger a single flush even when multiple threads are
- * trying to do so. */
- synchronized boolean setFlushPending() {
- if (flushPending)
- return false;
- else {
- flushPending = true;
- return true;
- }
- }
-
- synchronized void clearFlushPending() {
- bufferIsFull = false;
- flushPending = false;
- }
-
- synchronized void pushDeletes() {
- deletesFlushed.update(deletesInRAM);
+ return newSegment;
}
synchronized void close() {
@@ -746,6 +834,7 @@
synchronized DocumentsWriterThreadState getThreadState(Document doc, Term delTerm) throws IOException {
final Thread currentThread = Thread.currentThread();
+ assert !Thread.holdsLock(writer);
// First, find a thread state. If this thread already
// has affinity to a specific ThreadState, use that one
@@ -776,73 +865,35 @@
}
// Next, wait until my thread state is idle (in case
- // it's shared with other threads) and for threads to
- // not be paused nor a flush pending:
+ // it's shared with other threads), and no flush/abort
+ // pending
waitReady(state);
// Allocate segment name if this is the first doc since
// last flush:
initSegmentName(false);
- state.isIdle = false;
+ state.docState.docID = nextDocID++;
- boolean success = false;
- try {
- state.docState.docID = nextDocID;
-
- assert writer.testPoint("DocumentsWriter.ThreadState.init start");
-
- if (delTerm != null) {
- addDeleteTerm(delTerm, state.docState.docID);
- state.doFlushAfter = timeToFlushDeletes();
- }
-
- assert writer.testPoint("DocumentsWriter.ThreadState.init after delTerm");
-
- nextDocID++;
- numDocsInRAM++;
-
- // We must at this point commit to flushing to ensure we
- // always get N docs when we flush by doc count, even if
- // > 1 thread is adding documents:
- if (!flushPending &&
- maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH
- && numDocsInRAM >= maxBufferedDocs) {
- flushPending = true;
- state.doFlushAfter = true;
- }
-
- success = true;
- } finally {
- if (!success) {
- // Forcefully idle this ThreadState:
- state.isIdle = true;
- notifyAll();
- if (state.doFlushAfter) {
- state.doFlushAfter = false;
- flushPending = false;
- }
- }
+ if (delTerm != null) {
+ pendingDeletes.addTerm(delTerm, state.docState.docID);
}
+ numDocsInRAM++;
+ state.isIdle = false;
return state;
}
-
- /** Returns true if the caller (IndexWriter) should now
- * flush. */
- boolean addDocument(Document doc, Analyzer analyzer)
- throws CorruptIndexException, IOException {
+
+ boolean addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
return updateDocument(doc, analyzer, null);
}
-
- boolean updateDocument(Term t, Document doc, Analyzer analyzer)
- throws CorruptIndexException, IOException {
- return updateDocument(doc, analyzer, t);
- }
-
+
boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
throws CorruptIndexException, IOException {
-
+
+ // Possibly trigger a flush, or wait until any running flush completes:
+ boolean doFlush = flushControl.waitUpdate(1, delTerm != null ? 1 : 0);
+
// This call is synchronized but fast
final DocumentsWriterThreadState state = getThreadState(doc, delTerm);
@@ -867,11 +918,23 @@
success = true;
} finally {
if (!success) {
+
+ // If this thread state had decided to flush, we
+ // must clear it so another thread can flush
+ if (doFlush) {
+ flushControl.clearFlushPending();
+ }
+
+ if (infoStream != null) {
+ message("exception in updateDocument aborting=" + aborting);
+ }
+
synchronized(this) {
+ state.isIdle = true;
+ notifyAll();
+
if (aborting) {
- state.isIdle = true;
- notifyAll();
abort();
} else {
skipDocWriter.docID = docState.docID;
@@ -881,61 +944,38 @@
success2 = true;
} finally {
if (!success2) {
- state.isIdle = true;
- notifyAll();
abort();
return false;
}
}
- state.isIdle = true;
- notifyAll();
-
- // If this thread state had decided to flush, we
- // must clear it so another thread can flush
- if (state.doFlushAfter) {
- state.doFlushAfter = false;
- flushPending = false;
- notifyAll();
- }
-
// Immediately mark this document as deleted
// since likely it was partially added. This
// keeps indexing as "all or none" (atomic) when
// adding a document:
- addDeleteDocID(state.docState.docID);
+ deleteDocID(state.docState.docID);
}
}
}
}
- return state.doFlushAfter || timeToFlushDeletes();
+ doFlush |= flushControl.flushByRAMUsage("new document");
+
+ return doFlush;
}
- // for testing
- synchronized int getNumBufferedDeleteTerms() {
- return deletesInRAM.numTerms;
+ public synchronized void waitIdle() {
+ while (!allThreadsIdle()) {
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ }
+ }
}
- // for testing
- synchronized Map<Term,BufferedDeletes.Num> getBufferedDeleteTerms() {
- return deletesInRAM.terms;
- }
-
- /** Called whenever a merge has completed and the merged segments had deletions */
- synchronized void remapDeletes(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergeDocCount) {
- if (docMaps == null)
- // The merged segments had no deletes so docIDs did not change and we have nothing to do
- return;
- MergeDocIDRemapper mapper = new MergeDocIDRemapper(infos, docMaps, delCounts, merge, mergeDocCount);
- deletesInRAM.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount);
- deletesFlushed.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount);
- flushedDocCount -= mapper.docShift;
- }
-
- synchronized private void waitReady(DocumentsWriterThreadState state) {
-
- while (!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || aborting)) {
+ synchronized void waitReady(DocumentsWriterThreadState state) {
+ while (!closed && (!state.isIdle || aborting)) {
try {
wait();
} catch (InterruptedException ie) {
@@ -943,261 +983,9 @@
}
}
- if (closed)
+ if (closed) {
throw new AlreadyClosedException("this IndexWriter is closed");
- }
-
- boolean bufferDeleteTerms(Term[] terms) throws IOException {
- synchronized(this) {
- waitReady(null);
- for (int i = 0; i < terms.length; i++)
- addDeleteTerm(terms[i], numDocsInRAM);
}
- return timeToFlushDeletes();
- }
-
- boolean bufferDeleteTerm(Term term) throws IOException {
- synchronized(this) {
- waitReady(null);
- addDeleteTerm(term, numDocsInRAM);
- }
- return timeToFlushDeletes();
- }
-
- boolean bufferDeleteQueries(Query[] queries) throws IOException {
- synchronized(this) {
- waitReady(null);
- for (int i = 0; i < queries.length; i++)
- addDeleteQuery(queries[i], numDocsInRAM);
- }
- return timeToFlushDeletes();
- }
-
- boolean bufferDeleteQuery(Query query) throws IOException {
- synchronized(this) {
- waitReady(null);
- addDeleteQuery(query, numDocsInRAM);
- }
- return timeToFlushDeletes();
- }
-
- synchronized boolean deletesFull() {
- return (ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
- (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + bytesUsed()) >= ramBufferSize) ||
- (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
- ((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms));
- }
-
- synchronized boolean doApplyDeletes() {
- // Very similar to deletesFull(), except we don't count
- // numBytesUsed, because we are checking whether
- // deletes (alone) are consuming too many resources now
- // and thus should be applied. We apply deletes if RAM
- // usage is > 1/2 of our allowed RAM buffer, to prevent
- // too-frequent flushing of a long tail of tiny segments
- // when merges (which always apply deletes) are
- // infrequent.
- return (ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
- (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed) >= ramBufferSize/2) ||
- (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
- ((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms));
- }
-
- private boolean timeToFlushDeletes() {
- balanceRAM();
- synchronized(this) {
- return (bufferIsFull || deletesFull()) && setFlushPending();
- }
- }
-
- void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
- this.maxBufferedDeleteTerms = maxBufferedDeleteTerms;
- }
-
- int getMaxBufferedDeleteTerms() {
- return maxBufferedDeleteTerms;
- }
-
- synchronized boolean hasDeletes() {
- return deletesFlushed.any();
- }
-
- synchronized boolean applyDeletes(SegmentInfos infos) throws IOException {
-
- if (!hasDeletes())
- return false;
-
- final long t0 = System.currentTimeMillis();
-
- if (infoStream != null)
- message("apply " + deletesFlushed.numTerms + " buffered deleted terms and " +
- deletesFlushed.docIDs.size() + " deleted docIDs and " +
- deletesFlushed.queries.size() + " deleted queries on " +
- + infos.size() + " segments.");
-
- final int infosEnd = infos.size();
-
- int docStart = 0;
- boolean any = false;
- for (int i = 0; i < infosEnd; i++) {
-
- // Make sure we never attempt to apply deletes to
- // segment in external dir
- assert infos.info(i).dir == directory;
-
- SegmentReader reader = writer.readerPool.get(infos.info(i), false);
- try {
- any |= applyDeletes(reader, docStart);
- docStart += reader.maxDoc();
- } finally {
- writer.readerPool.release(reader);
- }
- }
-
- deletesFlushed.clear();
- if (infoStream != null) {
- message("apply deletes took " + (System.currentTimeMillis()-t0) + " msec");
- }
-
- return any;
- }
-
- // used only by assert
- private Term lastDeleteTerm;
-
- // used only by assert
- private boolean checkDeleteTerm(Term term) {
- if (term != null) {
- assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term;
- }
- lastDeleteTerm = term;
- return true;
- }
-
- // Apply buffered delete terms, queries and docIDs to the
- // provided reader
- private final synchronized boolean applyDeletes(IndexReader reader, int docIDStart)
- throws CorruptIndexException, IOException {
-
- final int docEnd = docIDStart + reader.maxDoc();
- boolean any = false;
-
- assert checkDeleteTerm(null);
-
- // Delete by term
- if (deletesFlushed.terms.size() > 0) {
- Fields fields = reader.fields();
- if (fields == null) {
- // This reader has no postings
- return false;
- }
-
- TermsEnum termsEnum = null;
-
- String currentField = null;
- DocsEnum docs = null;
-
- for (Entry<Term, BufferedDeletes.Num> entry: deletesFlushed.terms.entrySet()) {
- Term term = entry.getKey();
- // Since we visit terms sorted, we gain performance
- // by re-using the same TermsEnum and seeking only
- // forwards
- if (term.field() != currentField) {
- assert currentField == null || currentField.compareTo(term.field()) < 0;
- currentField = term.field();
- Terms terms = fields.terms(currentField);
- if (terms != null) {
- termsEnum = terms.iterator();
- } else {
- termsEnum = null;
- }
- }
-
- if (termsEnum == null) {
- continue;
- }
- assert checkDeleteTerm(term);
-
- if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) {
- DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs);
-
- if (docsEnum != null) {
- docs = docsEnum;
- int limit = entry.getValue().getNum();
- while (true) {
- final int docID = docs.nextDoc();
- if (docID == DocsEnum.NO_MORE_DOCS || docIDStart+docID >= limit) {
- break;
- }
- reader.deleteDocument(docID);
- any = true;
- }
- }
- }
- }
- }
-
- // Delete by docID
- for (Integer docIdInt : deletesFlushed.docIDs) {
- int docID = docIdInt.intValue();
- if (docID >= docIDStart && docID < docEnd) {
- reader.deleteDocument(docID-docIDStart);
- any = true;
- }
- }
-
- // Delete by query
- if (deletesFlushed.queries.size() > 0) {
- IndexSearcher searcher = new IndexSearcher(reader);
- try {
- for (Entry<Query, Integer> entry : deletesFlushed.queries.entrySet()) {
- Query query = entry.getKey();
- int limit = entry.getValue().intValue();
- Weight weight = query.weight(searcher);
- Scorer scorer = weight.scorer(reader, true, false);
- if (scorer != null) {
- while(true) {
- int doc = scorer.nextDoc();
- if (((long) docIDStart) + doc >= limit)
- break;
- reader.deleteDocument(doc);
- any = true;
- }
- }
- }
- } finally {
- searcher.close();
- }
- }
- return any;
- }
-
- // Buffer a term in bufferedDeleteTerms, which records the
- // current number of documents buffered in ram so that the
- // delete term will be applied to those documents as well
- // as the disk segments.
- synchronized private void addDeleteTerm(Term term, int docCount) {
- BufferedDeletes.Num num = deletesInRAM.terms.get(term);
- final int docIDUpto = flushedDocCount + docCount;
- if (num == null)
- deletesInRAM.terms.put(term, new BufferedDeletes.Num(docIDUpto));
- else
- num.setNum(docIDUpto);
- deletesInRAM.numTerms++;
-
- deletesInRAM.addBytesUsed(BYTES_PER_DEL_TERM + term.bytes.length);
- }
-
- // Buffer a specific docID for deletion. Currently only
- // used when we hit a exception when adding a document
- synchronized private void addDeleteDocID(int docID) {
- deletesInRAM.docIDs.add(Integer.valueOf(flushedDocCount+docID));
- deletesInRAM.addBytesUsed(BYTES_PER_DEL_DOCID);
- }
-
- synchronized private void addDeleteQuery(Query query, int docID) {
- deletesInRAM.queries.put(query, Integer.valueOf(flushedDocCount + docID));
- deletesInRAM.addBytesUsed(BYTES_PER_DEL_QUERY);
}
/** Does the synchronized work to finish/flush the
@@ -1218,14 +1006,18 @@
// waiting for me to become idle. We just forcefully
// idle this threadState; it will be fully reset by
// abort()
- if (docWriter != null)
+ if (docWriter != null) {
try {
docWriter.abort();
} catch (Throwable t) {
}
+ }
perThread.isIdle = true;
+
+ // wakes up any threads waiting on the wait queue
notifyAll();
+
return;
}
@@ -1241,12 +1033,9 @@
if (doPause)
waitForWaitQueue();
- if (bufferIsFull && !flushPending) {
- flushPending = true;
- perThread.doFlushAfter = true;
- }
+ perThread.isIdle = true;
- perThread.isIdle = true;
+ // wakes up any threads waiting on the wait queue
notifyAll();
}
}
@@ -1275,42 +1064,8 @@
}
final SkipDocWriter skipDocWriter = new SkipDocWriter();
- long getRAMUsed() {
- return bytesUsed() + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed;
- }
-
NumberFormat nf = NumberFormat.getInstance();
- // Coarse estimates used to measure RAM usage of buffered deletes
- final static int OBJECT_HEADER_BYTES = 8;
- final static int POINTER_NUM_BYTE = Constants.JRE_IS_64BIT ? 8 : 4;
- final static int INT_NUM_BYTE = 4;
- final static int CHAR_NUM_BYTE = 2;
-
- /* Rough logic: HashMap has an array[Entry] w/ varying
- load factor (say 2 * POINTER). Entry is object w/ Term
- key, BufferedDeletes.Num val, int hash, Entry next
- (OBJ_HEADER + 3*POINTER + INT). Term is object w/
- String field and String text (OBJ_HEADER + 2*POINTER).
- We don't count Term's field since it's interned.
- Term's text is String (OBJ_HEADER + 4*INT + POINTER +
- OBJ_HEADER + string.length*CHAR). BufferedDeletes.num is
- OBJ_HEADER + INT. */
-
- final static int BYTES_PER_DEL_TERM = 8*POINTER_NUM_BYTE + 5*OBJECT_HEADER_BYTES + 6*INT_NUM_BYTE;
-
- /* Rough logic: del docIDs are List<Integer>. Say list
- allocates ~2X size (2*POINTER). Integer is OBJ_HEADER
- + int */
- final static int BYTES_PER_DEL_DOCID = 2*POINTER_NUM_BYTE + OBJECT_HEADER_BYTES + INT_NUM_BYTE;
-
- /* Rough logic: HashMap has an array[Entry] w/ varying
- load factor (say 2 * POINTER). Entry is object w/
- Query key, Integer val, int hash, Entry next
- (OBJ_HEADER + 3*POINTER + INT). Query we often
- undercount (say 24 bytes). Integer is OBJ_HEADER + INT. */
- final static int BYTES_PER_DEL_QUERY = 5*POINTER_NUM_BYTE + 2*OBJECT_HEADER_BYTES + 2*INT_NUM_BYTE + 24;
-
/* Initial chunks size of the shared byte[] blocks used to
store postings data */
final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
@@ -1333,14 +1088,14 @@
final int[] b;
if (0 == size) {
b = new int[INT_BLOCK_SIZE];
- bytesUsed.addAndGet(INT_BLOCK_SIZE*INT_NUM_BYTE);
+ bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
} else
b = freeIntBlocks.remove(size-1);
return b;
}
- private long bytesUsed() {
- return bytesUsed.get();
+ long bytesUsed() {
+ return bytesUsed.get() + pendingDeletes.bytesUsed.get();
}
/* Return int[]s to the pool */
@@ -1376,19 +1131,20 @@
final boolean doBalance;
final long deletesRAMUsed;
+ deletesRAMUsed = bufferedDeletes.bytesUsed();
+
synchronized(this) {
if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
return;
}
- deletesRAMUsed = deletesInRAM.bytesUsed+deletesFlushed.bytesUsed;
- doBalance = bytesUsed() +deletesRAMUsed >= ramBufferSize;
+ doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize;
}
if (doBalance) {
if (infoStream != null)
- message(" RAM: now balance allocations: usedMB=" + toMB(bytesUsed()) +
+ message(" RAM: balance allocations: usedMB=" + toMB(bytesUsed()) +
" vs trigger=" + toMB(ramBufferSize) +
" deletesMB=" + toMB(deletesRAMUsed) +
" byteBlockFree=" + toMB(byteBlockAllocator.bytesUsed()) +
@@ -1414,7 +1170,7 @@
bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize;
if (infoStream != null) {
if (bytesUsed()+deletesRAMUsed > ramBufferSize)
- message(" nothing to free; now set bufferIsFull");
+ message(" nothing to free; set bufferIsFull");
else
message(" nothing to free");
}
@@ -1426,7 +1182,7 @@
}
if ((1 == iter % 4) && freeIntBlocks.size() > 0) {
freeIntBlocks.remove(freeIntBlocks.size()-1);
- bytesUsed.addAndGet(-INT_BLOCK_SIZE * INT_NUM_BYTE);
+ bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
}
if ((2 == iter % 4) && perDocAllocator.numBufferedBlocks() > 0) {
perDocAllocator.freeBlocks(32); // Remove upwards of 32 blocks (each block is 1K)
@@ -1501,8 +1257,9 @@
nextWriteLoc = 0;
success = true;
} finally {
- if (!success)
+ if (!success) {
setAborting();
+ }
}
}
@@ -1519,8 +1276,9 @@
waiting[nextWriteLoc] = null;
waitingBytes -= doc.sizeInBytes();
writeDocument(doc);
- } else
+ } else {
break;
+ }
}
} else {
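A small sketch of the pushDeletes decision made in the flush path above (class and method names are illustrative, not the patch's API): deletes buffered against the in-RAM segment are attached to the newly flushed segment if one was produced, otherwise pinned to the last already-flushed segment, and dropped entirely when the index has no segments at all.

import java.util.ArrayList;
import java.util.List;

class PushDeletesSketch {
  // Returns the segment name the pending deletes should be attached to,
  // or null if they can safely be dropped.
  static String chooseTarget(String newSegment, List<String> flushedSegments) {
    if (newSegment != null) {
      return newSegment;                                      // deletes travel with the new segment
    } else if (!flushedSegments.isEmpty()) {
      return flushedSegments.get(flushedSegments.size()-1);   // pin to the last flushed segment
    } else {
      return null;                                            // no segments: nothing to delete against
    }
  }

  public static void main(String[] args) {
    List<String> segs = new ArrayList<String>();
    System.out.println(chooseTarget(null, segs));   // null -> deletes dropped
    segs.add("_0");
    System.out.println(chooseTarget(null, segs));   // _0
    System.out.println(chooseTarget("_1", segs));   // _1
  }
}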
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java Thu Dec 09 10:10:12 2010 -0500
@@ -27,7 +27,6 @@
boolean isIdle = true; // false if this is currently in use by a thread
int numThreads = 1; // Number of threads that share this instance
- boolean doFlushAfter; // true if we should flush after processing current doc
final DocConsumerPerThread consumer;
final DocumentsWriter.DocState docState;
@@ -45,6 +44,5 @@
void doAfterFlush() {
numThreads = 0;
- doFlushAfter = false;
}
}
Index: lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
--- lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java Thu Dec 09 10:10:12 2010 -0500
@@ -21,6 +21,7 @@
import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.util.RamUsageEstimator;
// TODO: break into separate freq and prox writers as
// codecs; make separate container (tii/tis/skip/*) that can
@@ -88,7 +89,7 @@
}
}
- final void writeProx(final int termID, int proxCode) {
+ void writeProx(final int termID, int proxCode) {
final Payload payload;
if (payloadAttribute == null) {
payload = null;
@@ -110,7 +111,7 @@
}
@Override
- final void newTerm(final int termID) {
+ void newTerm(final int termID) {
// First time we're seeing this term since the last
// flush
assert docState.testPoint("FreqProxTermsWriterPerField.newTerm start");
@@ -127,7 +128,7 @@
}
@Override
- final void addTerm(final int termID) {
+ void addTerm(final int termID) {
assert docState.testPoint("FreqProxTermsWriterPerField.addTerm start");
@@ -205,7 +206,7 @@
@Override
int bytesPerPosting() {
- return ParallelPostingsArray.BYTES_PER_POSTING + 4 * DocumentsWriter.INT_NUM_BYTE;
+ return ParallelPostingsArray.BYTES_PER_POSTING + 4 * RamUsageEstimator.NUM_BYTES_INT;
}
}
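Back-of-the-envelope arithmetic for the bytesPerPosting() change above, as a tiny sketch: RamUsageEstimator.NUM_BYTES_INT has the same value (4) as the removed DocumentsWriter.INT_NUM_BYTE, so the estimate is unchanged. The base posting size is assumed here to be three parallel int arrays; only the 4 * NUM_BYTES_INT term comes from the patch.

class PostingBytesSketch {
  static final int NUM_BYTES_INT = 4;                      // value of RamUsageEstimator.NUM_BYTES_INT
  static final int BYTES_PER_POSTING = 3 * NUM_BYTES_INT;  // assumed base: 3 parallel int arrays

  public static void main(String[] args) {
    // the freq/prox writer tracks 4 additional ints per posting on top of the base
    int bytesPerPosting = BYTES_PER_POSTING + 4 * NUM_BYTES_INT;
    System.out.println(bytesPerPosting + " bytes per posting"); // 28 under these assumptions
  }
}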
Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java
--- lucene/src/java/org/apache/lucene/index/IndexWriter.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/java/org/apache/lucene/index/IndexWriter.java Thu Dec 09 10:10:12 2010 -0500
@@ -35,6 +35,7 @@
import java.io.IOException;
import java.io.Closeable;
import java.io.PrintStream;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
import java.util.Collection;
import java.util.ArrayList;
@@ -201,9 +202,8 @@
private final static int MERGE_READ_BUFFER_SIZE = 4096;
// Used for printing messages
- private static Object MESSAGE_ID_LOCK = new Object();
- private static int MESSAGE_ID = 0;
- private int messageID = -1;
+ private static final AtomicInteger MESSAGE_ID = new AtomicInteger();
+ private int messageID = MESSAGE_ID.getAndIncrement();
volatile private boolean hitOOM;
private final Directory directory; // where this index resides
@@ -218,7 +218,7 @@
volatile SegmentInfos pendingCommit; // set when a commit is pending (after prepareCommit() & before commit())
volatile long pendingCommitChangeCount;
- private final SegmentInfos segmentInfos; // the segments
+ final SegmentInfos segmentInfos; // the segments
private DocumentsWriter docWriter;
private IndexFileDeleter deleter;
@@ -245,10 +245,11 @@
private long mergeGen;
private boolean stopMerges;
- private int flushCount;
- private int flushDeletesCount;
+ private final AtomicInteger flushCount = new AtomicInteger();
+ private final AtomicInteger flushDeletesCount = new AtomicInteger();
final ReaderPool readerPool = new ReaderPool();
+ final BufferedDeletes bufferedDeletes;
// This is a "write once" variable (like the organic dye
// on a DVD-R that may or may not be heated by a laser and
@@ -402,20 +403,26 @@
/**
* Release the segment reader (i.e. decRef it and close if there
* are no more references.
+ * @return true if this release altered the index (eg
+ * the SegmentReader had pending changes to del docs and
+ * was closed). Caller must call checkpoint() if so.
* @param sr
* @throws IOException
*/
- public synchronized void release(SegmentReader sr) throws IOException {
- release(sr, false);
+ public synchronized boolean release(SegmentReader sr) throws IOException {
+ return release(sr, false);
}
/**
* Release the segment reader (i.e. decRef it and close if there
* are no more references.
+ * @return true if this release altered the index (eg
+ * the SegmentReader had pending changes to del docs and
+ * was closed). Caller must call checkpoint() if so.
* @param sr
* @throws IOException
*/
- public synchronized void release(SegmentReader sr, boolean drop) throws IOException {
+ public synchronized boolean release(SegmentReader sr, boolean drop) throws IOException {
final boolean pooled = readerMap.containsKey(sr.getSegmentInfo());
@@ -446,13 +453,10 @@
// not pooling readers, we release it:
readerMap.remove(sr.getSegmentInfo());
- if (hasChanges) {
- // Must checkpoint w/ deleter, because this
- // segment reader will have created new _X_N.del
- // file.
- deleter.checkpoint(segmentInfos, false);
- }
+ return hasChanges;
}
+
+ return false;
}
/** Remove all our references to readers, and commits
@@ -600,6 +604,8 @@
}
}
+
+
/**
* Obtain the number of deleted docs for a pooled reader.
* If the reader isn't being pooled, the segmentInfo's
@@ -646,15 +652,6 @@
infoStream.println("IW " + messageID + " [" + new Date() + "; " + Thread.currentThread().getName() + "]: " + message);
}
- private synchronized void setMessageID(PrintStream infoStream) {
- if (infoStream != null && messageID == -1) {
- synchronized(MESSAGE_ID_LOCK) {
- messageID = MESSAGE_ID++;
- }
- }
- this.infoStream = infoStream;
- }
-
CodecProvider codecs;
/**
@@ -690,7 +687,7 @@
config = (IndexWriterConfig) conf.clone();
directory = d;
analyzer = conf.getAnalyzer();
- setMessageID(defaultInfoStream);
+ infoStream = defaultInfoStream;
maxFieldLength = conf.getMaxFieldLength();
termIndexInterval = conf.getTermIndexInterval();
mergePolicy = conf.getMergePolicy();
@@ -699,6 +696,8 @@
mergedSegmentWarmer = conf.getMergedSegmentWarmer();
codecs = conf.getCodecProvider();
+ bufferedDeletes = new BufferedDeletes(messageID);
+ bufferedDeletes.setInfoStream(infoStream);
poolReaders = conf.getReaderPooling();
OpenMode mode = conf.getOpenMode();
@@ -766,7 +765,7 @@
setRollbackSegmentInfos(segmentInfos);
- docWriter = new DocumentsWriter(directory, this, conf.getIndexingChain(), conf.getMaxThreadStates(), getCurrentFieldInfos());
+ docWriter = new DocumentsWriter(directory, this, conf.getIndexingChain(), conf.getMaxThreadStates(), getCurrentFieldInfos(), bufferedDeletes);
docWriter.setInfoStream(infoStream);
docWriter.setMaxFieldLength(maxFieldLength);
@@ -785,7 +784,6 @@
segmentInfos.changed();
}
- docWriter.setMaxBufferedDeleteTerms(conf.getMaxBufferedDeleteTerms());
docWriter.setRAMBufferSizeMB(conf.getRAMBufferSizeMB());
docWriter.setMaxBufferedDocs(conf.getMaxBufferedDocs());
pushMaxBufferedDocs();
@@ -896,9 +894,10 @@
*/
public void setInfoStream(PrintStream infoStream) {
ensureOpen();
- setMessageID(infoStream);
+ this.infoStream = infoStream;
docWriter.setInfoStream(infoStream);
deleter.setInfoStream(infoStream);
+ bufferedDeletes.setInfoStream(infoStream);
if (infoStream != null)
messageState();
}
@@ -1029,8 +1028,6 @@
private void closeInternal(boolean waitForMerges) throws CorruptIndexException, IOException {
- docWriter.pauseAllThreads();
-
try {
if (infoStream != null)
message("now flush at close");
@@ -1085,8 +1082,6 @@
closing = false;
notifyAll();
if (!closed) {
- if (docWriter != null)
- docWriter.resumeAllThreads();
if (infoStream != null)
message("hit exception while closing");
}
@@ -1094,87 +1089,6 @@
}
}
- /** Tells the docWriter to close its currently open shared
- * doc stores (stored fields & vectors files).
- * Return value specifices whether new doc store files are compound or not.
- */
- private synchronized boolean flushDocStores() throws IOException {
-
- if (infoStream != null) {
- message("flushDocStores segment=" + docWriter.getDocStoreSegment());
- }
-
- boolean useCompoundDocStore = false;
- if (infoStream != null) {
- message("closeDocStores segment=" + docWriter.getDocStoreSegment());
- }
-
- String docStoreSegment;
-
- boolean success = false;
- try {
- docStoreSegment = docWriter.closeDocStore();
- success = true;
- } finally {
- if (!success && infoStream != null) {
- message("hit exception closing doc store segment");
- }
- }
-
- if (infoStream != null) {
- message("flushDocStores files=" + docWriter.closedFiles());
- }
-
- useCompoundDocStore = mergePolicy.useCompoundDocStore(segmentInfos);
-
- if (useCompoundDocStore && docStoreSegment != null && docWriter.closedFiles().size() != 0) {
- // Now build compound doc store file
-
- if (infoStream != null) {
- message("create compound file " + IndexFileNames.segmentFileName(docStoreSegment, "", IndexFileNames.COMPOUND_FILE_STORE_EXTENSION));
- }
-
- success = false;
-
- final int numSegments = segmentInfos.size();
- final String compoundFileName = IndexFileNames.segmentFileName(docStoreSegment, "", IndexFileNames.COMPOUND_FILE_STORE_EXTENSION);
-
- try {
- CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName);
- for (final String file : docWriter.closedFiles() ) {
- cfsWriter.addFile(file);
- }
-
- // Perform the merge
- cfsWriter.close();
- success = true;
-
- } finally {
- if (!success) {
- if (infoStream != null)
- message("hit exception building compound file doc store for segment " + docStoreSegment);
- deleter.deleteFile(compoundFileName);
- docWriter.abort();
- }
- }
-
- for(int i=0;i<numSegments;i++) {
- SegmentInfo si = segmentInfos.info(i);
- if (si.getDocStoreOffset() != -1 &&
- si.getDocStoreSegment().equals(docStoreSegment))
- si.setDocStoreIsCompoundFile(true);
- }
-
- checkpoint();
-
- // In case the files we just merged into a CFS were
- // not previously checkpointed:
- deleter.deleteNewFiles(docWriter.closedFiles());
- }
-
- return useCompoundDocStore;
- }
-
/** Returns the Directory used by this index. */
public Directory getDirectory() {
// Pass false because the flush during closing calls getDirectory
@@ -1226,8 +1140,12 @@
public synchronized boolean hasDeletions() throws IOException {
ensureOpen();
- if (docWriter.hasDeletes())
+ if (bufferedDeletes.any()) {
return true;
+ }
+ if (docWriter.anyDeletions()) {
+ return true;
+ }
for (int i = 0; i < segmentInfos.size(); i++)
if (segmentInfos.info(i).hasDeletions())
return true;
@@ -1321,13 +1239,14 @@
boolean success = false;
try {
try {
- doFlush = docWriter.addDocument(doc, analyzer);
+ doFlush = docWriter.updateDocument(doc, analyzer, null);
success = true;
} finally {
if (!success) {
- if (infoStream != null)
+ if (infoStream != null) {
message("hit exception adding document");
+ }
synchronized (this) {
// If docWriter has some aborted files that were
@@ -1361,9 +1280,9 @@
public void deleteDocuments(Term term) throws CorruptIndexException, IOException {
ensureOpen();
try {
- boolean doFlush = docWriter.bufferDeleteTerm(term);
- if (doFlush)
+ if (docWriter.deleteTerm(term, false)) {
flush(true, false, false);
+ }
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Term)");
}
@@ -1385,9 +1304,9 @@
public void deleteDocuments(Term... terms) throws CorruptIndexException, IOException {
ensureOpen();
try {
- boolean doFlush = docWriter.bufferDeleteTerms(terms);
- if (doFlush)
+ if (docWriter.deleteTerms(terms)) {
flush(true, false, false);
+ }
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Term..)");
}
@@ -1406,9 +1325,13 @@
*/
public void deleteDocuments(Query query) throws CorruptIndexException, IOException {
ensureOpen();
- boolean doFlush = docWriter.bufferDeleteQuery(query);
- if (doFlush)
- flush(true, false, false);
+ try {
+ if (docWriter.deleteQuery(query)) {
+ flush(true, false, false);
+ }
+ } catch (OutOfMemoryError oom) {
+ handleOOM(oom, "deleteDocuments(Query)");
+ }
}
/**
@@ -1426,9 +1349,13 @@
*/
public void deleteDocuments(Query... queries) throws CorruptIndexException, IOException {
ensureOpen();
- boolean doFlush = docWriter.bufferDeleteQueries(queries);
- if (doFlush)
- flush(true, false, false);
+ try {
+ if (docWriter.deleteQueries(queries)) {
+ flush(true, false, false);
+ }
+ } catch (OutOfMemoryError oom) {
+ handleOOM(oom, "deleteDocuments(Query..)");
+ }
}
/**
@@ -1478,25 +1405,30 @@
boolean doFlush = false;
boolean success = false;
try {
- doFlush = docWriter.updateDocument(term, doc, analyzer);
+ doFlush = docWriter.updateDocument(doc, analyzer, term);
success = true;
} finally {
if (!success) {
- if (infoStream != null)
+ if (infoStream != null) {
message("hit exception updating document");
+ }
synchronized (this) {
// If docWriter has some aborted files that were
// never incref'd, then we clean them up here
- final Collection<String> files = docWriter.abortedFiles();
- if (files != null)
- deleter.deleteNewFiles(files);
+ if (docWriter != null) {
+ final Collection<String> files = docWriter.abortedFiles();
+ if (files != null) {
+ deleter.deleteNewFiles(files);
+ }
+ }
}
}
}
- if (doFlush)
+ if (doFlush) {
flush(true, false, false);
+ }
} catch (OutOfMemoryError oom) {
handleOOM(oom, "updateDocument");
}
@@ -1522,13 +1454,13 @@
}
// for test purpose
- final synchronized int getFlushCount() {
- return flushCount;
+ final int getFlushCount() {
+ return flushCount.get();
}
// for test purpose
- final synchronized int getFlushDeletesCount() {
- return flushDeletesCount;
+ final int getFlushDeletesCount() {
+ return flushDeletesCount.get();
}
final String newSegmentName() {
@@ -1660,8 +1592,10 @@
if (maxNumSegments < 1)
throw new IllegalArgumentException("maxNumSegments must be >= 1; got " + maxNumSegments);
- if (infoStream != null)
+ if (infoStream != null) {
message("optimize: index now " + segString());
+ message("now flush at optimize");
+ }
flush(true, false, true);
@@ -1944,8 +1878,6 @@
message("rollback");
}
- docWriter.pauseAllThreads();
-
try {
finishMerges(false);
@@ -1955,6 +1887,8 @@
mergePolicy.close();
mergeScheduler.close();
+ bufferedDeletes.clear();
+
synchronized(this) {
if (pendingCommit != null) {
@@ -1993,7 +1927,6 @@
} finally {
synchronized(this) {
if (!success) {
- docWriter.resumeAllThreads();
closing = false;
notifyAll();
if (infoStream != null)
@@ -2021,7 +1954,6 @@
* will receive {@link MergePolicy.MergeAbortedException}s.
*/
public synchronized void deleteAll() throws IOException {
- docWriter.pauseAllThreads();
try {
// Abort any running merges
@@ -2029,7 +1961,6 @@
// Remove any buffered docs
docWriter.abort();
- docWriter.setFlushedDocCount(0);
// Remove all segments
segmentInfos.clear();
@@ -2047,7 +1978,6 @@
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteAll");
} finally {
- docWriter.resumeAllThreads();
if (infoStream != null) {
message("hit exception during deleteAll");
}
@@ -2123,7 +2053,7 @@
* the index files referenced exist (correctly) in the
* index directory.
*/
- private synchronized void checkpoint() throws IOException {
+ synchronized void checkpoint() throws IOException {
changeCount++;
segmentInfos.changed();
deleter.checkpoint(segmentInfos, false);
@@ -2259,9 +2189,6 @@
synchronized (this) {
ensureOpen();
segmentInfos.addAll(infos);
- // Notify DocumentsWriter that the flushed count just increased
- docWriter.updateFlushedDocCount(docCount);
-
checkpoint();
}
@@ -2324,10 +2251,6 @@
// Register the new segment
synchronized(this) {
segmentInfos.add(info);
-
- // Notify DocumentsWriter that the flushed count just increased
- docWriter.updateFlushedDocCount(docCount);
-
checkpoint();
}
} catch (OutOfMemoryError oom) {
@@ -2535,196 +2458,92 @@
// We can be called during close, when closing==true, so we must pass false to ensureOpen:
ensureOpen(false);
- if (doFlush(flushDocStores, flushDeletes) && triggerMerge)
+ if (doFlush(flushDocStores, flushDeletes) && triggerMerge) {
maybeMerge();
+ }
}
// TODO: this method should not have to be entirely
// synchronized, ie, merges should be allowed to commit
// even while a flush is happening
- private synchronized final boolean doFlush(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException {
- try {
- try {
- return doFlushInternal(flushDocStores, flushDeletes);
- } finally {
- docWriter.balanceRAM();
- }
- } finally {
- docWriter.clearFlushPending();
- }
- }
-
- // TODO: this method should not have to be entirely
- // synchronized, ie, merges should be allowed to commit
- // even while a flush is happening
- private synchronized final boolean doFlushInternal(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException {
+ private synchronized final boolean doFlush(boolean closeDocStores, boolean applyAllDeletes) throws CorruptIndexException, IOException {
if (hitOOM) {
throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot flush");
}
- ensureOpen(false);
+ doBeforeFlush();
assert testPoint("startDoFlush");
- doBeforeFlush();
-
- flushCount++;
-
- // If we are flushing because too many deletes
- // accumulated, then we should apply the deletes to free
- // RAM:
- flushDeletes |= docWriter.doApplyDeletes();
-
- // Make sure no threads are actively adding a document.
- // Returns true if docWriter is currently aborting, in
- // which case we skip flushing this segment
- if (infoStream != null) {
- message("flush: now pause all indexing threads");
- }
- if (docWriter.pauseAllThreads()) {
- docWriter.resumeAllThreads();
- return false;
- }
+ // We may be flushing because it was triggered by doc
+ // count, del count, ram usage (in which case flush
+ // pending is already set), or we may be flushing
+ // due to an external event, eg getReader or commit is
+ // called (in which case we now set it, and this will
+ // pause all threads):
+ flushControl.setFlushPendingNoWait("explicit flush");
+
+ boolean success = false;
try {
- SegmentInfo newSegment = null;
-
- final int numDocs = docWriter.getNumDocsInRAM();
-
- // Always flush docs if there are any
- boolean flushDocs = numDocs > 0;
-
- String docStoreSegment = docWriter.getDocStoreSegment();
-
- assert docStoreSegment != null || numDocs == 0: "dss=" + docStoreSegment + " numDocs=" + numDocs;
-
- if (docStoreSegment == null)
- flushDocStores = false;
-
- int docStoreOffset = docWriter.getDocStoreOffset();
-
- boolean docStoreIsCompoundFile = false;
-
if (infoStream != null) {
- message(" flush: segment=" + docWriter.getSegment() +
- " docStoreSegment=" + docWriter.getDocStoreSegment() +
- " docStoreOffset=" + docStoreOffset +
- " flushDocs=" + flushDocs +
- " flushDeletes=" + flushDeletes +
- " flushDocStores=" + flushDocStores +
- " numDocs=" + numDocs +
- " numBufDelTerms=" + docWriter.getNumBufferedDeleteTerms());
+ message(" start flush: applyAllDeletes=" + applyAllDeletes + " closeDocStores=" + closeDocStores);
message(" index before flush " + segString());
}
-
- // Check if the doc stores must be separately flushed
- // because other segments, besides the one we are about
- // to flush, reference it
- if (flushDocStores && (!flushDocs || !docWriter.getSegment().equals(docWriter.getDocStoreSegment()))) {
- // We must separately flush the doc store
- if (infoStream != null)
- message(" flush shared docStore segment " + docStoreSegment);
-
- docStoreIsCompoundFile = flushDocStores();
- flushDocStores = false;
- }
-
- String segment = docWriter.getSegment();
-
- // If we are flushing docs, segment must not be null:
- assert segment != null || !flushDocs;
-
- if (flushDocs) {
-
- boolean success = false;
- final int flushedDocCount;
-
- try {
- flushedDocCount = docWriter.flush(flushDocStores);
- if (infoStream != null) {
- message("flushedFiles=" + docWriter.getFlushedFiles());
- }
- success = true;
- } finally {
- if (!success) {
- if (infoStream != null)
- message("hit exception flushing segment " + segment);
- deleter.refresh(segment);
- }
- }
-
- if (0 == docStoreOffset && flushDocStores) {
- // This means we are flushing private doc stores
- // with this segment, so it will not be shared
- // with other segments
- assert docStoreSegment != null;
- assert docStoreSegment.equals(segment);
- docStoreOffset = -1;
- docStoreIsCompoundFile = false;
- docStoreSegment = null;
- }
-
- // Create new SegmentInfo, but do not add to our
- // segmentInfos until deletes are flushed
- // successfully.
- newSegment = new SegmentInfo(segment,
- flushedDocCount,
- directory, false, docStoreOffset,
- docStoreSegment, docStoreIsCompoundFile,
- docWriter.hasProx(),
- docWriter.getSegmentCodecs());
-
- if (infoStream != null) {
- message("flush codec=" + docWriter.getSegmentCodecs());
- }
+
+ final SegmentInfo newSegment = docWriter.flush(this, closeDocStores, deleter, mergePolicy, segmentInfos);
+ if (newSegment != null) {
setDiagnostics(newSegment, "flush");
- }
-
- docWriter.pushDeletes();
-
- if (flushDocs) {
segmentInfos.add(newSegment);
checkpoint();
}
- if (flushDocs && mergePolicy.useCompoundFile(segmentInfos, newSegment)) {
- // Now build compound file
- boolean success = false;
- try {
- docWriter.createCompoundFile(segment);
- success = true;
- } finally {
- if (!success) {
- if (infoStream != null)
- message("hit exception creating compound file for newly flushed segment " + segment);
- deleter.deleteFile(IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION));
+ if (!applyAllDeletes) {
+ // If deletes alone are consuming > 1/2 our RAM
+ // buffer, force them all to apply now. This is to
+ // prevent too-frequent flushing of a long tail of
+ // tiny segments:
+ if (flushControl.getFlushDeletes() ||
+ (config.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
+ bufferedDeletes.bytesUsed() > (1024*1024*config.getRAMBufferSizeMB()/2))) {
+ applyAllDeletes = true;
+ if (infoStream != null) {
+ message("force apply deletes bytesUsed=" + bufferedDeletes.bytesUsed() + " vs ramBuffer=" + (1024*1024*config.getRAMBufferSizeMB()));
}
}
-
- newSegment.setUseCompoundFile(true);
- checkpoint();
}
- if (flushDeletes) {
- applyDeletes();
+ if (applyAllDeletes) {
+ if (infoStream != null) {
+ message("apply all deletes during flush");
+ }
+ flushDeletesCount.incrementAndGet();
+ if (bufferedDeletes.applyDeletes(readerPool, segmentInfos, segmentInfos)) {
+ checkpoint();
+ }
+ flushControl.clearDeletes();
+ } else if (infoStream != null) {
+ message("don't apply deletes now delTermCount=" + bufferedDeletes.numTerms() + " bytesUsed=" + bufferedDeletes.bytesUsed());
}
-
- if (flushDocs)
- checkpoint();
doAfterFlush();
-
- return flushDocs;
+ flushCount.incrementAndGet();
+
+ success = true;
+
+ return newSegment != null;
} catch (OutOfMemoryError oom) {
handleOOM(oom, "doFlush");
// never hit
return false;
} finally {
- docWriter.clearFlushPending();
- docWriter.resumeAllThreads();
+ flushControl.clearFlushPending();
+ if (!success && infoStream != null) {
+ message("hit exception during flush");
+ }
}
}
@@ -2733,7 +2552,7 @@
*/
public final long ramSizeInBytes() {
ensureOpen();
- return docWriter.getRAMUsed();
+ return docWriter.bytesUsed() + bufferedDeletes.bytesUsed();
}
/** Expert: Return the number of documents currently
@@ -2776,7 +2595,7 @@
* saves the resulting deletes file (incrementing the
* delete generation for merge.info). If no deletes were
* flushed, no new deletes file is saved. */
- synchronized private void commitMergedDeletes(MergePolicy.OneMerge merge, SegmentReader mergeReader) throws IOException {
+ synchronized private void commitMergedDeletes(MergePolicy.OneMerge merge, SegmentReader mergedReader) throws IOException {
assert testPoint("startCommitMergeDeletes");
@@ -2815,7 +2634,7 @@
assert currentDelDocs.get(j);
else {
if (currentDelDocs.get(j)) {
- mergeReader.doDelete(docUpto);
+ mergedReader.doDelete(docUpto);
delCount++;
}
docUpto++;
@@ -2829,7 +2648,7 @@
// does:
for(int j=0; j<docCount; j++) {
if (currentDelDocs.get(j)) {
- mergeReader.doDelete(docUpto);
+ mergedReader.doDelete(docUpto);
delCount++;
}
docUpto++;
@@ -2839,13 +2658,13 @@
docUpto += info.docCount;
}
- assert mergeReader.numDeletedDocs() == delCount;
-
- mergeReader.hasChanges = delCount > 0;
+ assert mergedReader.numDeletedDocs() == delCount;
+
+ mergedReader.hasChanges = delCount > 0;
}
/* FIXME if we want to support non-contiguous segment merges */
- synchronized private boolean commitMerge(MergePolicy.OneMerge merge, SegmentMerger merger, int mergedDocCount, SegmentReader mergedReader) throws IOException {
+ synchronized private boolean commitMerge(MergePolicy.OneMerge merge, SegmentMerger merger, SegmentReader mergedReader) throws IOException {
assert testPoint("startCommitMerge");
@@ -2873,7 +2692,6 @@
final int start = ensureContiguousMerge(merge);
commitMergedDeletes(merge, mergedReader);
- docWriter.remapDeletes(segmentInfos, merger.getDocMaps(), merger.getDelCounts(), merge, mergedDocCount);
// If the doc store we are using has been closed and
// is in now compound format (but wasn't when we
@@ -2886,7 +2704,7 @@
segmentInfos.subList(start, start + merge.segments.size()).clear();
assert !segmentInfos.contains(merge.info);
segmentInfos.add(start, merge.info);
-
+
closeMergeReaders(merge, false);
// Must note the change to segmentInfos so any commits
@@ -2897,6 +2715,12 @@
// them so that they don't bother writing them to
// disk, updating SegmentInfo, etc.:
readerPool.clear(merge.segments);
+
+ // remove pending deletes of the segments
+ // that were merged, moving them onto the segment just
+ // before the merged segment
+ // Lock order: IW -> BD
+ bufferedDeletes.commitMerge(merge);
if (merge.optimize) {
// cascade the optimize:
@@ -3056,10 +2880,17 @@
final synchronized void mergeInit(MergePolicy.OneMerge merge) throws IOException {
boolean success = false;
try {
+ // Lock order: IW -> BD
+ if (bufferedDeletes.applyDeletes(readerPool, segmentInfos, merge.segments)) {
+ checkpoint();
+ }
_mergeInit(merge);
success = true;
} finally {
if (!success) {
+ if (infoStream != null) {
+ message("hit exception in mergeInit");
+ }
mergeFinish(merge);
}
}
@@ -3082,9 +2913,7 @@
if (merge.isAborted())
return;
-
- applyDeletes();
-
+
final SegmentInfos sourceSegments = merge.segments;
final int end = sourceSegments.size();
@@ -3274,10 +3103,11 @@
if (suppressExceptions) {
// Suppress any new exceptions so we throw the
// original cause
+ boolean anyChanges = false;
for (int i=0;i<numSegments;i++) {
if (merge.readers[i] != null) {
try {
- readerPool.release(merge.readers[i], false);
+ anyChanges |= readerPool.release(merge.readers[i], false);
} catch (Throwable t) {
}
merge.readers[i] = null;
@@ -3294,6 +3124,9 @@
merge.readersClone[i] = null;
}
}
+ if (anyChanges) {
+ checkpoint();
+ }
} else {
for (int i=0;i<numSegments;i++) {
if (merge.readers[i] != null) {
@@ -3521,15 +3354,21 @@
mergedSegmentWarmer.warm(mergedReader);
}
- if (!commitMerge(merge, merger, mergedDocCount, mergedReader)) {
+ if (!commitMerge(merge, merger, mergedReader)) {
// commitMerge will return false if this merge was aborted
return 0;
}
} finally {
synchronized(this) {
- readerPool.release(mergedReader);
+ if (readerPool.release(mergedReader)) {
+ // Must checkpoint after releasing the
+ // mergedReader since it may have written a new
+ // deletes file:
+ checkpoint();
+ }
}
}
+
success = true;
} finally {
@@ -3549,37 +3388,14 @@
mergeExceptions.add(merge);
}
- // Apply buffered deletes to all segments.
- private final synchronized boolean applyDeletes() throws CorruptIndexException, IOException {
- assert testPoint("startApplyDeletes");
- if (infoStream != null) {
- message("applyDeletes");
- }
- flushDeletesCount++;
- boolean success = false;
- boolean changed;
- try {
- changed = docWriter.applyDeletes(segmentInfos);
- success = true;
- } finally {
- if (!success && infoStream != null) {
- message("hit exception flushing deletes");
- }
- }
-
- if (changed)
- checkpoint();
- return changed;
+ // For test purposes.
+ final int getBufferedDeleteTermsSize() {
+ return docWriter.getPendingDeletes().terms.size();
}
// For test purposes.
- final synchronized int getBufferedDeleteTermsSize() {
- return docWriter.getBufferedDeleteTerms().size();
- }
-
- // For test purposes.
- final synchronized int getNumBufferedDeleteTerms() {
- return docWriter.getNumBufferedDeleteTerms();
+ final int getNumBufferedDeleteTerms() {
+ return docWriter.getPendingDeletes().numTermDeletes.get();
}
// utility routines for tests
@@ -3819,14 +3635,13 @@
// finishStartCommit
// startCommitMergeDeletes
// startMergeInit
- // startApplyDeletes
// DocumentsWriter.ThreadState.init start
boolean testPoint(String name) {
return true;
}
synchronized boolean nrtIsCurrent(SegmentInfos infos) {
- return infos.version == segmentInfos.version && !docWriter.anyChanges();
+ return infos.version == segmentInfos.version && !docWriter.anyChanges() && !bufferedDeletes.any();
}
synchronized boolean isClosed() {
@@ -3863,7 +3678,6 @@
deleter.revisitPolicy();
}
-
/**
* Sets the {@link PayloadProcessorProvider} to use when merging payloads.
* Note that the given <code>pcp</code> will be invoked for every segment that
@@ -3894,4 +3708,123 @@
return payloadProcessorProvider;
}
+ // decides when flushes happen
+ final class FlushControl {
+
+ private boolean flushPending;
+ private boolean flushDeletes;
+ private int delCount;
+ private int docCount;
+ private boolean flushing;
+
+ private synchronized boolean setFlushPending(String reason, boolean doWait) {
+ if (flushPending || flushing) {
+ if (doWait) {
+ while(flushPending || flushing) {
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ }
+ }
+ }
+ return false;
+ } else {
+ if (infoStream != null) {
+ message("now trigger flush reason=" + reason);
+ }
+ flushPending = true;
+ return flushPending;
+ }
+ }
+
+ public synchronized void setFlushPendingNoWait(String reason) {
+ setFlushPending(reason, false);
+ }
+
+ public synchronized boolean getFlushPending() {
+ return flushPending;
+ }
+
+ public synchronized boolean getFlushDeletes() {
+ return flushDeletes;
+ }
+
+ public synchronized void clearFlushPending() {
+ if (infoStream != null) {
+ message("clearFlushPending");
+ }
+ flushPending = false;
+ flushDeletes = false;
+ docCount = 0;
+ notifyAll();
+ }
+
+ public synchronized void clearDeletes() {
+ delCount = 0;
+ }
+
+ public synchronized boolean waitUpdate(int docInc, int delInc) {
+ return waitUpdate(docInc, delInc, false);
+ }
+
+ public synchronized boolean waitUpdate(int docInc, int delInc, boolean skipWait) {
+ while(flushPending) {
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ }
+ }
+
+ // skipWait is only used when a thread is BOTH adding
+ // a doc and buffering a del term, and the adding of
+ // the doc already triggered a flush
+ if (skipWait) {
+ docCount += docInc;
+ delCount += delInc;
+ return false;
+ }
+
+ final int maxBufferedDocs = config.getMaxBufferedDocs();
+ if (maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
+ (docCount+docInc) >= maxBufferedDocs) {
+ return setFlushPending("maxBufferedDocs", true);
+ }
+ docCount += docInc;
+
+ final int maxBufferedDeleteTerms = config.getMaxBufferedDeleteTerms();
+ if (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
+ (delCount+delInc) >= maxBufferedDeleteTerms) {
+ flushDeletes = true;
+ return setFlushPending("maxBufferedDeleteTerms", true);
+ }
+ delCount += delInc;
+
+ return flushByRAMUsage("add delete/doc");
+ }
+
+ public synchronized boolean flushByRAMUsage(String reason) {
+ final double ramBufferSizeMB = config.getRAMBufferSizeMB();
+ if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
+ final long limit = (long) (ramBufferSizeMB*1024*1024);
+ long used = bufferedDeletes.bytesUsed() + docWriter.bytesUsed();
+ if (used >= limit) {
+
+ // DocumentsWriter may be able to free up some
+ // RAM:
+ // Lock order: FC -> DW
+ docWriter.balanceRAM();
+
+ used = bufferedDeletes.bytesUsed() + docWriter.bytesUsed();
+ if (used >= limit) {
+ return setFlushPending("ram full: " + reason, false);
+ }
+ }
+ }
+ return false;
+ }
+ }
+
+ final FlushControl flushControl = new FlushControl();
}
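Note on the flush triggers above: the new FlushControl centralizes when a flush becomes pending (buffered doc count, buffered delete-term count, or total RAM used by the DocumentsWriter plus the buffered deletes), and doFlush additionally forces all deletes to be applied once they alone exceed half the RAM buffer. The following is a simplified standalone sketch of those thresholds, not the Lucene code itself; the class and method names (FlushTriggerSketch, shouldFlush, forceApplyDeletes) are invented for illustration, while the config names and the half-buffer rule restate the hunks above.

// Standalone sketch (not Lucene source): the three triggers that make a
// flush pending, plus the "deletes use more than half the RAM buffer"
// rule that forces applyAllDeletes during a flush.
class FlushTriggerSketch {
  static final int DISABLE_AUTO_FLUSH = -1;   // mirrors IndexWriterConfig.DISABLE_AUTO_FLUSH
  int maxBufferedDocs = DISABLE_AUTO_FLUSH;
  int maxBufferedDeleteTerms = DISABLE_AUTO_FLUSH;
  double ramBufferSizeMB = 16.0;

  // Returns the reason a flush should be made pending, else null.
  String shouldFlush(int bufferedDocs, int bufferedDelTerms,
                     long docWriterBytes, long bufferedDeleteBytes) {
    if (maxBufferedDocs != DISABLE_AUTO_FLUSH && bufferedDocs >= maxBufferedDocs) {
      return "maxBufferedDocs";
    }
    if (maxBufferedDeleteTerms != DISABLE_AUTO_FLUSH
        && bufferedDelTerms >= maxBufferedDeleteTerms) {
      return "maxBufferedDeleteTerms";
    }
    if (ramBufferSizeMB != DISABLE_AUTO_FLUSH) {
      final long limit = (long) (ramBufferSizeMB * 1024 * 1024);
      if (docWriterBytes + bufferedDeleteBytes >= limit) {
        return "ram full";
      }
    }
    return null;
  }

  // During the flush itself: even if the caller did not ask for deletes
  // to be applied, force it when buffered deletes alone exceed half the
  // RAM buffer, to avoid a long tail of tiny segments.
  boolean forceApplyDeletes(long bufferedDeleteBytes) {
    return ramBufferSizeMB != DISABLE_AUTO_FLUSH
        && bufferedDeleteBytes > (long) (1024 * 1024 * ramBufferSizeMB / 2);
  }

  public static void main(String[] args) {
    FlushTriggerSketch fc = new FlushTriggerSketch();
    // With the 16 MB buffer set above: 12 MB of docs + 5 MB of deletes trips "ram full",
    // and 9 MB of buffered deletes alone (> 8 MB) forces applyAllDeletes.
    System.out.println(fc.shouldFlush(100, 10, 12L << 20, 5L << 20)); // ram full
    System.out.println(fc.forceApplyDeletes(9L << 20));               // true
  }
}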
Index: lucene/src/java/org/apache/lucene/index/ParallelPostingsArray.java
--- lucene/src/java/org/apache/lucene/index/ParallelPostingsArray.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/java/org/apache/lucene/index/ParallelPostingsArray.java Thu Dec 09 10:10:12 2010 -0500
@@ -1,7 +1,5 @@
package org.apache.lucene.index;
-import org.apache.lucene.util.ArrayUtil;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -19,9 +17,11 @@
* limitations under the License.
*/
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.RamUsageEstimator;
class ParallelPostingsArray {
- final static int BYTES_PER_POSTING = 3 * DocumentsWriter.INT_NUM_BYTE;
+ final static int BYTES_PER_POSTING = 3 * RamUsageEstimator.NUM_BYTES_INT;
final int size;
final int[] textStarts;
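For reference, RamUsageEstimator.NUM_BYTES_INT is 4, so the per-posting constants in these hunks evaluate as sketched below; the class name is invented for illustration and the arithmetic just restates the expressions in ParallelPostingsArray and FreqProxTermsWriterPerField above.

// Sketch: per-unique-term bytes charged by the posting arrays above.
public class PostingBytesSketch {
  static final int NUM_BYTES_INT = 4; // value of RamUsageEstimator.NUM_BYTES_INT

  public static void main(String[] args) {
    // ParallelPostingsArray keeps three parallel int entries per term
    // (textStarts is the one visible in the hunk above):
    int bytesPerPosting = 3 * NUM_BYTES_INT;                           // 12 bytes
    // FreqProxTermsWriterPerField charges 4 more ints per term:
    int freqProxBytesPerPosting = bytesPerPosting + 4 * NUM_BYTES_INT; // 28 bytes
    System.out.println(bytesPerPosting + " " + freqProxBytesPerPosting);
  }
}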
Index: lucene/src/java/org/apache/lucene/index/SegmentDeletes.java
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ lucene/src/java/org/apache/lucene/index/SegmentDeletes.java Thu Dec 09 10:10:12 2010 -0500
@@ -0,0 +1,189 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.search.Query;
+import org.apache.lucene.util.RamUsageEstimator;
+
+/** Holds buffered deletes, by docID, term or query for a
+ * single segment. This is used to hold buffered pending
+ * deletes against the to-be-flushed segment as well as
+ * per-segment deletes for each segment in the index. */
+
+// NOTE: we are sync'd by BufferedDeletes, ie, all access to
+// instances of this class is via sync'd methods on
+// BufferedDeletes
+class SegmentDeletes {
+
+ /* Rough logic: HashMap has an array[Entry] w/ varying
+ load factor (say 2 * POINTER). Entry is object w/ Term
+ key, Integer val, int hash, Entry next
+ (OBJ_HEADER + 3*POINTER + INT). Term is object w/
+ String field and String text (OBJ_HEADER + 2*POINTER).
+ We don't count Term's field since it's interned.
+ Term's text is String (OBJ_HEADER + 4*INT + POINTER +
+ OBJ_HEADER + string.length*CHAR). Integer is
+ OBJ_HEADER + INT. */
+ final static int BYTES_PER_DEL_TERM = 8*RamUsageEstimator.NUM_BYTES_OBJ_REF + 5*RamUsageEstimator.NUM_BYTES_OBJ_HEADER + 6*RamUsageEstimator.NUM_BYTES_INT;
+
+ /* Rough logic: del docIDs are List<Integer>. Say list
+ allocates ~2X size (2*POINTER). Integer is OBJ_HEADER
+ + int */
+ final static int BYTES_PER_DEL_DOCID = 2*RamUsageEstimator.NUM_BYTES_OBJ_REF + RamUsageEstimator.NUM_BYTES_OBJ_HEADER + RamUsageEstimator.NUM_BYTES_INT;
+
+ /* Rough logic: HashMap has an array[Entry] w/ varying
+ load factor (say 2 * POINTER). Entry is object w/
+ Query key, Integer val, int hash, Entry next
+ (OBJ_HEADER + 3*POINTER + INT). Query we often
+ undercount (say 24 bytes). Integer is OBJ_HEADER + INT. */
+ final static int BYTES_PER_DEL_QUERY = 5*RamUsageEstimator.NUM_BYTES_OBJ_REF + 2*RamUsageEstimator.NUM_BYTES_OBJ_HEADER + 2*RamUsageEstimator.NUM_BYTES_INT + 24;
+
+ // TODO: many of the deletes stored here will map to
+ // Integer.MAX_VALUE; we could be more efficient for this
+ // case ie use a SortedSet not a SortedMap. But: Java's
+ // SortedSet impls are simply backed by a Map so we won't
+ // save anything unless we do something custom...
+ final AtomicInteger numTermDeletes = new AtomicInteger();
+ final SortedMap<Term,Integer> terms = new TreeMap<Term,Integer>();
+ final Map<Query,Integer> queries = new HashMap<Query,Integer>();
+ final List<Integer> docIDs = new ArrayList<Integer>();
+
+ public static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE);
+
+ final AtomicLong bytesUsed = new AtomicLong();
+
+ private final static boolean VERBOSE_DELETES = false;
+
+ @Override
+ public String toString() {
+ if (VERBOSE_DELETES) {
+ return "SegmentDeletes [numTerms=" + numTermDeletes + ", terms=" + terms
+ + ", queries=" + queries + ", docIDs=" + docIDs + ", bytesUsed="
+ + bytesUsed + "]";
+ } else {
+ String s = "";
+ if (numTermDeletes.get() != 0) {
+ s += " " + numTermDeletes.get() + " deleted terms (unique count=" + terms.size() + ")";
+ }
+ if (queries.size() != 0) {
+ s += " " + queries.size() + " deleted queries";
+ }
+ if (docIDs.size() != 0) {
+ s += " " + docIDs.size() + " deleted docIDs";
+ }
+ if (bytesUsed.get() != 0) {
+ s += " bytesUsed=" + bytesUsed.get();
+ }
+
+ return s;
+ }
+ }
+
+ void update(SegmentDeletes in, boolean noLimit) {
+ numTermDeletes.addAndGet(in.numTermDeletes.get());
+ for (Map.Entry<Term,Integer> ent : in.terms.entrySet()) {
+ final Term term = ent.getKey();
+ if (!terms.containsKey(term)) {
+ // only incr bytesUsed if this term wasn't already buffered:
+ bytesUsed.addAndGet(BYTES_PER_DEL_TERM);
+ }
+ final Integer limit;
+ if (noLimit) {
+ limit = MAX_INT;
+ } else {
+ limit = ent.getValue();
+ }
+ terms.put(term, limit);
+ }
+
+ for (Map.Entry<Query,Integer> ent : in.queries.entrySet()) {
+ final Query query = ent.getKey();
+ if (!queries.containsKey(query)) {
+ // only incr bytesUsed if this query wasn't already buffered:
+ bytesUsed.addAndGet(BYTES_PER_DEL_QUERY);
+ }
+ final Integer limit;
+ if (noLimit) {
+ limit = MAX_INT;
+ } else {
+ limit = ent.getValue();
+ }
+ queries.put(query, limit);
+ }
+
+ // docIDs never move across segments and the docIDs
+ // should already be cleared
+ }
+
+ public void addQuery(Query query, int docIDUpto) {
+ queries.put(query, docIDUpto);
+ bytesUsed.addAndGet(BYTES_PER_DEL_QUERY);
+ }
+
+ public void addDocID(int docID) {
+ docIDs.add(Integer.valueOf(docID));
+ bytesUsed.addAndGet(BYTES_PER_DEL_DOCID);
+ }
+
+ public void addTerm(Term term, int docIDUpto) {
+ Integer current = terms.get(term);
+ if (current != null && docIDUpto < current) {
+ // Only record the new number if it's greater than the
+ // current one. This is important because if multiple
+ // threads are replacing the same doc at nearly the
+ // same time, it's possible that one thread that got a
+ // higher docID is scheduled before the other
+ // threads. If we blindly replace then we can get
+ // double-doc in the segment.
+ return;
+ }
+
+ terms.put(term, Integer.valueOf(docIDUpto));
+ numTermDeletes.incrementAndGet();
+ if (current == null) {
+ bytesUsed.addAndGet(BYTES_PER_DEL_TERM + term.bytes.length);
+ }
+ }
+
+ void clear() {
+ terms.clear();
+ queries.clear();
+ docIDs.clear();
+ numTermDeletes.set(0);
+ bytesUsed.set(0);
+ }
+
+ void clearDocIDs() {
+ bytesUsed.addAndGet(-docIDs.size()*BYTES_PER_DEL_DOCID);
+ docIDs.clear();
+ }
+
+ boolean any() {
+ return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0;
+ }
+}
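SegmentDeletes records, for each buffered deleted term or query, the highest docID the delete may apply to: the docIDUpto passed to addTerm/addQuery, or MAX_INT when update(..., noLimit=true) pushes deletes that should apply to everything already flushed. addTerm keeps the larger limit when the same term is buffered again. A minimal standalone sketch of that bookkeeping follows; the class and method names (DeleteLimitSketch, bufferDelete, applies) are invented, and it assumes the usual convention that a buffered delete applies only to docIDs below its recorded limit.

import java.util.HashMap;
import java.util.Map;

// Sketch (not Lucene source): buffer term deletes with a per-term
// docIDUpto limit, keeping the larger limit on duplicate terms, the
// way SegmentDeletes.addTerm does.
public class DeleteLimitSketch {
  private final Map<String, Integer> pendingTermDeletes = new HashMap<String, Integer>();

  void bufferDelete(String term, int docIDUpto) {
    Integer current = pendingTermDeletes.get(term);
    if (current != null && docIDUpto < current) {
      // A delete for the same term was already buffered with a higher
      // limit; keep it, so older copies of a concurrently updated doc
      // are still deleted (avoids a duplicate doc in the segment).
      return;
    }
    pendingTermDeletes.put(term, docIDUpto);
  }

  boolean applies(String term, int docID) {
    Integer limit = pendingTermDeletes.get(term);
    return limit != null && docID < limit;
  }

  public static void main(String[] args) {
    DeleteLimitSketch dels = new DeleteLimitSketch();
    dels.bufferDelete("id:7", 3);   // delete buffered after docs 0..2 were added
    dels.bufferDelete("id:7", 10);  // same term again, later: limit grows to 10
    dels.bufferDelete("id:7", 5);   // ignored, 5 < 10
    System.out.println(dels.applies("id:7", 2));   // true: doc existed before the delete
    System.out.println(dels.applies("id:7", 12));  // false: doc added after the last delete
  }
}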
Index: lucene/src/java/org/apache/lucene/index/SegmentInfo.java
--- lucene/src/java/org/apache/lucene/index/SegmentInfo.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/java/org/apache/lucene/index/SegmentInfo.java Thu Dec 09 10:10:12 2010 -0500
@@ -361,6 +361,10 @@
return docStoreSegment;
}
+ public void setDocStoreSegment(String segment) {
+ docStoreSegment = segment;
+ }
+
void setDocStoreOffset(int offset) {
docStoreOffset = offset;
clearFiles();
@@ -534,6 +538,12 @@
if (docStoreOffset != -1) {
s.append("->").append(docStoreSegment);
+ if (docStoreIsCompoundFile) {
+ s.append('c');
+ } else {
+ s.append('C');
+ }
+ s.append('+').append(docStoreOffset);
}
return s.toString();
Index: lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java
--- lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java Thu Dec 09 10:10:12 2010 -0500
@@ -24,6 +24,7 @@
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.RamUsageEstimator;
final class TermVectorsTermsWriterPerField extends TermsHashConsumerPerField {
@@ -298,7 +299,7 @@
@Override
int bytesPerPosting() {
- return super.bytesPerPosting() + 3 * DocumentsWriter.INT_NUM_BYTE;
+ return super.bytesPerPosting() + 3 * RamUsageEstimator.NUM_BYTES_INT;
}
}
}
Index: lucene/src/test/org/apache/lucene/TestDemo.java
--- lucene/src/test/org/apache/lucene/TestDemo.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/test/org/apache/lucene/TestDemo.java Thu Dec 09 10:10:12 2010 -0500
@@ -50,6 +50,7 @@
// To store an index on disk, use this instead:
//Directory directory = FSDirectory.open("/tmp/testindex");
RandomIndexWriter iwriter = new RandomIndexWriter(random, directory);
+ iwriter.w.setInfoStream(VERBOSE ? System.out : null);
Document doc = new Document();
String longTerm = "longtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongterm";
String text = "This is the text to be indexed. " + longTerm;
Index: lucene/src/test/org/apache/lucene/TestSearchForDuplicates.java
--- lucene/src/test/org/apache/lucene/TestSearchForDuplicates.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/test/org/apache/lucene/TestSearchForDuplicates.java Thu Dec 09 10:10:12 2010 -0500
@@ -84,6 +84,10 @@
lmp.setUseCompoundFile(useCompoundFiles);
lmp.setUseCompoundDocStore(useCompoundFiles);
IndexWriter writer = new IndexWriter(directory, conf);
+ if (VERBOSE) {
+ System.out.println("TEST: now build index");
+ writer.setInfoStream(System.out);
+ }
final int MAX_DOCS = 225;
Index: lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
--- lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java Thu Dec 09 10:10:12 2010 -0500
@@ -57,9 +57,9 @@
isClose = true;
}
}
- if (isDoFlush && !isClose) {
+ if (isDoFlush && !isClose && random.nextBoolean()) {
hitExc = true;
- throw new IOException("now failing during flush");
+ throw new IOException(Thread.currentThread().getName() + ": now failing during flush");
}
}
}
@@ -73,12 +73,17 @@
directory.failOn(failure);
IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer()).setMaxBufferedDocs(2));
+ writer.setInfoStream(VERBOSE ? System.out : null);
Document doc = new Document();
Field idField = newField("id", "", Field.Store.YES, Field.Index.NOT_ANALYZED);
doc.add(idField);
int extraCount = 0;
for(int i=0;i<10;i++) {
+ if (VERBOSE) {
+ System.out.println("TEST: iter=" + i);
+ }
+
for(int j=0;j<20;j++) {
idField.setValue(Integer.toString(i*20+j));
writer.addDocument(doc);
@@ -97,10 +102,14 @@
}
extraCount++;
} catch (IOException ioe) {
+ if (VERBOSE) {
+ ioe.printStackTrace(System.out);
+ }
failure.clearDoFail();
break;
}
}
+ assertEquals(20*(i+1)+extraCount, writer.numDocs());
}
writer.close();
@@ -155,8 +164,12 @@
IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig(
TEST_VERSION_CURRENT, new MockAnalyzer())
.setMaxBufferedDocs(2));
+ writer.setInfoStream(VERBOSE ? System.out : null);
for(int iter=0;iter<7;iter++) {
+ if (VERBOSE) {
+ System.out.println("TEST: iter=" + iter);
+ }
for(int j=0;j<21;j++) {
Document doc = new Document();
Index: lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
--- lucene/src/test/org/apache/lucene/index/TestIndexWriter.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/test/org/apache/lucene/index/TestIndexWriter.java Thu Dec 09 10:10:12 2010 -0500
@@ -1097,6 +1097,9 @@
doc.add(idField);
for(int pass=0;pass<2;pass++) {
+ if (VERBOSE) {
+ System.out.println("TEST: pass=" + pass);
+ }
IndexWriter writer = new IndexWriter(
directory,
@@ -1108,10 +1111,12 @@
// backed directory:
setMergePolicy(newLogMergePolicy(false, 10))
);
-
- //System.out.println("TEST: pass=" + pass + " cms=" + (pass >= 2));
+ writer.setInfoStream(VERBOSE ? System.out : null);
+
for(int iter=0;iter<10;iter++) {
- //System.out.println("TEST: iter=" + iter);
+ if (VERBOSE) {
+ System.out.println("TEST: iter=" + iter);
+ }
for(int j=0;j<199;j++) {
idField.setValue(Integer.toString(iter*201+j));
writer.addDocument(doc);
@@ -1156,8 +1161,9 @@
}
};
- if (failure.size() > 0)
+ if (failure.size() > 0) {
throw failure.get(0);
+ }
t1.start();
@@ -1170,6 +1176,7 @@
// Reopen
writer = new IndexWriter(directory, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer()).setOpenMode(OpenMode.APPEND));
+ writer.setInfoStream(VERBOSE ? System.out : null);
}
writer.close();
}
@@ -2591,7 +2598,7 @@
Directory dir = newDirectory();
FlushCountingIndexWriter w = new FlushCountingIndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(MockTokenizer.WHITESPACE, true, false)).setRAMBufferSizeMB(0.5).setMaxBufferedDocs(-1).setMaxBufferedDeleteTerms(-1));
- //w.setInfoStream(System.out);
+ w.setInfoStream(VERBOSE ? System.out : null);
Document doc = new Document();
doc.add(newField("field", "go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20", Field.Store.NO, Field.Index.ANALYZED));
int num = 6 * RANDOM_MULTIPLIER;
@@ -2599,6 +2606,9 @@
int count = 0;
final boolean doIndexing = r.nextBoolean();
+ if (VERBOSE) {
+ System.out.println("TEST: iter doIndexing=" + doIndexing);
+ }
if (doIndexing) {
// Add docs until a flush is triggered
final int startFlushCount = w.flushCount;
Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
--- lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java Thu Dec 09 10:10:12 2010 -0500
@@ -114,6 +114,9 @@
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
TEST_VERSION_CURRENT, new MockAnalyzer(MockTokenizer.WHITESPACE, false)).setMaxBufferedDeleteTerms(1));
+
+ writer.setInfoStream(VERBOSE ? System.out : null);
+ writer.addDocument(new Document());
writer.deleteDocuments(new Term("foobar", "1"));
writer.deleteDocuments(new Term("foobar", "1"));
writer.deleteDocuments(new Term("foobar", "1"));
@@ -125,11 +128,14 @@
// test when delete terms only apply to ram segments
public void testRAMDeletes() throws IOException {
for(int t=0;t<2;t++) {
+ if (VERBOSE) {
+ System.out.println("TEST: t=" + t);
+ }
Directory dir = newDirectory();
IndexWriter modifier = new IndexWriter(dir, newIndexWriterConfig(
TEST_VERSION_CURRENT, new MockAnalyzer(MockTokenizer.WHITESPACE, false)).setMaxBufferedDocs(4)
.setMaxBufferedDeleteTerms(4));
-
+ modifier.setInfoStream(VERBOSE ? System.out : null);
int id = 0;
int value = 100;
@@ -439,6 +445,9 @@
// Iterate w/ ever increasing free disk space:
while (!done) {
+ if (VERBOSE) {
+ System.out.println("TEST: cycle");
+ }
MockDirectoryWrapper dir = new MockDirectoryWrapper(random, new RAMDirectory(startDir));
dir.setPreventDoubleWrite(false);
IndexWriter modifier = new IndexWriter(dir,
@@ -448,6 +457,7 @@
.setMaxBufferedDeleteTerms(1000)
.setMergeScheduler(new ConcurrentMergeScheduler()));
((ConcurrentMergeScheduler) modifier.getConfig().getMergeScheduler()).setSuppressExceptions();
+ modifier.setInfoStream(VERBOSE ? System.out : null);
// For each disk size, first try to commit against
// dir that will hit random IOExceptions & disk
@@ -456,6 +466,9 @@
boolean success = false;
for (int x = 0; x < 2; x++) {
+ if (VERBOSE) {
+ System.out.println("TEST: x=" + x);
+ }
double rate = 0.1;
double diskRatio = ((double)diskFree) / diskUsage;
Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
--- lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java Thu Dec 09 10:10:12 2010 -0500
@@ -51,7 +51,7 @@
IndexWriter writer;
final Random r = new java.util.Random(47);
- Throwable failure;
+ volatile Throwable failure;
public IndexerThread(int i, IndexWriter writer) {
setName("Indexer " + i);
@@ -79,6 +79,9 @@
final long stopTime = System.currentTimeMillis() + 500;
do {
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": TEST: IndexerThread: cycle");
+ }
doFail.set(this);
final String id = ""+r.nextInt(50);
idField.setValue(id);
@@ -136,7 +139,7 @@
if (doFail.get() != null && !name.equals("startDoFlush") && r.nextInt(20) == 17) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": NOW FAIL: " + name);
- //new Throwable().printStackTrace(System.out);
+ new Throwable().printStackTrace(System.out);
}
throw new RuntimeException(Thread.currentThread().getName() + ": intentionally failing at " + name);
}
@@ -145,16 +148,23 @@
}
public void testRandomExceptions() throws Throwable {
+ if (VERBOSE) {
+ System.out.println("\nTEST: start testRandomExceptions");
+ }
MockDirectoryWrapper dir = newDirectory();
MockIndexWriter writer = new MockIndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer())
.setRAMBufferSizeMB(0.1).setMergeScheduler(new ConcurrentMergeScheduler()));
((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions();
//writer.setMaxBufferedDocs(10);
+ if (VERBOSE) {
+ System.out.println("TEST: initial commit");
+ }
writer.commit();
- if (VERBOSE)
+ if (VERBOSE) {
writer.setInfoStream(System.out);
+ }
IndexerThread thread = new IndexerThread(0, writer);
thread.run();
@@ -163,6 +173,9 @@
fail("thread " + thread.getName() + ": hit unexpected failure");
}
+ if (VERBOSE) {
+ System.out.println("TEST: commit after thread start");
+ }
writer.commit();
try {
@@ -192,8 +205,9 @@
//writer.setMaxBufferedDocs(10);
writer.commit();
- if (VERBOSE)
+ if (VERBOSE) {
writer.setInfoStream(System.out);
+ }
final int NUM_THREADS = 4;
@@ -294,6 +308,7 @@
public void testExceptionJustBeforeFlush() throws IOException {
Directory dir = newDirectory();
MockIndexWriter w = new MockIndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer()).setMaxBufferedDocs(2));
+ w.setInfoStream(VERBOSE ? System.out : null);
Document doc = new Document();
doc.add(newField("field", "a field", Field.Store.YES,
Field.Index.ANALYZED));
Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java
--- lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java Thu Dec 09 10:10:12 2010 -0500
@@ -47,29 +47,35 @@
public void testAddDocumentOnDiskFull() throws IOException {
for(int pass=0;pass<2;pass++) {
- if (VERBOSE)
+ if (VERBOSE) {
System.out.println("TEST: pass=" + pass);
+ }
boolean doAbort = pass == 1;
long diskFree = 200;
while(true) {
- if (VERBOSE)
+ if (VERBOSE) {
System.out.println("TEST: cycle: diskFree=" + diskFree);
+ }
MockDirectoryWrapper dir = new MockDirectoryWrapper(random, new RAMDirectory());
dir.setMaxSizeInBytes(diskFree);
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer()));
writer.setInfoStream(VERBOSE ? System.out : null);
MergeScheduler ms = writer.getConfig().getMergeScheduler();
- if (ms instanceof ConcurrentMergeScheduler)
+ if (ms instanceof ConcurrentMergeScheduler) {
// This test intentionally produces exceptions
// in the threads that CMS launches; we don't
// want to pollute test output with these.
((ConcurrentMergeScheduler) ms).setSuppressExceptions();
+ }
boolean hitError = false;
try {
for(int i=0;i<200;i++) {
addDoc(writer);
}
+ if (VERBOSE) {
+ System.out.println("TEST: done adding docs; now commit");
+ }
writer.commit();
} catch (IOException e) {
if (VERBOSE) {
@@ -81,13 +87,19 @@
if (hitError) {
if (doAbort) {
+ if (VERBOSE) {
+ System.out.println("TEST: now rollback");
+ }
writer.rollback();
} else {
try {
+ if (VERBOSE) {
+ System.out.println("TEST: now close");
+ }
writer.close();
} catch (IOException e) {
if (VERBOSE) {
- System.out.println("TEST: exception on close");
+ System.out.println("TEST: exception on close; retry w/ no disk space limit");
e.printStackTrace(System.out);
}
dir.setMaxSizeInBytes(0);
Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
--- lucene/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java Thu Dec 09 10:10:12 2010 -0500
@@ -106,6 +106,9 @@
int NUM_THREADS = 3;
for(int iter=0;iter<10;iter++) {
+ if (VERBOSE) {
+ System.out.println("\nTEST: iter=" + iter);
+ }
MockDirectoryWrapper dir = newDirectory();
IndexWriter writer = new IndexWriter(
dir,
@@ -116,6 +119,7 @@
);
((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions();
dir.setMaxSizeInBytes(4*1024+20*iter);
+ writer.setInfoStream(VERBOSE ? System.out : null);
IndexerThread[] threads = new IndexerThread[NUM_THREADS];
Index: lucene/src/test/org/apache/lucene/index/TestPerSegmentDeletes.java
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ lucene/src/test/org/apache/lucene/index/TestPerSegmentDeletes.java Thu Dec 09 10:10:12 2010 -0500
@@ -0,0 +1,302 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.TermsEnum.SeekStatus;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.Version;
+
+public class TestPerSegmentDeletes extends LuceneTestCase {
+ public void testDeletes1() throws Exception {
+ //IndexWriter.debug2 = System.out;
+ Directory dir = new MockDirectoryWrapper(new Random(), new RAMDirectory());
+ IndexWriterConfig iwc = new IndexWriterConfig(Version.LUCENE_CURRENT,
+ new MockAnalyzer());
+ iwc.setMergeScheduler(new SerialMergeScheduler());
+ iwc.setMaxBufferedDocs(5000);
+ iwc.setRAMBufferSizeMB(100);
+ RangeMergePolicy fsmp = new RangeMergePolicy(false);
+ iwc.setMergePolicy(fsmp);
+ IndexWriter writer = new IndexWriter(dir, iwc);
+ Document doc = new Document();
+ for (int x = 0; x < 5; x++) {
+ writer.addDocument(TestIndexWriterReader.createDocument(x, "1", 2));
+ //System.out.println("numRamDocs(" + x + ")" + writer.numRamDocs());
+ }
+ //System.out.println("commit1");
+ writer.commit();
+ assertEquals(1, writer.segmentInfos.size());
+ for (int x = 5; x < 10; x++) {
+ writer.addDocument(TestIndexWriterReader.createDocument(x, "2", 2));
+ //System.out.println("numRamDocs(" + x + ")" + writer.numRamDocs());
+ }
+ //System.out.println("commit2");
+ writer.commit();
+ assertEquals(2, writer.segmentInfos.size());
+
+ for (int x = 10; x < 15; x++) {
+ writer.addDocument(TestIndexWriterReader.createDocument(x, "3", 2));
+ //System.out.println("numRamDocs(" + x + ")" + writer.numRamDocs());
+ }
+
+ writer.deleteDocuments(new Term("id", "1"));
+
+ writer.deleteDocuments(new Term("id", "11"));
+
+ // flushing without applying deletes means
+ // there will still be deletes in the segment infos
+ writer.flush(false, false, false);
+ assertTrue(writer.bufferedDeletes.any());
+
+ // getReader flushes pending deletes,
+ // so there should not be any more
+ IndexReader r1 = writer.getReader();
+ assertFalse(writer.bufferedDeletes.any());
+ r1.close();
+
+ // delete id:2 from the first segment
+ // merge segments 0 and 1
+ // which should apply the delete id:2
+ writer.deleteDocuments(new Term("id", "2"));
+ writer.flush(false, false, false);
+ fsmp.doMerge = true;
+ fsmp.start = 0;
+ fsmp.length = 2;
+ writer.maybeMerge();
+
+ assertEquals(2, writer.segmentInfos.size());
+
+ // id:2 shouldn't exist anymore because
+ // it's been applied in the merge and now it's gone
+ IndexReader r2 = writer.getReader();
+ int[] id2docs = toDocsArray(new Term("id", "2"), null, r2);
+ assertTrue(id2docs == null);
+ r2.close();
+
+ /**
+ // added docs are in the ram buffer
+ for (int x = 15; x < 20; x++) {
+ writer.addDocument(TestIndexWriterReader.createDocument(x, "4", 2));
+ System.out.println("numRamDocs(" + x + ")" + writer.numRamDocs());
+ }
+ assertTrue(writer.numRamDocs() > 0);
+ // delete from the ram buffer
+ writer.deleteDocuments(new Term("id", Integer.toString(13)));
+
+ Term id3 = new Term("id", Integer.toString(3));
+
+ // delete from the 1st segment
+ writer.deleteDocuments(id3);
+
+ assertTrue(writer.numRamDocs() > 0);
+
+ //System.out
+ // .println("segdels1:" + writer.docWriter.deletesToString());
+
+ //assertTrue(writer.docWriter.segmentDeletes.size() > 0);
+
+ // we cause a merge to happen
+ fsmp.doMerge = true;
+ fsmp.start = 0;
+ fsmp.length = 2;
+ System.out.println("maybeMerge "+writer.segmentInfos);
+
+ SegmentInfo info0 = writer.segmentInfos.get(0);
+ SegmentInfo info1 = writer.segmentInfos.get(1);
+
+ writer.maybeMerge();
+ System.out.println("maybeMerge after "+writer.segmentInfos);
+ // there should be docs in RAM
+ assertTrue(writer.numRamDocs() > 0);
+
+ // assert we've merged the 1 and 2 segments
+ // and still have a segment leftover == 2
+ assertEquals(2, writer.segmentInfos.size());
+ assertFalse(segThere(info0, writer.segmentInfos));
+ assertFalse(segThere(info1, writer.segmentInfos));
+
+ //System.out.println("segdels2:" + writer.docWriter.deletesToString());
+
+ //assertTrue(writer.docWriter.segmentDeletes.size() > 0);
+
+ IndexReader r = writer.getReader();
+ IndexReader r1 = r.getSequentialSubReaders()[0];
+ printDelDocs(r1.getDeletedDocs());
+ int[] docs = toDocsArray(id3, null, r);
+ System.out.println("id3 docs:"+Arrays.toString(docs));
+ // there shouldn't be any docs for id:3
+ assertTrue(docs == null);
+ r.close();
+
+ part2(writer, fsmp);
+ **/
+ // System.out.println("segdels2:"+writer.docWriter.segmentDeletes.toString());
+ //System.out.println("close");
+ writer.close();
+ dir.close();
+ }
+
+ /**
+ static boolean hasPendingDeletes(SegmentInfos infos) {
+ for (SegmentInfo info : infos) {
+ if (info.deletes.any()) {
+ return true;
+ }
+ }
+ return false;
+ }
+ **/
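+ // Continuation of testDeletes1; currently only invoked from the commented-out block above.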
+ void part2(IndexWriter writer, RangeMergePolicy fsmp) throws Exception {
+ for (int x = 20; x < 25; x++) {
+ writer.addDocument(TestIndexWriterReader.createDocument(x, "5", 2));
+ //System.out.println("numRamDocs(" + x + ")" + writer.numRamDocs());
+ }
+ writer.flush(false, true, false);
+ for (int x = 25; x < 30; x++) {
+ writer.addDocument(TestIndexWriterReader.createDocument(x, "5", 2));
+ //System.out.println("numRamDocs(" + x + ")" + writer.numRamDocs());
+ }
+ writer.flush(false, true, false);
+
+ System.out.println("infos3:"+writer.segmentInfos);
+
+ Term delterm = new Term("id", "8");
+ writer.deleteDocuments(delterm);
+ //System.out.println("segdels3:" + writer.docWriter.deletesToString());
+
+ fsmp.doMerge = true;
+ fsmp.start = 1;
+ fsmp.length = 2;
+ writer.maybeMerge();
+
+ // info1, the segment newly created by the merge,
+ // should have no deletes because they were
+ // applied during the merge
+ SegmentInfo info1 = writer.segmentInfos.get(1);
+ //assertFalse(exists(info1, writer.docWriter.segmentDeletes));
+
+ System.out.println("infos4:"+writer.segmentInfos);
+ //System.out.println("segdels4:" + writer.docWriter.deletesToString());
+ }
+
+ boolean segThere(SegmentInfo info, SegmentInfos infos) {
+ for (SegmentInfo si : infos) {
+ if (si.name.equals(info.name)) return true;
+ }
+ return false;
+ }
+
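+ // Debugging helper: prints the deleted bit for every docID; currently only
+ // referenced from the commented-out block above.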
+ public static void printDelDocs(Bits bits) {
+ if (bits == null) return;
+ for (int x = 0; x < bits.length(); x++) {
+ System.out.println(x + ":" + bits.get(x));
+ }
+ }
+
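+ // Returns the docIDs matching the given term (honoring the optional skip bits),
+ // or null if the term does not exist in the index.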
+ public static int[] toDocsArray(Term term, Bits bits, IndexReader reader)
+ throws IOException {
+ Fields fields = MultiFields.getFields(reader);
+ Terms cterms = fields.terms(term.field);
+ TermsEnum ctermsEnum = cterms.iterator();
+ SeekStatus ss = ctermsEnum.seek(new BytesRef(term.text()), false);
+ if (ss == SeekStatus.FOUND) {
+ DocsEnum docsEnum = ctermsEnum.docs(bits, null);
+ return toArray(docsEnum);
+ }
+ return null;
+ }
+
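+ // Drains a DocsEnum into an array of docIDs.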
+ public static int[] toArray(DocsEnum docsEnum) throws IOException {
+ List<Integer> docs = new ArrayList<Integer>();
+ while (docsEnum.nextDoc() != DocsEnum.NO_MORE_DOCS) {
+ int docID = docsEnum.docID();
+ docs.add(docID);
+ }
+ return ArrayUtil.toIntArray(docs);
+ }
+
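+ // Test-only merge policy: merges the explicitly requested range of segments
+ // once when doMerge is set, and otherwise never merges.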
+ public class RangeMergePolicy extends MergePolicy {
+ boolean doMerge = false;
+ int start;
+ int length;
+
+ private final boolean useCompoundFile;
+
+ private RangeMergePolicy(boolean useCompoundFile) {
+ this.useCompoundFile = useCompoundFile;
+ }
+
+ @Override
+ public void close() {}
+
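+ // Returns a single merge over segments [start, start+length) when doMerge is
+ // set (clearing the flag afterwards); otherwise requests no merges.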
+ @Override
+ public MergeSpecification findMerges(SegmentInfos segmentInfos)
+ throws CorruptIndexException, IOException {
+ MergeSpecification ms = new MergeSpecification();
+ if (doMerge) {
+ SegmentInfos mergeInfos = new SegmentInfos();
+ for (int x=start; x < (start+length); x++) {
+ mergeInfos.add(segmentInfos.get(x));
+ }
+ OneMerge om = new OneMerge(mergeInfos);
+ ms.add(om);
+ doMerge = false;
+ return ms;
+ }
+ return null;
+ }
+
+ @Override
+ public MergeSpecification findMergesForOptimize(SegmentInfos segmentInfos,
+ int maxSegmentCount, Set<SegmentInfo> segmentsToOptimize)
+ throws CorruptIndexException, IOException {
+ return null;
+ }
+
+ @Override
+ public MergeSpecification findMergesToExpungeDeletes(
+ SegmentInfos segmentInfos) throws CorruptIndexException, IOException {
+ return null;
+ }
+
+ @Override
+ public boolean useCompoundDocStore(SegmentInfos segments) {
+ return useCompoundFile;
+ }
+
+ @Override
+ public boolean useCompoundFile(SegmentInfos segments, SegmentInfo newSegment) {
+ return useCompoundFile;
+ }
+ }
+}
Index: lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java
--- lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java Thu Dec 09 10:10:12 2010 -0500
@@ -96,6 +96,9 @@
int num = 3 * RANDOM_MULTIPLIER;
for (int i = 0; i < num; i++) { // increase iterations for better testing
+ if (VERBOSE) {
+ System.out.println("\n\nTEST: top iter=" + i);
+ }
sameFieldOrder=random.nextBoolean();
mergeFactor=random.nextInt(3)+2;
maxBufferedDocs=random.nextInt(3)+2;
@@ -108,10 +111,17 @@
int range=random.nextInt(20)+1;
Directory dir1 = newDirectory();
Directory dir2 = newDirectory();
+ if (VERBOSE) {
+ System.out.println(" nThreads=" + nThreads + " iter=" + iter + " range=" + range + " doPooling=" + doReaderPooling + " maxThreadStates=" + maxThreadStates + " sameFieldOrder=" + sameFieldOrder + " mergeFactor=" + mergeFactor);
+ }
Map<String,Document> docs = indexRandom(nThreads, iter, range, dir1, maxThreadStates, doReaderPooling);
- //System.out.println("TEST: index serial");
+ if (VERBOSE) {
+ System.out.println("TEST: index serial");
+ }
indexSerial(random, docs, dir2);
- //System.out.println("TEST: verify");
+ if (VERBOSE) {
+ System.out.println("TEST: verify");
+ }
verifyEquals(dir1, dir2, "id");
dir1.close();
dir2.close();
@@ -141,6 +151,7 @@
IndexWriter w = new MockIndexWriter(dir, newIndexWriterConfig(
TEST_VERSION_CURRENT, new MockAnalyzer()).setOpenMode(OpenMode.CREATE).setRAMBufferSizeMB(
0.1).setMaxBufferedDocs(maxBufferedDocs));
+ w.setInfoStream(VERBOSE ? System.out : null);
w.commit();
LogMergePolicy lmp = (LogMergePolicy) w.getConfig().getMergePolicy();
lmp.setUseCompoundFile(false);
@@ -191,10 +202,14 @@
boolean doReaderPooling) throws IOException, InterruptedException {
Map<String,Document> docs = new HashMap<String,Document>();
for(int iter=0;iter<3;iter++) {
+ if (VERBOSE) {
+ System.out.println("TEST: iter=" + iter);
+ }
IndexWriter w = new MockIndexWriter(dir, newIndexWriterConfig(
TEST_VERSION_CURRENT, new MockAnalyzer()).setOpenMode(OpenMode.CREATE)
.setRAMBufferSizeMB(0.1).setMaxBufferedDocs(maxBufferedDocs).setMaxThreadStates(maxThreadStates)
.setReaderPooling(doReaderPooling));
+ w.setInfoStream(VERBOSE ? System.out : null);
LogMergePolicy lmp = (LogMergePolicy) w.getConfig().getMergePolicy();
lmp.setUseCompoundFile(false);
lmp.setUseCompoundDocStore(false);
@@ -273,9 +288,33 @@
r2.close();
}
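+ // Dumps every doc in each sub-reader, marking deleted docs; used for VERBOSE
+ // debugging output.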
+ private static void printDocs(IndexReader r) throws Throwable {
+ IndexReader[] subs = r.getSequentialSubReaders();
+ for(IndexReader sub : subs) {
+ Bits delDocs = sub.getDeletedDocs();
+ System.out.println(" " + ((SegmentReader) sub).getSegmentInfo());
+ for(int docID=0;docID<sub.maxDoc();docID++) {
+ Document doc = sub.document(docID);
+ if (delDocs == null || !delDocs.get(docID)) {
+ System.out.println(" docID=" + docID + " id:" + doc.get("id"));
+ } else {
+ System.out.println(" DEL docID=" + docID + " id:" + doc.get("id"));
+ }
+ }
+ }
+ }
+
public static void verifyEquals(IndexReader r1, IndexReader r2, String idField) throws Throwable {
- assertEquals(r1.numDocs(), r2.numDocs());
+ if (VERBOSE) {
+ System.out.println("\nr1 docs:");
+ printDocs(r1);
+ System.out.println("\nr2 docs:");
+ printDocs(r2);
+ }
+ if (r1.numDocs() != r2.numDocs()) {
+ assert false: "r1.numDocs()=" + r1.numDocs() + " vs r2.numDocs()=" + r2.numDocs();
+ }
boolean hasDeletes = !(r1.maxDoc()==r2.maxDoc() && r1.numDocs()==r1.maxDoc());
int[] r2r1 = new int[r2.maxDoc()]; // r2 id to r1 id mapping
@@ -693,19 +732,28 @@
for (int i=0; i<fields.size(); i++) {
d.add(fields.get(i));
}
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": indexing id:" + idString);
+ }
w.updateDocument(idTerm.createTerm(idString), d);
- // System.out.println("indexing "+d);
+ //System.out.println(Thread.currentThread().getName() + ": indexing "+d);
docs.put(idString, d);
}
public void deleteDoc() throws IOException {
String idString = getIdString();
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": del id:" + idString);
+ }
w.deleteDocuments(idTerm.createTerm(idString));
docs.remove(idString);
}
public void deleteByQuery() throws IOException {
String idString = getIdString();
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": del query id:" + idString);
+ }
w.deleteDocuments(new TermQuery(idTerm.createTerm(idString)));
docs.remove(idString);
}
Index: lucene/src/test/org/apache/lucene/search/TestTermVectors.java
--- lucene/src/test/org/apache/lucene/search/TestTermVectors.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/test/org/apache/lucene/search/TestTermVectors.java Thu Dec 09 10:10:12 2010 -0500
@@ -353,12 +353,19 @@
RandomIndexWriter writer = new RandomIndexWriter(random, directory,
newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(MockTokenizer.SIMPLE, true))
.setOpenMode(OpenMode.CREATE));
+ writer.w.setInfoStream(VERBOSE ? System.out : null);
+ if (VERBOSE) {
+ System.out.println("TEST: now add non-vectors");
+ }
for (int i = 0; i < 100; i++) {
Document doc = new Document();
doc.add(new Field("field", English.intToEnglish(i),
Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.NO));
writer.addDocument(doc);
}
+ if (VERBOSE) {
+ System.out.println("TEST: now add vectors");
+ }
for(int i=0;i<10;i++) {
Document doc = new Document();
doc.add(new Field("field", English.intToEnglish(100+i),
@@ -366,6 +373,9 @@
writer.addDocument(doc);
}
+ if (VERBOSE) {
+ System.out.println("TEST: now getReader");
+ }
IndexReader reader = writer.getReader();
writer.close();
searcher = new IndexSearcher(reader);
@@ -374,6 +384,7 @@
ScoreDoc[] hits = searcher.search(query, null, 1000).scoreDocs;
assertEquals(10, hits.length);
for (int i = 0; i < hits.length; i++) {
+
TermFreqVector [] vector = searcher.reader.getTermFreqVectors(hits[i].doc);
assertTrue(vector != null);
assertTrue(vector.length == 1);
Index: lucene/src/test/org/apache/lucene/store/MockIndexOutputWrapper.java
--- lucene/src/test/org/apache/lucene/store/MockIndexOutputWrapper.java Thu Dec 09 05:37:58 2010 -0500
+++ lucene/src/test/org/apache/lucene/store/MockIndexOutputWrapper.java Thu Dec 09 10:10:12 2010 -0500
@@ -19,6 +19,8 @@
import java.io.IOException;
+import org.apache.lucene.util.LuceneTestCase;
+
/**
* Used by MockRAMDirectory to create an output stream that
* will throw an IOException on fake disk full, track max
@@ -102,6 +104,9 @@
message += "; wrote " + freeSpace + " of " + len + " bytes";
}
message += ")";
+ if (LuceneTestCase.VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": MDW: now throw fake disk full");
+ }
throw new IOException(message);
} else {
if (dir.randomState.nextBoolean()) {