| Index: src/java/org/apache/lucene/index/DocumentsWriter.java |
| =================================================================== |
| --- src/java/org/apache/lucene/index/DocumentsWriter.java (revision 619194) |
| +++ src/java/org/apache/lucene/index/DocumentsWriter.java (working copy) |
| @@ -150,7 +150,7 @@ |
| // pause (eg to flush) |
| private boolean flushPending; // True when a thread has decided to flush |
| private boolean bufferIsFull; // True when it's time to write segment |
| - private boolean aborting; // True while abort is running |
| + private int abortCount; // Non-zero while abort is pending or running |
| |
| private PrintStream infoStream; |
| |
| @@ -323,7 +323,7 @@ |
| } |
| |
| synchronized void setAborting() { |
| - aborting = true; |
| + abortCount++; |
| } |
| |
| /** Called if we hit an exception when adding docs, |
| @@ -338,7 +338,7 @@ |
| // unwinding the un-synchronized stack, no thread grabs |
| // the corrupt ThreadState that hit the aborting |
| // exception: |
| - assert ae == null || aborting; |
| + assert ae == null || abortCount>0; |
| |
| try { |
| |
| @@ -361,18 +361,28 @@ |
| bufferedDeleteDocIDs.clear(); |
| numBufferedDeleteTerms = 0; |
| |
| - abortedFiles = files(); |
| + try { |
| + abortedFiles = files(); |
| + } catch (Throwable t) { |
| + abortedFiles = null; |
| + } |
| |
| - // Discard pending norms: |
| - final int numField = fieldInfos.size(); |
| - for (int i=0;i<numField;i++) { |
| - FieldInfo fi = fieldInfos.fieldInfo(i); |
| - if (fi.isIndexed && !fi.omitNorms) { |
| - BufferedNorms n = norms[i]; |
| - if (n != null) { |
| - n.out.reset(); |
| - n.reset(); |
| + docStoreSegment = null; |
| + numDocsInStore = 0; |
| + docStoreOffset = 0; |
| + files = null; |
| + |
| + // Clear vectors & fields from ThreadStates |
| + for(int i=0;i<threadStates.length;i++) { |
| + ThreadState state = threadStates[i]; |
| + state.tvfLocal.reset(); |
| + state.fdtLocal.reset(); |
| + if (state.localFieldsWriter != null) { |
| + try { |
| + state.localFieldsWriter.close(); |
| + } catch (Throwable t) { |
| } |
| + state.localFieldsWriter = null; |
| } |
| } |
| |
| @@ -380,21 +390,21 @@ |
| if (tvx != null) { |
| try { |
| tvx.close(); |
| - } catch (IOException ioe) { |
| + } catch (Throwable t) { |
| } |
| tvx = null; |
| } |
| if (tvd != null) { |
| try { |
| tvd.close(); |
| - } catch (IOException ioe) { |
| + } catch (Throwable t) { |
| } |
| tvd = null; |
| } |
| if (tvf != null) { |
| try { |
| tvf.close(); |
| - } catch (IOException ioe) { |
| + } catch (Throwable t) { |
| } |
| tvf = null; |
| } |
| @@ -403,30 +413,28 @@ |
| if (fieldsWriter != null) { |
| try { |
| fieldsWriter.close(); |
| - } catch (IOException ioe) { |
| + } catch (Throwable t) { |
| } |
| fieldsWriter = null; |
| } |
| |
| - // Clear vectors & fields from ThreadStates |
| - for(int i=0;i<threadStates.length;i++) { |
| - ThreadState state = threadStates[i]; |
| - if (state.localFieldsWriter != null) { |
| - state.localFieldsWriter.close(); |
| - state.localFieldsWriter = null; |
| + // Discard pending norms: |
| + final int numField = fieldInfos.size(); |
| + for (int i=0;i<numField;i++) { |
| + FieldInfo fi = fieldInfos.fieldInfo(i); |
| + if (fi.isIndexed && !fi.omitNorms) { |
| + BufferedNorms n = norms[i]; |
| + if (n != null) |
| + try { |
| + n.reset(); |
| + } catch (Throwable t) { |
| + } |
| } |
| - state.tvfLocal.reset(); |
| - state.fdtLocal.reset(); |
| } |
| |
| // Reset all postings data |
| resetPostingsData(); |
| |
| - docStoreSegment = null; |
| - numDocsInStore = 0; |
| - docStoreOffset = 0; |
| - files = null; |
| - |
| } finally { |
| resumeAllThreads(); |
| } |
| @@ -445,7 +453,7 @@ |
| assert false: "unknown exception: " + t; |
| } |
| } finally { |
| - aborting = false; |
| + abortCount--; |
| notifyAll(); |
| } |
| } |
| @@ -454,20 +462,20 @@ |
| private void resetPostingsData() throws IOException { |
| // All ThreadStates should be idle when we are called |
| assert allThreadsIdle(); |
| - for(int i=0;i<threadStates.length;i++) { |
| - threadStates[i].resetPostings(); |
| - threadStates[i].numThreads = 0; |
| - } |
| threadBindings.clear(); |
| - numBytesUsed = 0; |
| - balanceRAM(); |
| - bufferIsFull = false; |
| - flushPending = false; |
| segment = null; |
| numDocsInRAM = 0; |
| nextDocID = 0; |
| nextWriteDocID = 0; |
| files = null; |
| + balanceRAM(); |
| + bufferIsFull = false; |
| + flushPending = false; |
| + for(int i=0;i<threadStates.length;i++) { |
| + threadStates[i].numThreads = 0; |
| + threadStates[i].resetPostings(); |
| + } |
| + numBytesUsed = 0; |
| } |
| |
| // Returns true if an abort is in progress |
| @@ -480,7 +488,7 @@ |
| Thread.currentThread().interrupt(); |
| } |
| } |
| - return aborting; |
| + return abortCount > 0; |
| } |
| |
| synchronized void resumeAllThreads() { |
| @@ -627,19 +635,20 @@ |
| /** Clear the postings hash and return objects back to |
| * shared pool */ |
| public void resetPostings() throws IOException { |
| + fieldGen = 0; |
| + maxPostingsVectors = 0; |
| + doFlushAfter = false; |
| if (localFieldsWriter != null) { |
| localFieldsWriter.close(); |
| localFieldsWriter = null; |
| } |
| - fieldGen = 0; |
| - maxPostingsVectors = 0; |
| - doFlushAfter = false; |
| postingsPool.reset(); |
| charPool.reset(); |
| recyclePostings(postingsFreeList, postingsFreeCount); |
| postingsFreeCount = 0; |
| for(int i=0;i<numAllFieldData;i++) { |
| FieldData fp = allFieldDataArray[i]; |
| + fp.lastGen = -1; |
| if (fp.numPostings > 0) |
| fp.resetPostingArrays(); |
| } |
| @@ -1381,6 +1390,8 @@ |
| final int limit = fieldCount; |
| final Fieldable[] docFieldsFinal = docFields; |
| |
| + boolean doWriteVectors = true; |
| + |
| // Walk through all occurrences in this doc for this |
| // field: |
| try { |
| @@ -1409,22 +1420,30 @@ |
| |
| docFieldsFinal[j] = null; |
| } |
| + } catch (AbortException ae) { |
| + doWriteVectors = false; |
| + throw ae; |
| } finally { |
| if (postingsVectorsUpto > 0) { |
| - // Add term vectors for this field |
| - boolean success = false; |
| try { |
| - writeVectors(fieldInfo); |
| - success = true; |
| + if (doWriteVectors) { |
| + // Add term vectors for this field |
| + boolean success = false; |
| + try { |
| + writeVectors(fieldInfo); |
| + success = true; |
| + } finally { |
| + if (!success) { |
| + // If we hit an exception inside |
| + // writeVectors, the contents of tvfLocal |
| + // can be corrupt, so we must discard all |
| + // term vectors for this document: |
| + numVectorFields = 0; |
| + tvfLocal.reset(); |
| + } |
| + } |
| + } |
| } finally { |
| - if (!success) { |
| - // If we hit an exception inside |
| - // writeVectors, the contents of tvfLocal |
| - // can be corrupt, so we must discard all |
| - // term vectors for this document: |
| - numVectorFields = 0; |
| - tvfLocal.reset(); |
| - } |
| if (postingsVectorsUpto > maxPostingsVectors) |
| maxPostingsVectors = postingsVectorsUpto; |
| postingsVectorsUpto = 0; |
| @@ -1665,8 +1684,8 @@ |
| |
| // Refill? |
| if (0 == postingsFreeCount) { |
| + getPostings(postingsFreeList); |
| postingsFreeCount = postingsFreeList.length; |
| - getPostings(postingsFreeList); |
| } |
| |
| final int textLen1 = 1+tokenTextLen; |
| @@ -1771,7 +1790,7 @@ |
| * occupied) or too large (< 20% occupied). */ |
| void rehashPostings(final int newSize) { |
| |
| - postingsHashMask = newSize-1; |
| + final int newMask = newSize-1; |
| |
| Posting[] newHash = new Posting[newSize]; |
| for(int i=0;i<postingsHashSize;i++) { |
| @@ -1786,19 +1805,20 @@ |
| while (pos > start) |
| code = (code*31) + text[--pos]; |
| |
| - int hashPos = code & postingsHashMask; |
| + int hashPos = code & newMask; |
| assert hashPos >= 0; |
| if (newHash[hashPos] != null) { |
| final int inc = ((code>>8)+code)|1; |
| do { |
| code += inc; |
| - hashPos = code & postingsHashMask; |
| + hashPos = code & newMask; |
| } while (newHash[hashPos] != null); |
| } |
| newHash[hashPos] = p0; |
| } |
| } |
| |
| + postingsHashMask = newMask; |
| postingsHash = newHash; |
| postingsHashSize = newSize; |
| postingsHashHalfSize = newSize >> 1; |
| @@ -2333,7 +2353,7 @@ |
| // Next, wait until my thread state is idle (in case |
| // it's shared with other threads) and for threads to |
| // not be paused nor a flush pending: |
| - while(!closed && (!state.isIdle || pauseThreads != 0 || flushPending || aborting)) |
| + while(!closed && (!state.isIdle || pauseThreads != 0 || flushPending || abortCount > 0)) |
| try { |
| wait(); |
| } catch (InterruptedException e) { |
| @@ -2555,7 +2575,7 @@ |
| /** Does the synchronized work to finish/flush the |
| * inverted document. */ |
| private synchronized void finishDocument(ThreadState state) throws IOException, AbortException { |
| - if (aborting) { |
| + if (abortCount > 0) { |
| // Forcefully idle this threadstate -- its state will |
| // be reset by abort() |
| state.isIdle = true; |
| @@ -3001,9 +3021,10 @@ |
| // Holds free pool of Posting instances |
| private Posting[] postingsFreeList; |
| private int postingsFreeCount; |
| + private int postingsAllocCount; |
| |
| /* Allocate more Postings from shared pool */ |
| - private synchronized void getPostings(Posting[] postings) { |
| + synchronized void getPostings(Posting[] postings) { |
| numBytesUsed += postings.length * POSTING_NUM_BYTE; |
| final int numToCopy; |
| if (postingsFreeCount < postings.length) |
| @@ -3017,25 +3038,27 @@ |
| |
| // Directly allocate the remainder if any |
| if (numToCopy < postings.length) { |
| - numBytesAlloc += (postings.length - numToCopy) * POSTING_NUM_BYTE; |
| + final int extra = postings.length - numToCopy; |
| + final int newPostingsAllocCount = postingsAllocCount + extra; |
| + if (newPostingsAllocCount > postingsFreeList.length) |
| + postingsFreeList = new Posting[(int) (1.25 * newPostingsAllocCount)]; |
| + |
| balanceRAM(); |
| - for(int i=numToCopy;i<postings.length;i++) |
| + for(int i=numToCopy;i<postings.length;i++) { |
| postings[i] = new Posting(); |
| + numBytesAlloc += POSTING_NUM_BYTE; |
| + postingsAllocCount++; |
| + } |
| } |
| } |
| |
| - private synchronized void recyclePostings(Posting[] postings, int numPostings) { |
| + synchronized void recyclePostings(Posting[] postings, int numPostings) { |
| // Move all Postings from this ThreadState back to our |
| - // free list |
| - if (postingsFreeCount + numPostings > postingsFreeList.length) { |
| - final int newSize = (int) (1.25 * (postingsFreeCount + numPostings)); |
| - Posting[] newArray = new Posting[newSize]; |
| - System.arraycopy(postingsFreeList, 0, newArray, 0, postingsFreeCount); |
| - postingsFreeList = newArray; |
| - } |
| + // free list. We pre-allocated this array while we were |
| + // creating Postings to make sure it's large enough |
| + assert postingsFreeCount + numPostings <= postingsFreeList.length; |
| System.arraycopy(postings, 0, postingsFreeList, postingsFreeCount, numPostings); |
| postingsFreeCount += numPostings; |
| - numBytesUsed -= numPostings * POSTING_NUM_BYTE; |
| } |
| |
| /* Initial chunks size of the shared byte[] blocks used to |
| @@ -3065,7 +3088,6 @@ |
| synchronized void recycleByteBlocks(byte[][] blocks, int start, int end) { |
| for(int i=start;i<end;i++) |
| freeByteBlocks.add(blocks[i]); |
| - numBytesUsed -= (end-start) * BYTE_BLOCK_SIZE; |
| } |
| |
| /* Initial chunk size of the shared char[] blocks used to |
| @@ -3096,7 +3118,6 @@ |
| synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) { |
| for(int i=0;i<numBlocks;i++) |
| freeCharBlocks.add(blocks[i]); |
| - numBytesUsed -= numBlocks * CHAR_BLOCK_SIZE * CHAR_NUM_BYTE; |
| } |
| |
| String toMB(long v) { |
| @@ -3177,6 +3198,7 @@ |
| numToFree = postingsFreeCount; |
| Arrays.fill(postingsFreeList, postingsFreeCount-numToFree, postingsFreeCount, null); |
| postingsFreeCount -= numToFree; |
| + postingsAllocCount -= numToFree; |
| numBytesAlloc -= numToFree * POSTING_NUM_BYTE; |
| } |
| |