Index: lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/index/MergePolicy.java (revision 1461346)
+++ lucene/core/src/java/org/apache/lucene/index/MergePolicy.java (working copy)
@@ -25,6 +25,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MergeInfo;
+import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.SetOnce.AlreadySetException;
import org.apache.lucene.util.SetOnce;
@@ -58,7 +59,30 @@
*/
public abstract class MergePolicy implements java.io.Closeable, Cloneable {
-
+
+ /** A map of doc IDs. */
+ public static abstract class DocMap {
+ /** Return the new doc ID according to its old value. */
+ public abstract int map(int old);
+
+ /** Useful from an assert. */
+ boolean isConsistent(int maxDoc) {
+ final FixedBitSet targets = new FixedBitSet(maxDoc);
+ for (int i = 0; i < maxDoc; ++i) {
+ final int target = map(i);
+ if (target < 0 || target >= maxDoc) {
+ assert false : "out of range: " + target + " not in [0-" + maxDoc + "[";
+ return false;
+ } else if (targets.get(target)) {
+ assert false : target + " is already taken (" + i + ")";
+ return false;
+ }
+ targets.set(target); // mark the target as taken so duplicate mappings are caught
+ }
+ return true;
+ }
+ }
+
/** OneMerge provides the information necessary to perform
* an individual primitive merge operation, resulting in
* a single new segment. The merge spec includes the
@@ -105,9 +128,12 @@
totalDocCount = count;
}
- /** Get the list of readers to merge. Note that this list does not
+ /** Expert: Get the list of readers to merge. Note that this list does not
* necessarily match the list of segments to merge and should only be used
- * to feed SegmentMerger to initialize a merge. */
+ * to feed SegmentMerger to initialize a merge. When a {@link OneMerge}
+ * reorders doc IDs, it must override {@link #getDocMap} too so that
+ * deletes that happened during the merge can be applied to the newly
+ * merged segment. */
public List<AtomicReader> getMergeReaders() throws IOException {
if (readers == null) {
throw new IllegalStateException("IndexWriter has not initialized readers from the segment infos yet");
@@ -121,6 +147,20 @@
return Collections.unmodifiableList(readers);
}
+ /** Expert: If {@link #getMergeReaders()} reorders document IDs, this method
+ * must be overridden to return a mapping from the <i>natural</i> doc ID
+ * (the doc ID that would result from a natural merge) to the actual doc
+ * ID. This mapping is used to apply deletions that happened during the
+ * merge to the new segment. */
+ public DocMap getDocMap(MergeState mergeState) {
+ return new DocMap() {
+ @Override
+ public int map(int docID) {
+ return docID;
+ }
+ };
+ }
+
/** Record that an exception occurred while executing
* this merge */
synchronized void setException(Throwable error) {
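
Note: a minimal sketch of the new contract for a reordering merge (hypothetical subclass, not part of this patch; deletions are ignored for simplicity). A OneMerge whose getMergeReaders() reverses doc IDs must describe the same reversal in getDocMap(), otherwise deletes that arrive during the merge would be applied to the wrong documents:

    class ReversingOneMerge extends MergePolicy.OneMerge {
      ReversingOneMerge(List<SegmentInfoPerCommit> segments) {
        super(segments);
      }
      @Override
      public MergePolicy.DocMap getDocMap(final MergeState mergeState) {
        final int maxDoc = totalDocCount; // public final field set in the constructor
        return new MergePolicy.DocMap() {
          @Override
          public int map(int old) {
            return maxDoc - 1 - old; // natural ID -> position in the reversed segment
          }
        };
      }
    }
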
Index: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (revision 1461346)
+++ lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (working copy)
@@ -2975,7 +2975,7 @@
* saves the resulting deletes file (incrementing the
* delete generation for merge.info). If no deletes were
* flushed, no new deletes file is saved. */
- synchronized private ReadersAndLiveDocs commitMergedDeletes(MergePolicy.OneMerge merge) throws IOException {
+ synchronized private ReadersAndLiveDocs commitMergedDeletes(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
assert testPoint("startCommitMergeDeletes");
@@ -2992,6 +2992,7 @@
// Lazy init (only when we find a delete to carry over):
ReadersAndLiveDocs mergedDeletes = null;
+ MergePolicy.DocMap docMap = null;
for(int i=0; i < sourceSegments.size(); i++) {
SegmentInfoPerCommit info = sourceSegments.get(i);
@@ -3037,8 +3038,10 @@
if (mergedDeletes == null) {
mergedDeletes = readerPool.get(merge.info, true);
mergedDeletes.initWritableLiveDocs();
+ docMap = merge.getDocMap(mergeState);
+ assert docMap.isConsistent(merge.info.info.getDocCount());
}
- mergedDeletes.delete(docUpto);
+ mergedDeletes.delete(docMap.map(docUpto));
}
docUpto++;
}
@@ -3055,8 +3058,10 @@
if (mergedDeletes == null) {
mergedDeletes = readerPool.get(merge.info, true);
mergedDeletes.initWritableLiveDocs();
+ docMap = merge.getDocMap(mergeState);
+ assert docMap.isConsistent(merge.info.info.getDocCount());
}
- mergedDeletes.delete(docUpto);
+ mergedDeletes.delete(docMap.map(docUpto));
}
docUpto++;
}
@@ -3081,7 +3086,7 @@
return mergedDeletes;
}
- synchronized private boolean commitMerge(MergePolicy.OneMerge merge) throws IOException {
+ synchronized private boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
assert testPoint("startCommitMerge");
@@ -3109,7 +3114,7 @@
return false;
}
- final ReadersAndLiveDocs mergedDeletes = merge.info.info.getDocCount() == 0 ? null : commitMergedDeletes(merge);
+ final ReadersAndLiveDocs mergedDeletes = merge.info.info.getDocCount() == 0 ? null : commitMergedDeletes(merge, mergeState);
assert mergedDeletes == null || mergedDeletes.getPendingDeleteCount() != 0;
@@ -3780,7 +3785,7 @@
// Force READ context because we merge deletes onto
// this reader:
- if (!commitMerge(merge)) {
+ if (!commitMerge(merge, mergeState)) {
// commitMerge will return false if this merge was aborted
return 0;
}
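
The key point in commitMergedDeletes: docUpto advances in the natural (concatenated) order of the source segments, so with a reordering merge the doc ID must be translated before the delete is carried over. A toy illustration with plain values (not the IndexWriter internals), assuming a 4-doc merge that reversed doc IDs:

    MergePolicy.DocMap docMap = new MergePolicy.DocMap() {
      @Override
      public int map(int old) {
        return 3 - old; // natural ID -> actual ID in the merged segment
      }
    };
    // a delete arrived during the merge for the doc at natural position 1:
    int docUpto = 1;
    int actual = docMap.map(docUpto); // == 2: delete doc 2 in the new segment
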
Index: lucene/misc/src/test/org/apache/lucene/index/sorter/TestSortingMergePolicy.java
===================================================================
--- lucene/misc/src/test/org/apache/lucene/index/sorter/TestSortingMergePolicy.java (revision 1461346)
+++ lucene/misc/src/test/org/apache/lucene/index/sorter/TestSortingMergePolicy.java (working copy)
@@ -18,7 +18,11 @@
*/
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
import java.util.Random;
+import java.util.Set;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
@@ -29,24 +33,28 @@
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.LogMergePolicy;
+import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.RandomIndexWriter;
-import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.index.SlowCompositeReaderWrapper;
import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util._TestUtil;
+import com.carrotsearch.randomizedtesting.generators.RandomPicks;
+
public class TestSortingMergePolicy extends LuceneTestCase {
- private static final String DELETE_TERM = "abc";
-
+ private List<String> terms;
private Directory dir1, dir2;
private Sorter sorter;
private IndexReader reader;
private IndexReader sortedReader;
+ @Override
public void setUp() throws Exception {
super.setUp();
sorter = new NumericDocValuesSorter("ndv");
@@ -56,32 +64,58 @@
private Document randomDocument() {
final Document doc = new Document();
doc.add(new NumericDocValuesField("ndv", random().nextLong()));
- doc.add(new StringField("s", rarely() ? DELETE_TERM : _TestUtil.randomSimpleString(random(), 3), Store.YES));
+ doc.add(new StringField("s", RandomPicks.randomFrom(random(), terms), Store.YES));
return doc;
}
+ private static MergePolicy newSortingMergePolicy(Sorter sorter) {
+ // create a MP with a low merge factor so that many merges happen
+ MergePolicy mp;
+ if (random().nextBoolean()) {
+ TieredMergePolicy tmp = newTieredMergePolicy(random());
+ final int numSegs = _TestUtil.nextInt(random(), 3, 5);
+ tmp.setSegmentsPerTier(numSegs);
+ tmp.setMaxMergeAtOnce(_TestUtil.nextInt(random(), 2, numSegs));
+ mp = tmp;
+ } else {
+ LogMergePolicy lmp = newLogMergePolicy(random());
+ lmp.setMergeFactor(_TestUtil.nextInt(random(), 3, 5));
+ mp = lmp;
+ }
+ // wrap it with a sorting mp
+ return new SortingMergePolicy(mp, sorter);
+ }
+
private void createRandomIndexes() throws IOException {
dir1 = newDirectory();
dir2 = newDirectory();
- final int numDocs = atLeast(100);
+ final int numDocs = atLeast(150);
+ final int numTerms = _TestUtil.nextInt(random(), 1, numDocs / 5);
+ Set<String> randomTerms = new HashSet<String>();
+ while (randomTerms.size() < numTerms) {
+ randomTerms.add(_TestUtil.randomSimpleString(random()));
+ }
+ terms = new ArrayList<String>(randomTerms);
final long seed = random().nextLong();
final IndexWriterConfig iwc1 = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(new Random(seed)));
final IndexWriterConfig iwc2 = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(new Random(seed)));
- iwc2.setMergePolicy(new SortingMergePolicy(iwc2.getMergePolicy(), sorter));
- iwc2.setMergeScheduler(new SerialMergeScheduler()); // Remove this line when LUCENE-4752 is fixed
+ iwc2.setMergePolicy(newSortingMergePolicy(sorter));
final RandomIndexWriter iw1 = new RandomIndexWriter(new Random(seed), dir1, iwc1);
final RandomIndexWriter iw2 = new RandomIndexWriter(new Random(seed), dir2, iwc2);
for (int i = 0; i < numDocs; ++i) {
final Document doc = randomDocument();
iw1.addDocument(doc);
iw2.addDocument(doc);
- if (i == numDocs / 2 || (i != numDocs - 1 && rarely())) {
+ if (i == numDocs / 2 || (i != numDocs - 1 && random().nextInt(8) == 0)) {
iw1.commit();
iw2.commit();
}
+ if (random().nextInt(5) == 0) {
+ final String term = RandomPicks.randomFrom(random(), terms);
+ iw1.deleteDocuments(new Term("s", term));
+ iw2.deleteDocuments(new Term("s", term));
+ }
}
- iw1.deleteDocuments(new Term("s", DELETE_TERM));
- iw2.deleteDocuments(new Term("s", DELETE_TERM));
iw1.forceMerge(1);
iw2.forceMerge(1);
iw1.close();
@@ -90,6 +124,7 @@
sortedReader = DirectoryReader.open(dir2);
}
+ @Override
public void tearDown() throws Exception {
reader.close();
sortedReader.close();
@@ -98,7 +133,7 @@
super.tearDown();
}
- private void assertSorted(AtomicReader reader) throws IOException {
+ private static void assertSorted(AtomicReader reader) throws IOException {
final NumericDocValues ndv = reader.getNumericDocValues("ndv");
for (int i = 1; i < reader.maxDoc(); ++i) {
assertTrue(ndv.get(i-1) < ndv.get(i));
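
For reference, the wiring this test exercises (a sketch; `matchVersion`, `analyzer` and `dir` are assumed to exist, and "ndv" follows the test's field name):

    Sorter sorter = new NumericDocValuesSorter("ndv");
    IndexWriterConfig conf = new IndexWriterConfig(matchVersion, analyzer);
    conf.setMergePolicy(new SortingMergePolicy(conf.getMergePolicy(), sorter));
    IndexWriter writer = new IndexWriter(dir, conf);
    // from now on, every segment produced by a merge is sorted by the "ndv" field
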
Index: lucene/misc/src/java/org/apache/lucene/index/sorter/SortingAtomicReader.java
===================================================================
--- lucene/misc/src/java/org/apache/lucene/index/sorter/SortingAtomicReader.java (revision 1461346)
+++ lucene/misc/src/java/org/apache/lucene/index/sorter/SortingAtomicReader.java (working copy)
@@ -626,7 +626,11 @@
* defined by <code>sorter</code>. If the reader is already sorted, this
* method might return the reader as-is. */
public static AtomicReader wrap(AtomicReader reader, Sorter sorter) throws IOException {
- final Sorter.DocMap docMap = sorter.sort(reader);
+ return wrap(reader, sorter.sort(reader));
+ }
+
+ /** Expert: same as {@link #wrap(AtomicReader, Sorter)} but operates directly on a {@link Sorter.DocMap}. */
+ public static AtomicReader wrap(AtomicReader reader, Sorter.DocMap docMap) {
if (docMap == null) {
// the reader is already sorted
return reader;
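
The new overload lets a caller compute the Sorter.DocMap once and keep it for later doc ID translation, which is exactly what SortingOneMerge below needs. A sketch:

    final Sorter.DocMap docMap = sorter.sort(reader); // null if already sorted
    final AtomicReader sortedView = SortingAtomicReader.wrap(reader, docMap);
    if (docMap != null) {
      final int sortedID = docMap.oldToNew(0); // where doc 0 ended up
    }
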
Index: lucene/misc/src/java/org/apache/lucene/index/sorter/SortingMergePolicy.java
===================================================================
--- lucene/misc/src/java/org/apache/lucene/index/sorter/SortingMergePolicy.java (revision 1461346)
+++ lucene/misc/src/java/org/apache/lucene/index/sorter/SortingMergePolicy.java (working copy)
@@ -26,11 +26,14 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.index.SegmentInfoPerCommit;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SlowCompositeReaderWrapper;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer;
/** A {@link MergePolicy} that reorders documents according to a {@link Sorter}
* before merging them. As a consequence, all segments resulting from a merge
@@ -42,36 +45,74 @@
* @lucene.experimental */
public final class SortingMergePolicy extends MergePolicy {
- private class SortingOneMerge extends OneMerge {
+ class SortingOneMerge extends OneMerge {
+ List<AtomicReader> unsortedReaders;
+ Sorter.DocMap docMap;
+ AtomicReader sortedView;
+
SortingOneMerge(List<SegmentInfoPerCommit> segments) {
super(segments);
}
@Override
public List<AtomicReader> getMergeReaders() throws IOException {
- final List<AtomicReader> readers = super.getMergeReaders();
- switch (readers.size()) {
- case 0:
- return readers;
- case 1:
- return Collections.singletonList(SortingAtomicReader.wrap(readers.get(0), sorter));
- default:
- final IndexReader multiReader = new MultiReader(readers.toArray(new AtomicReader[readers.size()]));
- final AtomicReader atomicReader = SlowCompositeReaderWrapper.wrap(multiReader);
- final AtomicReader sortingReader = SortingAtomicReader.wrap(atomicReader, sorter);
- if (sortingReader == atomicReader) {
- // already sorted, return the original list of readers so that
- // codec-specific bulk-merge methods can be used
- return readers;
+ if (unsortedReaders == null) {
+ unsortedReaders = super.getMergeReaders();
+ final AtomicReader atomicView;
+ if (unsortedReaders.size() == 1) {
+ atomicView = unsortedReaders.get(0);
+ } else {
+ final IndexReader multiReader = new MultiReader(unsortedReaders.toArray(new AtomicReader[unsortedReaders.size()]));
+ atomicView = SlowCompositeReaderWrapper.wrap(multiReader);
+ }
+ docMap = sorter.sort(atomicView);
+ sortedView = SortingAtomicReader.wrap(atomicView, docMap);
+ }
+ // a null doc map means that the readers are already sorted
+ return docMap == null ? unsortedReaders : Collections.singletonList(sortedView);
+ }
+
+ private MonotonicAppendingLongBuffer getDeletes(List<AtomicReader> readers) {
+ MonotonicAppendingLongBuffer deletes = new MonotonicAppendingLongBuffer();
+ int deleteCount = 0;
+ for (AtomicReader reader : readers) {
+ final int maxDoc = reader.maxDoc();
+ final Bits liveDocs = reader.getLiveDocs();
+ for (int i = 0; i < maxDoc; ++i) {
+ if (liveDocs != null && !liveDocs.get(i)) {
+ ++deleteCount;
+ } else {
+ deletes.add(deleteCount);
}
- return Collections.singletonList(sortingReader);
+ }
}
+ return deletes;
}
+ @Override
+ public MergePolicy.DocMap getDocMap(final MergeState mergeState) {
+ if (unsortedReaders == null) {
+ throw new IllegalStateException("getDocMap should be called after getMergeReaders");
+ }
+ if (docMap == null) {
+ return super.getDocMap(mergeState);
+ }
+ assert mergeState.docMaps.length == 1; // we returned a singleton reader
+ final MonotonicAppendingLongBuffer deletes = getDeletes(unsortedReaders);
+ return new MergePolicy.DocMap() {
+ @Override
+ public int map(int old) {
+ final int oldWithDeletes = old + (int) deletes.get(old);
+ final int newWithDeletes = docMap.oldToNew(oldWithDeletes);
+ return mergeState.docMaps[0].get(newWithDeletes);
+ }
+ };
+ }
+
}
- private class SortingMergeSpecification extends MergeSpecification {
+ class SortingMergeSpecification extends MergeSpecification {
@Override
public void add(OneMerge merge) {
@@ -96,8 +137,8 @@
return sortingSpec;
}
- private final MergePolicy in;
- private final Sorter sorter;
+ final MergePolicy in;
+ final Sorter sorter;
/** Create a new {@link MergePolicy} that sorts documents with <code>sorter</code>. */
public SortingMergePolicy(MergePolicy in, Sorter sorter) {
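
A toy walk-through of the doc ID composition in SortingOneMerge.getDocMap, with plain arrays standing in for the three maps (values made up for illustration). The unsorted concatenated readers hold 5 docs and doc 1 is deleted, so a natural merge would contain 4 docs:

    int[] deletes  = {0, 1, 1, 1};     // per live doc: deleted docs before it
    int[] oldToNew = {3, 0, 4, 1, 2};  // Sorter.DocMap over all 5 docs
    int[] mergeMap = {-1, 0, 1, 2, 3}; // mergeState.docMaps[0]: drops doc 1

    int old = 2;                                   // natural merged doc ID
    int oldWithDeletes = old + deletes[old];       // 3: ID in the unsorted view
    int newWithDeletes = oldToNew[oldWithDeletes]; // 1: ID in the sorted view
    int merged = mergeMap[newWithDeletes];         // 0: ID in the final segment
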