| Index: lucene/core/src/java/org/apache/lucene/index/MergePolicy.java |
| =================================================================== |
| --- lucene/core/src/java/org/apache/lucene/index/MergePolicy.java (révision 1461346) |
| +++ lucene/core/src/java/org/apache/lucene/index/MergePolicy.java (copie de travail) |
| @@ -25,6 +25,7 @@ |
| |
| import org.apache.lucene.store.Directory; |
| import org.apache.lucene.store.MergeInfo; |
| +import org.apache.lucene.util.FixedBitSet; |
| import org.apache.lucene.util.SetOnce.AlreadySetException; |
| import org.apache.lucene.util.SetOnce; |
| |
| @@ -58,7 +59,29 @@ |
| */ |
| |
| public abstract class MergePolicy implements java.io.Closeable, Cloneable { |
| - |
| + |
| + /** A map of doc IDs. */ |
| + public static abstract class DocMap { |
| + /** Return the new doc ID according to its old value. */ |
| + public abstract int map(int old); |
| + |
| + /** Useful from an assert. */ |
| + boolean isConsistent(int maxDoc) { |
| + final FixedBitSet targets = new FixedBitSet(maxDoc); |
| + for (int i = 0; i < maxDoc; ++i) { |
| + final int target = map(i); |
| + if (target < 0 || target >= maxDoc) { |
| + assert false : "out of range: " + target + " not in [0-" + maxDoc + "["; |
| + return false; |
| + } else if (targets.get(target)) { |
| + assert false : target + " is already taken (" + i + ")"; |
| + return false; |
| + } |
| + } |
| + return true; |
| + } |
| + } |
| + |
| /** OneMerge provides the information necessary to perform |
| * an individual primitive merge operation, resulting in |
| * a single new segment. The merge spec includes the |
| @@ -105,9 +128,12 @@ |
| totalDocCount = count; |
| } |
| |
| - /** Get the list of readers to merge. Note that this list does not |
| + /** Expert: Get the list of readers to merge. Note that this list does not |
| * necessarily match the list of segments to merge and should only be used |
| - * to feed SegmentMerger to initialize a merge. */ |
| + * to feed SegmentMerger to initialize a merge. When a {@link OneMerge} |
| + * reorders doc IDs, it must override {@link #getDocMap} too so that |
| + * deletes that happened during the merge can be applied to the newly |
| + * merged segment. */ |
| public List<AtomicReader> getMergeReaders() throws IOException { |
| if (readers == null) { |
| throw new IllegalStateException("IndexWriter has not initialized readers from the segment infos yet"); |
| @@ -121,6 +147,20 @@ |
| return Collections.unmodifiableList(readers); |
| } |
| |
| + /** Expert: If {@link #getMergeReaders()} reorders document IDs, this method |
| + * must be overridden to return a mapping from the <i>natural</i> doc ID |
| + * (the doc ID that would result from a natural merge) to the actual doc |
| + * ID. This mapping is used to apply deletions that happened during the |
| + * merge to the new segment. */ |
| + public DocMap getDocMap(MergeState mergeState) { |
| + return new DocMap() { |
| + @Override |
| + public int map(int docID) { |
| + return docID; |
| + } |
| + }; |
| + } |
| + |
| /** Record that an exception occurred while executing |
| * this merge */ |
| synchronized void setException(Throwable error) { |
| Index: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java |
| =================================================================== |
| --- lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (révision 1461346) |
| +++ lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (copie de travail) |
| @@ -2975,7 +2975,7 @@ |
| * saves the resulting deletes file (incrementing the |
| * delete generation for merge.info). If no deletes were |
| * flushed, no new deletes file is saved. */ |
| - synchronized private ReadersAndLiveDocs commitMergedDeletes(MergePolicy.OneMerge merge) throws IOException { |
| + synchronized private ReadersAndLiveDocs commitMergedDeletes(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException { |
| |
| assert testPoint("startCommitMergeDeletes"); |
| |
| @@ -2992,6 +2992,7 @@ |
| |
| // Lazy init (only when we find a delete to carry over): |
| ReadersAndLiveDocs mergedDeletes = null; |
| + MergePolicy.DocMap docMap = null; |
| |
| for(int i=0; i < sourceSegments.size(); i++) { |
| SegmentInfoPerCommit info = sourceSegments.get(i); |
| @@ -3037,8 +3038,10 @@ |
| if (mergedDeletes == null) { |
| mergedDeletes = readerPool.get(merge.info, true); |
| mergedDeletes.initWritableLiveDocs(); |
| + docMap = merge.getDocMap(mergeState); |
| + assert docMap.isConsistent(merge.info.info.getDocCount()); |
| } |
| - mergedDeletes.delete(docUpto); |
| + mergedDeletes.delete(docMap.map(docUpto)); |
| } |
| docUpto++; |
| } |
| @@ -3055,8 +3058,10 @@ |
| if (mergedDeletes == null) { |
| mergedDeletes = readerPool.get(merge.info, true); |
| mergedDeletes.initWritableLiveDocs(); |
| + docMap = merge.getDocMap(mergeState); |
| + assert docMap.isConsistent(merge.info.info.getDocCount()); |
| } |
| - mergedDeletes.delete(docUpto); |
| + mergedDeletes.delete(docMap.map(docUpto)); |
| } |
| docUpto++; |
| } |
| @@ -3081,7 +3086,7 @@ |
| return mergedDeletes; |
| } |
| |
| - synchronized private boolean commitMerge(MergePolicy.OneMerge merge) throws IOException { |
| + synchronized private boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException { |
| |
| assert testPoint("startCommitMerge"); |
| |
| @@ -3109,7 +3114,7 @@ |
| return false; |
| } |
| |
| - final ReadersAndLiveDocs mergedDeletes = merge.info.info.getDocCount() == 0 ? null : commitMergedDeletes(merge); |
| + final ReadersAndLiveDocs mergedDeletes = merge.info.info.getDocCount() == 0 ? null : commitMergedDeletes(merge, mergeState); |
| |
| assert mergedDeletes == null || mergedDeletes.getPendingDeleteCount() != 0; |
| |
| @@ -3780,7 +3785,7 @@ |
| |
| // Force READ context because we merge deletes onto |
| // this reader: |
| - if (!commitMerge(merge)) { |
| + if (!commitMerge(merge, mergeState)) { |
| // commitMerge will return false if this merge was aborted |
| return 0; |
| } |
| Index: lucene/misc/src/test/org/apache/lucene/index/sorter/TestSortingMergePolicy.java |
| =================================================================== |
| --- lucene/misc/src/test/org/apache/lucene/index/sorter/TestSortingMergePolicy.java (révision 1461346) |
| +++ lucene/misc/src/test/org/apache/lucene/index/sorter/TestSortingMergePolicy.java (copie de travail) |
| @@ -18,7 +18,11 @@ |
| */ |
| |
| import java.io.IOException; |
| +import java.util.ArrayList; |
| +import java.util.HashSet; |
| +import java.util.List; |
| import java.util.Random; |
| +import java.util.Set; |
| |
| import org.apache.lucene.analysis.MockAnalyzer; |
| import org.apache.lucene.document.Document; |
| @@ -29,24 +33,28 @@ |
| import org.apache.lucene.index.DirectoryReader; |
| import org.apache.lucene.index.IndexReader; |
| import org.apache.lucene.index.IndexWriterConfig; |
| +import org.apache.lucene.index.LogMergePolicy; |
| +import org.apache.lucene.index.MergePolicy; |
| import org.apache.lucene.index.NumericDocValues; |
| import org.apache.lucene.index.RandomIndexWriter; |
| -import org.apache.lucene.index.SerialMergeScheduler; |
| import org.apache.lucene.index.SlowCompositeReaderWrapper; |
| import org.apache.lucene.index.Term; |
| +import org.apache.lucene.index.TieredMergePolicy; |
| import org.apache.lucene.store.Directory; |
| import org.apache.lucene.util.LuceneTestCase; |
| import org.apache.lucene.util._TestUtil; |
| |
| +import com.carrotsearch.randomizedtesting.generators.RandomPicks; |
| + |
| public class TestSortingMergePolicy extends LuceneTestCase { |
| |
| - private static final String DELETE_TERM = "abc"; |
| - |
| + private List<String> terms; |
| private Directory dir1, dir2; |
| private Sorter sorter; |
| private IndexReader reader; |
| private IndexReader sortedReader; |
| |
| + @Override |
| public void setUp() throws Exception { |
| super.setUp(); |
| sorter = new NumericDocValuesSorter("ndv"); |
| @@ -56,32 +64,58 @@ |
| private Document randomDocument() { |
| final Document doc = new Document(); |
| doc.add(new NumericDocValuesField("ndv", random().nextLong())); |
| - doc.add(new StringField("s", rarely() ? DELETE_TERM : _TestUtil.randomSimpleString(random(), 3), Store.YES)); |
| + doc.add(new StringField("s", RandomPicks.randomFrom(random(), terms), Store.YES)); |
| return doc; |
| } |
| |
| + private static MergePolicy newSortingMergePolicy(Sorter sorter) { |
| + // create a MP with a low merge factor so that many merges happen |
| + MergePolicy mp; |
| + if (random().nextBoolean()) { |
| + TieredMergePolicy tmp = newTieredMergePolicy(random()); |
| + final int numSegs = _TestUtil.nextInt(random(), 3, 5); |
| + tmp.setSegmentsPerTier(numSegs); |
| + tmp.setMaxMergeAtOnce(_TestUtil.nextInt(random(), 2, numSegs)); |
| + mp = tmp; |
| + } else { |
| + LogMergePolicy lmp = newLogMergePolicy(random()); |
| + lmp.setMergeFactor(_TestUtil.nextInt(random(), 3, 5)); |
| + mp = lmp; |
| + } |
| + // wrap it with a sorting mp |
| + return new SortingMergePolicy(mp, sorter); |
| + } |
| + |
| private void createRandomIndexes() throws IOException { |
| dir1 = newDirectory(); |
| dir2 = newDirectory(); |
| - final int numDocs = atLeast(100); |
| + final int numDocs = atLeast(150); |
| + final int numTerms = _TestUtil.nextInt(random(), 1, numDocs / 5); |
| + Set<String> randomTerms = new HashSet<String>(); |
| + while (randomTerms.size() < numTerms) { |
| + randomTerms.add(_TestUtil.randomSimpleString(random())); |
| + } |
| + terms = new ArrayList<String>(randomTerms); |
| final long seed = random().nextLong(); |
| final IndexWriterConfig iwc1 = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(new Random(seed))); |
| final IndexWriterConfig iwc2 = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(new Random(seed))); |
| - iwc2.setMergePolicy(new SortingMergePolicy(iwc2.getMergePolicy(), sorter)); |
| - iwc2.setMergeScheduler(new SerialMergeScheduler()); // Remove this line when LUCENE-4752 is fixed |
| + iwc2.setMergePolicy(newSortingMergePolicy(sorter)); |
| final RandomIndexWriter iw1 = new RandomIndexWriter(new Random(seed), dir1, iwc1); |
| final RandomIndexWriter iw2 = new RandomIndexWriter(new Random(seed), dir2, iwc2); |
| for (int i = 0; i < numDocs; ++i) { |
| final Document doc = randomDocument(); |
| iw1.addDocument(doc); |
| iw2.addDocument(doc); |
| - if (i == numDocs / 2 || (i != numDocs - 1 && rarely())) { |
| + if (i == numDocs / 2 || (i != numDocs - 1 && random().nextInt(8) == 0)) { |
| iw1.commit(); |
| iw2.commit(); |
| } |
| + if (random().nextInt(5) == 0) { |
| + final String term = RandomPicks.randomFrom(random(), terms); |
| + iw1.deleteDocuments(new Term("s", term)); |
| + iw2.deleteDocuments(new Term("s", term)); |
| + } |
| } |
| - iw1.deleteDocuments(new Term("s", DELETE_TERM)); |
| - iw2.deleteDocuments(new Term("s", DELETE_TERM)); |
| iw1.forceMerge(1); |
| iw2.forceMerge(1); |
| iw1.close(); |
| @@ -90,6 +124,7 @@ |
| sortedReader = DirectoryReader.open(dir2); |
| } |
| |
| + @Override |
| public void tearDown() throws Exception { |
| reader.close(); |
| sortedReader.close(); |
| @@ -98,7 +133,7 @@ |
| super.tearDown(); |
| } |
| |
| - private void assertSorted(AtomicReader reader) throws IOException { |
| + private static void assertSorted(AtomicReader reader) throws IOException { |
| final NumericDocValues ndv = reader.getNumericDocValues("ndv"); |
| for (int i = 1; i < reader.maxDoc(); ++i) { |
| assertTrue(ndv.get(i-1) < ndv.get(i)); |
| Index: lucene/misc/src/java/org/apache/lucene/index/sorter/SortingAtomicReader.java |
| =================================================================== |
| --- lucene/misc/src/java/org/apache/lucene/index/sorter/SortingAtomicReader.java (révision 1461346) |
| +++ lucene/misc/src/java/org/apache/lucene/index/sorter/SortingAtomicReader.java (copie de travail) |
| @@ -626,7 +626,11 @@ |
| * defined by <code>sorter</code>. If the reader is already sorted, this |
| * method might return the reader as-is. */ |
| public static AtomicReader wrap(AtomicReader reader, Sorter sorter) throws IOException { |
| - final Sorter.DocMap docMap = sorter.sort(reader); |
| + return wrap(reader, sorter.sort(reader)); |
| + } |
| + |
| + /** Expert: same as {@link #wrap(AtomicReader, Sorter)} but operates directly on a {@link Sorter.DocMap}. */ |
| + public static AtomicReader wrap(AtomicReader reader, Sorter.DocMap docMap) { |
| if (docMap == null) { |
| // the reader is already sorter |
| return reader; |
| Index: lucene/misc/src/java/org/apache/lucene/index/sorter/SortingMergePolicy.java |
| =================================================================== |
| --- lucene/misc/src/java/org/apache/lucene/index/sorter/SortingMergePolicy.java (révision 1461346) |
| +++ lucene/misc/src/java/org/apache/lucene/index/sorter/SortingMergePolicy.java (copie de travail) |
| @@ -26,11 +26,14 @@ |
| import org.apache.lucene.index.IndexReader; |
| import org.apache.lucene.index.IndexWriter; |
| import org.apache.lucene.index.MergePolicy; |
| +import org.apache.lucene.index.MergeState; |
| import org.apache.lucene.index.MultiReader; |
| import org.apache.lucene.index.SegmentInfoPerCommit; |
| import org.apache.lucene.index.SegmentInfos; |
| import org.apache.lucene.index.SlowCompositeReaderWrapper; |
| import org.apache.lucene.store.Directory; |
| +import org.apache.lucene.util.Bits; |
| +import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer; |
| |
| /** A {@link MergePolicy} that reorders documents according to a {@link Sorter} |
| * before merging them. As a consequence, all segments resulting from a merge |
| @@ -42,36 +45,74 @@ |
| * @lucene.experimental */ |
| public final class SortingMergePolicy extends MergePolicy { |
| |
| - private class SortingOneMerge extends OneMerge { |
| + class SortingOneMerge extends OneMerge { |
| |
| + List<AtomicReader> unsortedReaders; |
| + Sorter.DocMap docMap; |
| + AtomicReader sortedView; |
| + |
| SortingOneMerge(List<SegmentInfoPerCommit> segments) { |
| super(segments); |
| } |
| |
| @Override |
| public List<AtomicReader> getMergeReaders() throws IOException { |
| - final List<AtomicReader> readers = super.getMergeReaders(); |
| - switch (readers.size()) { |
| - case 0: |
| - return readers; |
| - case 1: |
| - return Collections.singletonList(SortingAtomicReader.wrap(readers.get(0), sorter)); |
| - default: |
| - final IndexReader multiReader = new MultiReader(readers.toArray(new AtomicReader[readers.size()])); |
| - final AtomicReader atomicReader = SlowCompositeReaderWrapper.wrap(multiReader); |
| - final AtomicReader sortingReader = SortingAtomicReader.wrap(atomicReader, sorter); |
| - if (sortingReader == atomicReader) { |
| - // already sorted, return the original list of readers so that |
| - // codec-specific bulk-merge methods can be used |
| - return readers; |
| + if (unsortedReaders == null) { |
| + unsortedReaders = super.getMergeReaders(); |
| + final AtomicReader atomicView; |
| + if (unsortedReaders.size() == 1) { |
| + atomicView = unsortedReaders.get(0); |
| + } else { |
| + final IndexReader multiReader = new MultiReader(unsortedReaders.toArray(new AtomicReader[unsortedReaders.size()])); |
| + atomicView = SlowCompositeReaderWrapper.wrap(multiReader); |
| + } |
| + docMap = sorter.sort(atomicView); |
| + sortedView = SortingAtomicReader.wrap(atomicView, docMap); |
| + } |
| + // a null doc map means that the readers are already sorted |
| + return docMap == null ? unsortedReaders : Collections.singletonList(sortedView); |
| + } |
| + |
| + private MonotonicAppendingLongBuffer getDeletes(List<AtomicReader> readers) { |
| + MonotonicAppendingLongBuffer deletes = new MonotonicAppendingLongBuffer(); |
| + int deleteCount = 0; |
| + for (AtomicReader reader : readers) { |
| + final int maxDoc = reader.maxDoc(); |
| + final Bits liveDocs = reader.getLiveDocs(); |
| + for (int i = 0; i < maxDoc; ++i) { |
| + if (liveDocs != null && !liveDocs.get(i)) { |
| + ++deleteCount; |
| + } else { |
| + deletes.add(deleteCount); |
| } |
| - return Collections.singletonList(sortingReader); |
| + } |
| } |
| + return deletes; |
| } |
| |
| + @Override |
| + public MergePolicy.DocMap getDocMap(final MergeState mergeState) { |
| + if (unsortedReaders == null) { |
| + throw new IllegalStateException(); |
| + } |
| + if (docMap == null) { |
| + return super.getDocMap(mergeState); |
| + } |
| + assert mergeState.docMaps.length == 1; // we returned a singleton reader |
| + final MonotonicAppendingLongBuffer deletes = getDeletes(unsortedReaders); |
| + return new MergePolicy.DocMap() { |
| + @Override |
| + public int map(int old) { |
| + final int oldWithDeletes = old + (int) deletes.get(old); |
| + final int newWithDeletes = docMap.oldToNew(oldWithDeletes); |
| + return mergeState.docMaps[0].get(newWithDeletes); |
| + } |
| + }; |
| + } |
| + |
| } |
| |
| - private class SortingMergeSpecification extends MergeSpecification { |
| + class SortingMergeSpecification extends MergeSpecification { |
| |
| @Override |
| public void add(OneMerge merge) { |
| @@ -96,8 +137,8 @@ |
| return sortingSpec; |
| } |
| |
| - private final MergePolicy in; |
| - private final Sorter sorter; |
| + final MergePolicy in; |
| + final Sorter sorter; |
| |
| /** Create a new {@link MergePolicy} that sorts documents with <code>sorter</code>. */ |
| public SortingMergePolicy(MergePolicy in, Sorter sorter) { |