| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.lucene.index; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.nio.file.DirectoryStream; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.regex.Pattern; |
| import org.apache.lucene.document.Document; |
| import org.apache.lucene.document.Field; |
| import org.apache.lucene.document.LongPoint; |
| import org.apache.lucene.document.NumericDocValuesField; |
| import org.apache.lucene.search.FieldDoc; |
| import org.apache.lucene.search.IndexSearcher; |
| import org.apache.lucene.search.MatchAllDocsQuery; |
| import org.apache.lucene.search.ScoreDoc; |
| import org.apache.lucene.search.Sort; |
| import org.apache.lucene.search.SortField; |
| import org.apache.lucene.search.TopDocs; |
| import org.apache.lucene.store.Directory; |
| import org.apache.lucene.store.FSDirectory; |
| import org.apache.lucene.store.IOContext; |
| import org.apache.lucene.store.MockDirectoryWrapper; |
| import org.apache.lucene.store.MockDirectoryWrapper.Throttling; |
| import org.apache.lucene.util.Bits; |
| import org.apache.lucene.util.IOUtils; |
| import org.apache.lucene.util.LuceneTestCase; |
| import org.apache.lucene.util.StringHelper; |
| import org.apache.lucene.util.TestUtil; |
| import org.apache.lucene.util.Version; |
| |
| // TODO: |
| // - old parallel indices are only pruned on commit/close; can we do it on refresh? |
| |
| /** |
| * Simple example showing how to use ParallelLeafReader to index new stuff (postings, DVs, etc.) |
| * from previously stored fields, on the fly (during NRT reader reopen), after the initial indexing. |
| * The test indexes just a single stored field with text "content X" (X is a number embedded in the |
| * text). |
| * |
| * <p>Then, on reopen, for any newly created segments (flush or merge), it builds a new parallel |
| * segment by loading all stored docs, parsing out that X, and adding it as DV and numeric indexed |
| * (trie) field. |
| * |
| * <p>Finally, for searching, it builds a top-level MultiReader, with ParallelLeafReader for each |
| * segment, and then tests that random numeric range queries, and sorting by the new DV field, work |
| * correctly. |
| * |
| * <p>Each per-segment index lives in a private directory next to the main index, and they are |
| * deleted once their segments are removed from the index. They are "volatile", meaning if e.g. the |
| * index is replicated to another machine, it's OK to not copy parallel segments indices, since they |
 * will just be regenerated (at a cost though).
| */ |
| |
| // @SuppressSysoutChecks(bugUrl="we print stuff") |
| // See: https://issues.apache.org/jira/browse/SOLR-12028 Tests cannot remove files on Windows |
| // machines occasionally |
| public class TestDemoParallelLeafReader extends LuceneTestCase { |
| |
| static final boolean DEBUG = false; |
| |
| abstract static class ReindexingReader implements Closeable { |
| |
| /** Key used to store the current schema gen in the SegmentInfo diagnostics */ |
| public static final String SCHEMA_GEN_KEY = "schema_gen"; |
| |
| public final IndexWriter w; |
| public final ReaderManager mgr; |
| |
| private final Directory indexDir; |
| private final Path root; |
| private final Path segsPath; |
| |
    /** Which segments have been closed, but their parallel index is not yet removed. */
| private final Set<SegmentIDAndGen> closedSegments = |
| Collections.newSetFromMap(new ConcurrentHashMap<SegmentIDAndGen, Boolean>()); |
| |
| /** Holds currently open parallel readers for each segment. */ |
| private final Map<SegmentIDAndGen, LeafReader> parallelReaders = new ConcurrentHashMap<>(); |
| |
| void printRefCounts() { |
| System.out.println("All refCounts:"); |
| for (Map.Entry<SegmentIDAndGen, LeafReader> ent : parallelReaders.entrySet()) { |
| System.out.println( |
| " " |
| + ent.getKey() |
| + " " |
| + ent.getValue() |
| + " refCount=" |
| + ent.getValue().getRefCount()); |
| } |
| } |
| |
    /**
     * Opens (or creates) the main index under {@code root/index} and the parent directory for the
     * per-segment parallel indices under {@code root/segs}, then starts with an empty commit.
     *
     * @param root top-level directory holding both the main index and the parallel indices
     * @throws IOException if a directory or the writer cannot be created
     */
    public ReindexingReader(Path root) throws IOException {
      this.root = root;

      // Normal index is stored under "index":
      indexDir = openDirectory(root.resolve("index"));

      // Per-segment parallel indices are stored under subdirs "segs":
      segsPath = root.resolve("segs");
      Files.createDirectories(segsPath);

      // Wrap the subclass-provided merge policy so merges see the parallel fields:
      IndexWriterConfig iwc = getIndexWriterConfig();
      iwc.setMergePolicy(new ReindexingMergePolicy(iwc.getMergePolicy()));
      if (DEBUG) {
        System.out.println("TEST: use IWC:\n" + iwc);
      }
      w = new IndexWriter(indexDir, iwc);

      // Warmer is installed after the writer exists, since the lambda calls back into this
      // instance:
      w.getConfig()
          .setMergedSegmentWarmer(
              (reader) -> {
                // This will build the parallel index for the merged segment before the merge
                // becomes visible, so reopen delay is only due to
                // newly flushed segments:
                if (DEBUG)
                  System.out.println(
                      Thread.currentThread().getName() + ": TEST: now warm " + reader);
                // TODO: it's not great that we pass false here; it means we close the reader &
                // reopen again for NRT reader; still we did "warm" by
                // building the parallel index, if necessary
                getParallelLeafReader(reader, false, getCurrentSchemaGen());
              });

      // start with empty commit:
      w.commit();
      mgr = new ReaderManager(new ParallelLeafDirectoryReader(DirectoryReader.open(w)));
    }
| |
| protected abstract IndexWriterConfig getIndexWriterConfig() throws IOException; |
| |
    /**
     * Optional method to validate that the provided parallel reader in fact reflects the changes
     * in schemaGen. The default implementation does nothing.
     */
    protected void checkParallelReader(LeafReader reader, LeafReader parallelReader, long schemaGen)
        throws IOException {}
| |
    /** Override to customize Directory impl. Default opens an {@link FSDirectory} at {@code path}. */
    protected Directory openDirectory(Path path) throws IOException {
      return FSDirectory.open(path);
    }
| |
    /** Commits the main index writer; parallel indices track completion via "done" marker files. */
    public void commit() throws IOException {
      w.commit();
    }
| |
    /**
     * Returns {@code reader} paired with its parallel reader for {@code schemaGen} via a
     * ParallelLeafReader, or {@code reader} itself if the segment is already current as of that
     * gen.
     */
    LeafReader getCurrentReader(LeafReader reader, long schemaGen) throws IOException {
      LeafReader parallelReader = getParallelLeafReader(reader, true, schemaGen);
      if (parallelReader != null) {

        // We should not be embedding one ParallelLeafReader inside another:
        assertFalse(parallelReader instanceof ParallelLeafReader);
        assertFalse(reader instanceof ParallelLeafReader);

        // NOTE: important that parallelReader is first, so if there are field name overlaps,
        // because changes to the schema
        // overwrote existing field names, it wins:
        LeafReader newReader =
            new ParallelLeafReader(false, parallelReader, reader) {
              // Deletes only happen in the main (second) reader, so delegate liveness to it:
              @Override
              public Bits getLiveDocs() {
                return getParallelReaders()[1].getLiveDocs();
              }

              @Override
              public int numDocs() {
                return getParallelReaders()[1].numDocs();
              }
            };

        // Because ParallelLeafReader does its own (extra) incRef:
        parallelReader.decRef();

        return newReader;

      } else {
        // This segment was already current as of currentSchemaGen:
        return reader;
      }
    }
| |
    /**
     * DirectoryReader that wraps each leaf with its parallel reader, as of the schema gen that was
     * current at the moment this reader was opened.
     */
    private class ParallelLeafDirectoryReader extends FilterDirectoryReader {
      public ParallelLeafDirectoryReader(DirectoryReader in) throws IOException {
        super(
            in,
            new FilterDirectoryReader.SubReaderWrapper() {
              // Capture the gen once so all leaves of this reader agree on the schema:
              final long currentSchemaGen = getCurrentSchemaGen();

              @Override
              public LeafReader wrap(LeafReader reader) {
                try {
                  return getCurrentReader(reader, currentSchemaGen);
                } catch (IOException ioe) {
                  // TODO: must close on exc here:
                  throw new RuntimeException(ioe);
                }
              }
            });
      }

      @Override
      protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
        return new ParallelLeafDirectoryReader(in);
      }

      // Releases every wrapped parallel reader, remembering only the first failure, then closes
      // the delegate; rethrows the first exception at the end.
      @Override
      protected void doClose() throws IOException {
        Throwable firstExc = null;
        for (final LeafReader r : getSequentialSubReaders()) {
          if (r instanceof ParallelLeafReader) {
            // try to close each reader, even if an exception is thrown
            try {
              r.decRef();
            } catch (Throwable t) {
              if (firstExc == null) {
                firstExc = t;
              }
            }
          }
        }
        // Also close in, so it decRef's the SegmentInfos
        try {
          in.doClose();
        } catch (Throwable t) {
          if (firstExc == null) {
            firstExc = t;
          }
        }

        // throw the first exception
        if (firstExc != null) {
          throw IOUtils.rethrowAlways(firstExc);
        }
      }

      @Override
      public CacheHelper getReaderCacheHelper() {
        // Leaves change per schema gen, so caching on this reader would be unsafe:
        return null;
      }
    }
| |
    /**
     * Closes the writer and reader manager, prunes all dead parallel indices (including old
     * gens), and asserts that no parallel index was leaked.
     */
    @Override
    public void close() throws IOException {
      w.close();
      if (DEBUG)
        System.out.println(
            "TEST: after close writer index=" + SegmentInfos.readLatestCommit(indexDir));

      /*
      DirectoryReader r = mgr.acquire();
      try {
        TestUtil.checkReader(r);
      } finally {
        mgr.release(r);
      }
      */
      mgr.close();
      // removeOldGens=true: at close time even older-gen parallel indices may be deleted:
      pruneOldSegments(true);
      assertNoExtraSegments();
      indexDir.close();
    }
| |
| // Make sure we deleted all parallel indices for segments that are no longer in the main index: |
| private void assertNoExtraSegments() throws IOException { |
| Set<String> liveIDs = new HashSet<String>(); |
| for (SegmentCommitInfo info : SegmentInfos.readLatestCommit(indexDir)) { |
| String idString = StringHelper.idToString(info.info.getId()); |
| liveIDs.add(idString); |
| } |
| |
| // At this point (closing) the only segments in closedSegments should be the still-live ones: |
| for (SegmentIDAndGen segIDGen : closedSegments) { |
| assertTrue(liveIDs.contains(segIDGen.segID)); |
| } |
| |
| boolean fail = false; |
| for (Path path : segSubDirs(segsPath)) { |
| SegmentIDAndGen segIDGen = new SegmentIDAndGen(path.getFileName().toString()); |
| if (liveIDs.contains(segIDGen.segID) == false) { |
| if (DEBUG) |
| System.out.println( |
| "TEST: fail seg=" |
| + path.getFileName() |
| + " is not live but still has a parallel index"); |
| fail = true; |
| } |
| } |
| assertFalse(fail); |
| } |
| |
| private static class SegmentIDAndGen { |
| public final String segID; |
| public final long schemaGen; |
| |
| public SegmentIDAndGen(String segID, long schemaGen) { |
| this.segID = segID; |
| this.schemaGen = schemaGen; |
| } |
| |
| public SegmentIDAndGen(String s) { |
| String[] parts = s.split("_"); |
| if (parts.length != 2) { |
| throw new IllegalArgumentException("invalid SegmentIDAndGen \"" + s + "\""); |
| } |
| // TODO: better checking of segID? |
| segID = parts[0]; |
| schemaGen = Long.parseLong(parts[1]); |
| } |
| |
| @Override |
| public int hashCode() { |
| return (int) (segID.hashCode() * schemaGen); |
| } |
| |
| @Override |
| public boolean equals(Object _other) { |
| if (_other instanceof SegmentIDAndGen) { |
| SegmentIDAndGen other = (SegmentIDAndGen) _other; |
| return segID.equals(other.segID) && schemaGen == other.schemaGen; |
| } else { |
| return false; |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return segID + "_" + schemaGen; |
| } |
| } |
| |
    /**
     * Closed-listener for a cached parallel reader: evicts it from the cache, closes its
     * directory, and records the segment+gen in closedSegments so pruning may later delete the
     * files on disk.
     */
    private class ParallelReaderClosed implements IndexReader.ClosedListener {
      private final SegmentIDAndGen segIDGen;
      private final Directory dir;

      public ParallelReaderClosed(SegmentIDAndGen segIDGen, Directory dir) {
        this.segIDGen = segIDGen;
        this.dir = dir;
      }

      @Override
      public void onClose(IndexReader.CacheKey ignored) {
        try {
          // TODO: make this sync finer, i.e. just the segment + schemaGen
          synchronized (ReindexingReader.this) {
            if (DEBUG)
              System.out.println(
                  Thread.currentThread().getName()
                      + ": TEST: now close parallel parLeafReader dir="
                      + dir
                      + " segIDGen="
                      + segIDGen);
            parallelReaders.remove(segIDGen);
            dir.close();
            // Only after dir.close() is it safe for pruning to delete these files:
            closedSegments.add(segIDGen);
          }
        } catch (IOException ioe) {
          System.out.println("TEST: hit IOExc closing dir=" + dir);
          ioe.printStackTrace(System.out);
          throw new RuntimeException(ioe);
        }
      }
    }
| |
    // Returns a ref
    /**
     * Returns a parallel reader for {@code leaf} at {@code schemaGen}, building (reindexing) its
     * on-disk parallel index first if needed, or null if the segment is already current as of
     * {@code schemaGen}. The caller owns the returned reference and must decRef it. When
     * {@code doCache} is true the reader is cached for reuse; when false (merge warming) it is
     * opened, closed, and only its on-disk index remains.
     */
    LeafReader getParallelLeafReader(final LeafReader leaf, boolean doCache, long schemaGen)
        throws IOException {
      assert leaf instanceof SegmentReader;
      SegmentInfo info = ((SegmentReader) leaf).getSegmentInfo().info;

      // Gen this segment was last reindexed to; -1 if never (see getSchemaGen):
      long infoSchemaGen = getSchemaGen(info);

      if (DEBUG)
        System.out.println(
            Thread.currentThread().getName()
                + ": TEST: getParallelLeafReader: "
                + leaf
                + " infoSchemaGen="
                + infoSchemaGen
                + " vs schemaGen="
                + schemaGen
                + " doCache="
                + doCache);

      if (infoSchemaGen == schemaGen) {
        if (DEBUG)
          System.out.println(
              Thread.currentThread().getName()
                  + ": TEST: segment is already current schemaGen="
                  + schemaGen
                  + "; skipping");
        return null;
      }

      if (infoSchemaGen > schemaGen) {
        throw new IllegalStateException(
            "segment infoSchemaGen ("
                + infoSchemaGen
                + ") cannot be greater than requested schemaGen ("
                + schemaGen
                + ")");
      }

      final SegmentIDAndGen segIDGen =
          new SegmentIDAndGen(StringHelper.idToString(info.getId()), schemaGen);

      // While loop because the parallel reader may be closed out from under us, so we must retry:
      while (true) {

        // TODO: make this sync finer, i.e. just the segment + schemaGen
        synchronized (this) {
          LeafReader parReader = parallelReaders.get(segIDGen);

          // Merge warming (doCache == false) must never race with a cached reader:
          assert doCache || parReader == null;

          if (parReader == null) {

            Path leafIndex = segsPath.resolve(segIDGen.toString());

            final Directory dir = openDirectory(leafIndex);

            // "done" marker means a previous reindex of this segment+gen completed fully:
            if (slowFileExists(dir, "done") == false) {
              if (DEBUG)
                System.out.println(
                    Thread.currentThread().getName()
                        + ": TEST: build segment index for "
                        + leaf
                        + " "
                        + segIDGen
                        + " (source: "
                        + info.getDiagnostics().get("source")
                        + ") dir="
                        + leafIndex);

              if (dir.listAll().length != 0) {
                // It crashed before finishing last time:
                if (DEBUG)
                  System.out.println(
                      Thread.currentThread().getName()
                          + ": TEST: remove old incomplete index files: "
                          + leafIndex);
                IOUtils.rm(leafIndex);
              }

              reindex(infoSchemaGen, schemaGen, leaf, dir);

              // Marker file, telling us this index is in fact done. This way if we crash while
              // doing the reindexing for a given segment, we will
              // later try again:
              dir.createOutput("done", IOContext.DEFAULT).close();
            } else {
              if (DEBUG)
                System.out.println(
                    Thread.currentThread().getName()
                        + ": TEST: segment index already exists for "
                        + leaf
                        + " "
                        + segIDGen
                        + " (source: "
                        + info.getDiagnostics().get("source")
                        + ") dir="
                        + leafIndex);
            }

            if (DEBUG)
              System.out.println(
                  Thread.currentThread().getName() + ": TEST: now check index " + dir);
            // TestUtil.checkIndex(dir);

            // reindex() force-merged, so the parallel index is exactly one segment:
            SegmentInfos infos = SegmentInfos.readLatestCommit(dir);
            assert infos.size() == 1;
            final LeafReader parLeafReader =
                new SegmentReader(infos.info(0), Version.LATEST.major, IOContext.DEFAULT);

            // checkParallelReader(leaf, parLeafReader, schemaGen);

            if (DEBUG)
              System.out.println(
                  Thread.currentThread().getName()
                      + ": TEST: opened parallel reader: "
                      + parLeafReader);
            if (doCache) {
              parallelReaders.put(segIDGen, parLeafReader);

              // Our id+gen could have been previously closed, e.g. if it was a merged segment that
              // was warmed, so we must clear this else
              // the pruning may remove our directory:
              closedSegments.remove(segIDGen);

              parLeafReader
                  .getReaderCacheHelper()
                  .addClosedListener(new ParallelReaderClosed(segIDGen, dir));

            } else {
              // Used only for merged segment warming:
              // Messy: we close this reader now, instead of leaving open for reuse:
              if (DEBUG)
                System.out.println(
                    "TEST: now decRef non cached refCount=" + parLeafReader.getRefCount());
              parLeafReader.decRef();
              dir.close();

              // Must do this after dir is closed, else another thread could "rm -rf" while we are
              // closing (which makes MDW.close's
              // checkIndex angry):
              closedSegments.add(segIDGen);
              // NOTE(review): dead store -- parReader is unconditionally overwritten just below:
              parReader = null;
            }
            parReader = parLeafReader;

          } else {
            if (parReader.tryIncRef() == false) {
              // We failed: this reader just got closed by another thread, e.g. refresh thread
              // opening a new reader, so this reader is now
              // closed and we must try again.
              if (DEBUG)
                System.out.println(
                    Thread.currentThread().getName()
                        + ": TEST: tryIncRef failed for "
                        + parReader
                        + "; retry");
              parReader = null;
              continue;
            }
            if (DEBUG)
              System.out.println(
                  Thread.currentThread().getName()
                      + ": TEST: use existing already opened parReader="
                      + parReader
                      + " refCount="
                      + parReader.getRefCount());
            // checkParallelReader(leaf, parReader, schemaGen);
          }

          // We return the new reference to caller
          return parReader;
        }
      }
    }
| |
    // TODO: we could pass a writer already opened...?
    /**
     * Builds the parallel index for {@code reader}'s segment into {@code parallelDir}, upgrading
     * from {@code oldSchemaGen} (-1 if the segment was never reindexed) to {@code newSchemaGen}.
     * The parallel index's docIDs must exactly match the incoming reader's order.
     */
    protected abstract void reindex(
        long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir)
        throws IOException;
| |
    /**
     * Returns the gen for the current schema; compared against a segment's recorded gen to decide
     * whether its parallel index is stale.
     */
    protected abstract long getCurrentSchemaGen();
| |
    /**
     * Returns the gen that should be merged, meaning those changes will be folded back into the
     * main index.
     *
     * <p>Defaults to {@link #getCurrentSchemaGen()}; must never exceed it (checked in
     * ReindexingOneMerge's constructor).
     */
    protected long getMergingSchemaGen() {
      return getCurrentSchemaGen();
    }
| |
| /** |
| * Removes the parallel index that are no longer in the last commit point. We can't remove this |
| * when the parallel reader is closed because it may still be referenced by the last commit. |
| */ |
| private void pruneOldSegments(boolean removeOldGens) throws IOException { |
| SegmentInfos lastCommit = SegmentInfos.readLatestCommit(indexDir); |
| if (DEBUG) System.out.println("TEST: prune"); |
| |
| Set<String> liveIDs = new HashSet<String>(); |
| for (SegmentCommitInfo info : lastCommit) { |
| String idString = StringHelper.idToString(info.info.getId()); |
| liveIDs.add(idString); |
| } |
| |
| long currentSchemaGen = getCurrentSchemaGen(); |
| |
| if (Files.exists(segsPath)) { |
| for (Path path : segSubDirs(segsPath)) { |
| if (Files.isDirectory(path)) { |
| SegmentIDAndGen segIDGen = new SegmentIDAndGen(path.getFileName().toString()); |
| assert segIDGen.schemaGen <= currentSchemaGen; |
| if (liveIDs.contains(segIDGen.segID) == false |
| && (closedSegments.contains(segIDGen) |
| || (removeOldGens && segIDGen.schemaGen < currentSchemaGen))) { |
| if (DEBUG) System.out.println("TEST: remove " + segIDGen); |
| try { |
| IOUtils.rm(path); |
| closedSegments.remove(segIDGen); |
| } catch (IOException ioe) { |
| // OK, we'll retry later |
| if (DEBUG) System.out.println("TEST: ignore ioe during delete " + path + ":" + ioe); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
    /**
     * Just replaces the sub-readers with parallel readers, so reindexed fields are merged into new
     * segments.
     */
    private class ReindexingMergePolicy extends FilterMergePolicy {

      /** OneMerge that feeds the merge parallel-wrapped readers pinned to a single schema gen. */
      class ReindexingOneMerge extends OneMerge {

        // Parallel readers opened by wrapForMerge; released in mergeFinished:
        final List<ParallelLeafReader> parallelReaders = new ArrayList<>();
        final long schemaGen;

        ReindexingOneMerge(List<SegmentCommitInfo> segments) {
          super(segments);
          // Commit up front to which schemaGen we will merge; we don't want a schema change
          // sneaking in for some of our leaf readers but not others:
          schemaGen = getMergingSchemaGen();
          long currentSchemaGen = getCurrentSchemaGen();

          // Defensive sanity check:
          if (schemaGen > currentSchemaGen) {
            throw new IllegalStateException(
                "currentSchemaGen ("
                    + currentSchemaGen
                    + ") must always be >= mergingSchemaGen ("
                    + schemaGen
                    + ")");
          }
        }

        @Override
        public CodecReader wrapForMerge(CodecReader reader) throws IOException {
          LeafReader wrapped = getCurrentReader(reader, schemaGen);
          if (wrapped instanceof ParallelLeafReader) {
            parallelReaders.add((ParallelLeafReader) wrapped);
          }
          return SlowCodecReaderWrapper.wrap(wrapped);
        }

        @Override
        public void mergeFinished(boolean success, boolean segmentDropped) throws IOException {
          super.mergeFinished(success, segmentDropped);
          // Release every parallel reader, remembering only the first failure:
          Throwable th = null;
          for (ParallelLeafReader r : parallelReaders) {
            try {
              r.decRef();
            } catch (Throwable t) {
              if (th == null) {
                th = t;
              }
            }
          }

          if (th != null) {
            throw IOUtils.rethrowAlways(th);
          }
        }

        @Override
        public void setMergeInfo(SegmentCommitInfo info) {
          // Record that this merged segment is current as of this schemaGen:
          Map<String, String> copy = new HashMap<>(info.info.getDiagnostics());
          copy.put(SCHEMA_GEN_KEY, Long.toString(schemaGen));
          info.info.setDiagnostics(copy);
          super.setMergeInfo(info);
        }
      }

      /** MergeSpecification whose added merges are re-wrapped as ReindexingOneMerge. */
      class ReindexingMergeSpecification extends MergeSpecification {
        @Override
        public void add(OneMerge merge) {
          super.add(new ReindexingOneMerge(merge.segments));
        }

        @Override
        public String segString(Directory dir) {
          return "ReindexingMergeSpec(" + super.segString(dir) + ")";
        }
      }

      // Re-wraps every merge of the delegate's spec; null in, null out:
      MergeSpecification wrap(MergeSpecification spec) {
        MergeSpecification wrapped = null;
        if (spec != null) {
          wrapped = new ReindexingMergeSpecification();
          for (OneMerge merge : spec.merges) {
            wrapped.add(merge);
          }
        }
        return wrapped;
      }

      /**
       * Create a new {@code MergePolicy} that wraps {@code in}, replacing merge inputs with
       * parallel readers so reindexed fields are folded into merged segments.
       */
      public ReindexingMergePolicy(MergePolicy in) {
        super(in);
      }

      @Override
      public MergeSpecification findMerges(
          MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
          throws IOException {
        return wrap(in.findMerges(mergeTrigger, segmentInfos, mergeContext));
      }

      @Override
      public MergeSpecification findForcedMerges(
          SegmentInfos segmentInfos,
          int maxSegmentCount,
          Map<SegmentCommitInfo, Boolean> segmentsToMerge,
          MergeContext mergeContext)
          throws IOException {
        // TODO: do we need to force-force this? Ie, wrapped MP may think index is already
        // optimized, yet maybe its schemaGen is old? need test!
        return wrap(
            in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext));
      }

      @Override
      public MergeSpecification findForcedDeletesMerges(
          SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
        return wrap(in.findForcedDeletesMerges(segmentInfos, mergeContext));
      }

      @Override
      public boolean useCompoundFile(
          SegmentInfos segments, SegmentCommitInfo newSegment, MergeContext mergeContext)
          throws IOException {
        return in.useCompoundFile(segments, newSegment, mergeContext);
      }

      @Override
      public String toString() {
        return "ReindexingMergePolicy(" + in + ")";
      }
    }
| |
| static long getSchemaGen(SegmentInfo info) { |
| String s = info.getDiagnostics().get(SCHEMA_GEN_KEY); |
| if (s == null) { |
| return -1; |
| } else { |
| return Long.parseLong(s); |
| } |
| } |
| } |
| |
  /**
   * Returns a ReindexingReader whose schema never changes (gen is always 0): the stored "text"
   * field is parsed once into a "number" doc-values field plus a "number" LongPoint.
   */
  private ReindexingReader getReindexer(Path root) throws IOException {
    return new ReindexingReader(root) {
      @Override
      protected IndexWriterConfig getIndexWriterConfig() throws IOException {
        IndexWriterConfig iwc = newIndexWriterConfig();
        TieredMergePolicy tmp = new TieredMergePolicy();
        // We write tiny docs, so we need tiny floor to avoid O(N^2) merging:
        tmp.setFloorSegmentMB(.01);
        iwc.setMergePolicy(tmp);
        return iwc;
      }

      @Override
      protected Directory openDirectory(Path path) throws IOException {
        MockDirectoryWrapper dir = newMockFSDirectory(path);
        dir.setUseSlowOpenClosers(false);
        dir.setThrottling(Throttling.NEVER);
        return dir;
      }

      @Override
      protected void reindex(
          long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir)
          throws IOException {
        IndexWriterConfig iwc = newIndexWriterConfig();

        // The order of our docIDs must precisely match the incoming reader:
        iwc.setMergePolicy(new LogByteSizeMergePolicy());
        IndexWriter w = new IndexWriter(parallelDir, iwc);
        int maxDoc = reader.maxDoc();

        // Slowly parse the stored field into a new doc values field:
        for (int i = 0; i < maxDoc; i++) {
          // TODO: is this still O(blockSize^2)?
          Document oldDoc = reader.document(i);
          Document newDoc = new Document();
          long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
          newDoc.add(new NumericDocValuesField("number", value));
          newDoc.add(new LongPoint("number", value));
          w.addDocument(newDoc);
        }

        // Collapse to one segment so docIDs line up with the source segment:
        w.forceMerge(1);

        w.close();
      }

      @Override
      protected long getCurrentSchemaGen() {
        return 0;
      }
    };
  }
| |
  /** Schema change by adding a new {@code number_<schemaGen>} DV field each time. */
  /**
   * Returns a ReindexingReader that adds a fresh {@code number_<gen>} doc-values field per schema
   * gen: gen 0 parses the stored "text" field, later gens copy values forward from the previous
   * gen's field.
   */
  private ReindexingReader getReindexerNewDVFields(Path root, final AtomicLong currentSchemaGen)
      throws IOException {
    return new ReindexingReader(root) {
      @Override
      protected IndexWriterConfig getIndexWriterConfig() throws IOException {
        IndexWriterConfig iwc = newIndexWriterConfig();
        TieredMergePolicy tmp = new TieredMergePolicy();
        // We write tiny docs, so we need tiny floor to avoid O(N^2) merging:
        tmp.setFloorSegmentMB(.01);
        iwc.setMergePolicy(tmp);
        return iwc;
      }

      @Override
      protected Directory openDirectory(Path path) throws IOException {
        MockDirectoryWrapper dir = newMockFSDirectory(path);
        dir.setUseSlowOpenClosers(false);
        dir.setThrottling(Throttling.NEVER);
        return dir;
      }

      @Override
      protected void reindex(
          long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir)
          throws IOException {
        IndexWriterConfig iwc = newIndexWriterConfig();

        // The order of our docIDs must precisely match the incoming reader:
        iwc.setMergePolicy(new LogByteSizeMergePolicy());
        IndexWriter w = new IndexWriter(parallelDir, iwc);
        int maxDoc = reader.maxDoc();

        if (oldSchemaGen <= 0) {
          // Must slowly parse the stored field into a new doc values field:
          for (int i = 0; i < maxDoc; i++) {
            // TODO: is this still O(blockSize^2)?
            Document oldDoc = reader.document(i);
            Document newDoc = new Document();
            long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
            newDoc.add(new NumericDocValuesField("number_" + newSchemaGen, value));
            newDoc.add(new LongPoint("number", value));
            w.addDocument(newDoc);
          }
        } else {
          // Just carry over doc values from previous field:
          NumericDocValues oldValues = reader.getNumericDocValues("number_" + oldSchemaGen);
          assertNotNull("oldSchemaGen=" + oldSchemaGen, oldValues);
          for (int i = 0; i < maxDoc; i++) {
            // TODO: is this still O(blockSize^2)?
            assertEquals(i, oldValues.nextDoc());
            // NOTE(review): oldDoc is never used in this branch; the stored-field load looks
            // redundant:
            Document oldDoc = reader.document(i);
            Document newDoc = new Document();
            newDoc.add(new NumericDocValuesField("number_" + newSchemaGen, oldValues.longValue()));
            w.addDocument(newDoc);
          }
        }

        // Collapse to one segment so docIDs line up with the source segment:
        w.forceMerge(1);

        w.close();
      }

      @Override
      protected long getCurrentSchemaGen() {
        return currentSchemaGen.get();
      }

      @Override
      protected void checkParallelReader(LeafReader r, LeafReader parR, long schemaGen)
          throws IOException {
        // Verify the parallel reader's number_<schemaGen> DVs match the values parsed from the
        // main reader's stored "text" field:
        String fieldName = "number_" + schemaGen;
        if (DEBUG)
          System.out.println(
              Thread.currentThread().getName()
                  + ": TEST: now check parallel number DVs field="
                  + fieldName
                  + " r="
                  + r
                  + " parR="
                  + parR);
        NumericDocValues numbers = parR.getNumericDocValues(fieldName);
        if (numbers == null) {
          return;
        }
        int maxDoc = r.maxDoc();
        boolean failed = false;
        for (int i = 0; i < maxDoc; i++) {
          Document oldDoc = r.document(i);
          long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
          assertEquals(i, numbers.nextDoc());
          if (value != numbers.longValue()) {
            if (DEBUG)
              System.out.println(
                  "FAIL: docID="
                      + i
                      + " "
                      + oldDoc
                      + " value="
                      + value
                      + " number="
                      + numbers.longValue()
                      + " numbers="
                      + numbers);
            failed = true;
          } else if (failed) {
            if (DEBUG)
              System.out.println(
                  "OK: docID="
                      + i
                      + " "
                      + oldDoc
                      + " value="
                      + value
                      + " number="
                      + numbers.longValue());
          }
        }
        assertFalse("FAILED field=" + fieldName + " r=" + r, failed);
      }
    };
  }
| |
| /** Schema change by adding changing how the same "number" DV field is indexed. */ |
| private ReindexingReader getReindexerSameDVField( |
| Path root, final AtomicLong currentSchemaGen, final AtomicLong mergingSchemaGen) |
| throws IOException { |
| return new ReindexingReader(root) { |
| @Override |
| protected IndexWriterConfig getIndexWriterConfig() throws IOException { |
| IndexWriterConfig iwc = newIndexWriterConfig(); |
| TieredMergePolicy tmp = new TieredMergePolicy(); |
| // We write tiny docs, so we need tiny floor to avoid O(N^2) merging: |
| tmp.setFloorSegmentMB(.01); |
| iwc.setMergePolicy(tmp); |
| if (TEST_NIGHTLY) { |
| // during nightly tests, we might use too many files if we arent careful |
| iwc.setUseCompoundFile(true); |
| } |
| return iwc; |
| } |
| |
| @Override |
| protected Directory openDirectory(Path path) throws IOException { |
| MockDirectoryWrapper dir = newMockFSDirectory(path); |
| dir.setUseSlowOpenClosers(false); |
| dir.setThrottling(Throttling.NEVER); |
| return dir; |
| } |
| |
      @Override
      protected void reindex(
          long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir)
          throws IOException {
        // Builds the parallel index for one segment; the docID order written here
        // must exactly match the incoming reader so the parallel segment lines up
        // doc-for-doc with the main segment.
        IndexWriterConfig iwc = newIndexWriterConfig();

        // The order of our docIDs must precisely matching incoming reader:
        iwc.setMergePolicy(new LogByteSizeMergePolicy());
        IndexWriter w = new IndexWriter(parallelDir, iwc);
        int maxDoc = reader.maxDoc();

        if (oldSchemaGen <= 0) {
          // Must slowly parse the stored field into a new doc values field:
          for (int i = 0; i < maxDoc; i++) {
            // TODO: is this still O(blockSize^2)?
            Document oldDoc = reader.document(i);
            Document newDoc = new Document();
            long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
            newDoc.add(new NumericDocValuesField("number", newSchemaGen * value));
            newDoc.add(new LongPoint("number", value));
            w.addDocument(newDoc);
          }
        } else {
          // Just carry over doc values from previous field:
          NumericDocValues oldValues = reader.getNumericDocValues("number");
          assertNotNull("oldSchemaGen=" + oldSchemaGen, oldValues);
          for (int i = 0; i < maxDoc; i++) {
            // TODO: is this still O(blockSize^2)?
            Document oldDoc = reader.document(i);
            Document newDoc = new Document();
            assertEquals(i, oldValues.nextDoc());
            // Undo the old gen's multiplier, then apply the new gen's:
            newDoc.add(
                new NumericDocValuesField(
                    "number", newSchemaGen * (oldValues.longValue() / oldSchemaGen)));
            w.addDocument(newDoc);
          }
        }

        // Collapse to a single segment so docIDs stay aligned with the incoming reader:
        w.forceMerge(1);

        w.close();
      }
| |
      @Override
      protected long getCurrentSchemaGen() {
        // Schema gen supplied by the test (advanced randomly as docs are indexed):
        return currentSchemaGen.get();
      }
| |
      @Override
      protected long getMergingSchemaGen() {
        // Merging gen lags currentSchemaGen; the test advances it separately:
        return mergingSchemaGen.get();
      }
| |
| @Override |
| protected void checkParallelReader(LeafReader r, LeafReader parR, long schemaGen) |
| throws IOException { |
| if (DEBUG) |
| System.out.println( |
| Thread.currentThread().getName() |
| + ": TEST: now check parallel number DVs r=" |
| + r |
| + " parR=" |
| + parR); |
| NumericDocValues numbers = parR.getNumericDocValues("numbers"); |
| if (numbers == null) { |
| return; |
| } |
| int maxDoc = r.maxDoc(); |
| boolean failed = false; |
| for (int i = 0; i < maxDoc; i++) { |
| Document oldDoc = r.document(i); |
| long value = Long.parseLong(oldDoc.get("text").split(" ")[1]); |
| value *= schemaGen; |
| assertEquals(i, numbers.nextDoc()); |
| if (value != numbers.longValue()) { |
| System.out.println( |
| "FAIL: docID=" |
| + i |
| + " " |
| + oldDoc |
| + " value=" |
| + value |
| + " number=" |
| + numbers.longValue() |
| + " numbers=" |
| + numbers); |
| failed = true; |
| } else if (failed) { |
| System.out.println( |
| "OK: docID=" |
| + i |
| + " " |
| + oldDoc |
| + " value=" |
| + value |
| + " number=" |
| + numbers.longValue()); |
| } |
| } |
| assertFalse("FAILED r=" + r, failed); |
| } |
| }; |
| } |
| |
| public void testBasicMultipleSchemaGens() throws Exception { |
| |
| AtomicLong currentSchemaGen = new AtomicLong(); |
| |
| // TODO: separate refresh thread, search threads, indexing threads |
| Path root = createTempDir(); |
| ReindexingReader reindexer = getReindexerNewDVFields(root, currentSchemaGen); |
| reindexer.commit(); |
| |
| Document doc = new Document(); |
| doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES)); |
| reindexer.w.addDocument(doc); |
| |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: refresh @ 1 doc"); |
| reindexer.mgr.maybeRefresh(); |
| DirectoryReader r = reindexer.mgr.acquire(); |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: got reader=" + r); |
| try { |
| checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| // reindexer.printRefCounts(); |
| |
| currentSchemaGen.incrementAndGet(); |
| |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: increment schemaGen"); |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: commit"); |
| reindexer.commit(); |
| |
| doc = new Document(); |
| doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES)); |
| reindexer.w.addDocument(doc); |
| |
| if (DEBUG) System.out.println("TEST: refresh @ 2 docs"); |
| reindexer.mgr.maybeRefresh(); |
| // reindexer.printRefCounts(); |
| r = reindexer.mgr.acquire(); |
| if (DEBUG) System.out.println("TEST: got reader=" + r); |
| try { |
| checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| |
| if (DEBUG) System.out.println("TEST: forceMerge"); |
| reindexer.w.forceMerge(1); |
| |
| currentSchemaGen.incrementAndGet(); |
| |
| if (DEBUG) System.out.println("TEST: commit"); |
| reindexer.commit(); |
| |
| if (DEBUG) System.out.println("TEST: refresh after forceMerge"); |
| reindexer.mgr.maybeRefresh(); |
| r = reindexer.mgr.acquire(); |
| if (DEBUG) System.out.println("TEST: got reader=" + r); |
| try { |
| checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| |
| if (DEBUG) System.out.println("TEST: close writer"); |
| reindexer.close(); |
| } |
| |
  /**
   * Randomized test of the per-gen-DV-field reindexer: adds/updates/deletes docs while randomly
   * refreshing, advancing the schema gen, committing, and closing/reopening the writer to confirm
   * the parallel segments persist across reopen.
   */
  public void testRandomMultipleSchemaGens() throws Exception {

    AtomicLong currentSchemaGen = new AtomicLong();
    ReindexingReader reindexer = null;

    // TODO: separate refresh thread, search threads, indexing threads
    int numDocs = atLeast(TEST_NIGHTLY ? 20000 : 200);
    int maxID = 0;
    Path root = createTempDir();
    int refreshEveryNumDocs = 100;
    int commitCloseNumDocs = 1000;
    for (int i = 0; i < numDocs; i++) {
      if (reindexer == null) {
        // (Re)open after a random close below:
        reindexer = getReindexerNewDVFields(root, currentSchemaGen);
      }

      Document doc = new Document();
      String id;
      String updateID;
      if (maxID > 0 && random().nextInt(10) == 7) {
        // Replace a doc
        id = "" + random().nextInt(maxID);
        updateID = id;
      } else {
        // Add a brand new doc:
        id = "" + (maxID++);
        updateID = null;
      }

      doc.add(newStringField("id", id, Field.Store.NO));
      doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
      if (updateID == null) {
        reindexer.w.addDocument(doc);
      } else {
        reindexer.w.updateDocument(new Term("id", updateID), doc);
      }
      if (random().nextInt(refreshEveryNumDocs) == 17) {
        if (DEBUG)
          System.out.println(
              Thread.currentThread().getName() + ": TEST TOP: refresh @ " + (i + 1) + " docs");
        reindexer.mgr.maybeRefresh();

        DirectoryReader r = reindexer.mgr.acquire();
        if (DEBUG)
          System.out.println(Thread.currentThread().getName() + ": TEST TOP: got reader=" + r);
        try {
          checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1);
        } finally {
          reindexer.mgr.release(r);
        }
        if (DEBUG) reindexer.printRefCounts();
        // Back off refresh frequency as the index grows:
        refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs);
      }

      if (random().nextInt(500) == 17) {
        // Occasionally advance the schema gen, forcing a reindex of all segments:
        currentSchemaGen.incrementAndGet();
        if (DEBUG)
          System.out.println(
              Thread.currentThread().getName()
                  + ": TEST TOP: advance schemaGen to "
                  + currentSchemaGen);
      }

      if (i > 0 && random().nextInt(10) == 7) {
        // Random delete:
        reindexer.w.deleteDocuments(new Term("id", "" + random().nextInt(i)));
      }

      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG)
          System.out.println(
              Thread.currentThread().getName() + ": TEST TOP: commit @ " + (i + 1) + " docs");
        reindexer.commit();
        // reindexer.printRefCounts();
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }

      // Sometimes close & reopen writer/manager, to confirm the parallel segments persist:
      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG)
          System.out.println(
              Thread.currentThread().getName() + ": TEST TOP: close writer @ " + (i + 1) + " docs");
        reindexer.close();
        reindexer = null;
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }
    }

    if (reindexer != null) {
      reindexer.close();
    }
  }
| |
| /** |
| * First schema change creates a new "number" DV field off the stored field; subsequent changes |
| * just change the value of that number field for all docs. |
| */ |
| public void testRandomMultipleSchemaGensSameField() throws Exception { |
| |
| AtomicLong currentSchemaGen = new AtomicLong(); |
| AtomicLong mergingSchemaGen = new AtomicLong(); |
| |
| ReindexingReader reindexer = null; |
| |
| // TODO: separate refresh thread, search threads, indexing threads |
| int numDocs = atLeast(TEST_NIGHTLY ? 20000 : 200); |
| int maxID = 0; |
| Path root = createTempDir(); |
| int refreshEveryNumDocs = 100; |
| int commitCloseNumDocs = 1000; |
| |
| for (int i = 0; i < numDocs; i++) { |
| if (reindexer == null) { |
| if (DEBUG) |
| System.out.println( |
| Thread.currentThread().getName() + ": TEST TOP: open new reader/writer"); |
| reindexer = getReindexerSameDVField(root, currentSchemaGen, mergingSchemaGen); |
| } |
| |
| Document doc = new Document(); |
| String id; |
| String updateID; |
| if (maxID > 0 && random().nextInt(10) == 7) { |
| // Replace a doc |
| id = "" + random().nextInt(maxID); |
| updateID = id; |
| } else { |
| id = "" + (maxID++); |
| updateID = null; |
| } |
| |
| doc.add(newStringField("id", id, Field.Store.NO)); |
| doc.add( |
| newTextField( |
| "text", "number " + TestUtil.nextInt(random(), -10000, 10000), Field.Store.YES)); |
| if (updateID == null) { |
| reindexer.w.addDocument(doc); |
| } else { |
| reindexer.w.updateDocument(new Term("id", updateID), doc); |
| } |
| if (random().nextInt(refreshEveryNumDocs) == 17) { |
| if (DEBUG) |
| System.out.println( |
| Thread.currentThread().getName() + ": TEST TOP: refresh @ " + (i + 1) + " docs"); |
| reindexer.mgr.maybeRefresh(); |
| DirectoryReader r = reindexer.mgr.acquire(); |
| if (DEBUG) |
| System.out.println(Thread.currentThread().getName() + ": TEST TOP: got reader=" + r); |
| try { |
| checkAllNumberDVs(r, "number", true, (int) currentSchemaGen.get()); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| if (DEBUG) reindexer.printRefCounts(); |
| refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs); |
| } |
| |
| if (random().nextInt(500) == 17) { |
| currentSchemaGen.incrementAndGet(); |
| if (DEBUG) |
| System.out.println( |
| Thread.currentThread().getName() |
| + ": TEST TOP: advance schemaGen to " |
| + currentSchemaGen); |
| if (random().nextBoolean()) { |
| mergingSchemaGen.incrementAndGet(); |
| if (DEBUG) |
| System.out.println( |
| Thread.currentThread().getName() |
| + ": TEST TOP: advance mergingSchemaGen to " |
| + mergingSchemaGen); |
| } |
| } |
| |
| if (i > 0 && random().nextInt(10) == 7) { |
| // Random delete: |
| reindexer.w.deleteDocuments(new Term("id", "" + random().nextInt(i))); |
| } |
| |
| if (random().nextInt(commitCloseNumDocs) == 17) { |
| if (DEBUG) |
| System.out.println( |
| Thread.currentThread().getName() + ": TEST TOP: commit @ " + (i + 1) + " docs"); |
| reindexer.commit(); |
| // reindexer.printRefCounts(); |
| commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs); |
| } |
| |
| // Sometimes close & reopen writer/manager, to confirm the parallel segments persist: |
| if (random().nextInt(commitCloseNumDocs) == 17) { |
| if (DEBUG) |
| System.out.println( |
| Thread.currentThread().getName() + ": TEST TOP: close writer @ " + (i + 1) + " docs"); |
| reindexer.close(); |
| reindexer = null; |
| commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs); |
| } |
| } |
| |
| if (reindexer != null) { |
| reindexer.close(); |
| } |
| |
| // Verify main index never reflects schema changes beyond mergingSchemaGen: |
| try (Directory dir = newFSDirectory(root.resolve("index")); |
| IndexReader r = DirectoryReader.open(dir)) { |
| for (LeafReaderContext ctx : r.leaves()) { |
| LeafReader leaf = ctx.reader(); |
| NumericDocValues numbers = leaf.getNumericDocValues("number"); |
| if (numbers != null) { |
| int maxDoc = leaf.maxDoc(); |
| for (int i = 0; i < maxDoc; i++) { |
| Document doc = leaf.document(i); |
| long value = Long.parseLong(doc.get("text").split(" ")[1]); |
| assertEquals(i, numbers.nextDoc()); |
| long dvValue = numbers.longValue(); |
| if (value == 0) { |
| assertEquals(0, dvValue); |
| } else { |
| assertTrue(dvValue % value == 0); |
| assertTrue(dvValue / value <= mergingSchemaGen.get()); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| public void testBasic() throws Exception { |
| Path tempPath = createTempDir(); |
| ReindexingReader reindexer = getReindexer(tempPath); |
| |
| // Start with initial empty commit: |
| reindexer.commit(); |
| |
| Document doc = new Document(); |
| doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES)); |
| reindexer.w.addDocument(doc); |
| |
| if (DEBUG) System.out.println("TEST: refresh @ 1 doc"); |
| reindexer.mgr.maybeRefresh(); |
| DirectoryReader r = reindexer.mgr.acquire(); |
| if (DEBUG) System.out.println("TEST: got reader=" + r); |
| try { |
| checkAllNumberDVs(r); |
| IndexSearcher s = newSearcher(r); |
| testNumericDVSort(s); |
| testPointRangeQuery(s); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| // reindexer.printRefCounts(); |
| |
| if (DEBUG) System.out.println("TEST: commit"); |
| reindexer.commit(); |
| |
| doc = new Document(); |
| doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES)); |
| reindexer.w.addDocument(doc); |
| |
| if (DEBUG) System.out.println("TEST: refresh @ 2 docs"); |
| reindexer.mgr.maybeRefresh(); |
| // reindexer.printRefCounts(); |
| r = reindexer.mgr.acquire(); |
| if (DEBUG) System.out.println("TEST: got reader=" + r); |
| try { |
| checkAllNumberDVs(r); |
| IndexSearcher s = newSearcher(r); |
| testNumericDVSort(s); |
| testPointRangeQuery(s); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| |
| if (DEBUG) System.out.println("TEST: forceMerge"); |
| reindexer.w.forceMerge(1); |
| |
| if (DEBUG) System.out.println("TEST: commit"); |
| reindexer.commit(); |
| |
| if (DEBUG) System.out.println("TEST: refresh after forceMerge"); |
| reindexer.mgr.maybeRefresh(); |
| r = reindexer.mgr.acquire(); |
| if (DEBUG) System.out.println("TEST: got reader=" + r); |
| try { |
| checkAllNumberDVs(r); |
| IndexSearcher s = newSearcher(r); |
| testNumericDVSort(s); |
| testPointRangeQuery(s); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| |
| if (DEBUG) System.out.println("TEST: close writer"); |
| reindexer.close(); |
| } |
| |
  /**
   * Randomized test of the basic reindexer: adds/updates/deletes docs while randomly refreshing,
   * committing, and closing/reopening the writer to confirm the parallel segments persist.
   */
  public void testRandom() throws Exception {
    Path root = createTempDir();
    ReindexingReader reindexer = null;

    // TODO: separate refresh thread, search threads, indexing threads
    int numDocs = atLeast(TEST_NIGHTLY ? 20000 : 200);
    int maxID = 0;
    int refreshEveryNumDocs = 100;
    int commitCloseNumDocs = 1000;
    for (int i = 0; i < numDocs; i++) {
      if (reindexer == null) {
        // (Re)open after a random close below:
        reindexer = getReindexer(root);
      }

      Document doc = new Document();
      String id;
      String updateID;
      if (maxID > 0 && random().nextInt(10) == 7) {
        // Replace a doc
        id = "" + random().nextInt(maxID);
        updateID = id;
      } else {
        // Add a brand new doc:
        id = "" + (maxID++);
        updateID = null;
      }

      doc.add(newStringField("id", id, Field.Store.NO));
      doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
      if (updateID == null) {
        reindexer.w.addDocument(doc);
      } else {
        reindexer.w.updateDocument(new Term("id", updateID), doc);
      }

      if (random().nextInt(refreshEveryNumDocs) == 17) {
        if (DEBUG) System.out.println("TEST: refresh @ " + (i + 1) + " docs");
        reindexer.mgr.maybeRefresh();
        DirectoryReader r = reindexer.mgr.acquire();
        if (DEBUG) System.out.println("TEST: got reader=" + r);
        try {
          checkAllNumberDVs(r);
          IndexSearcher s = newSearcher(r);
          testNumericDVSort(s);
          testPointRangeQuery(s);
        } finally {
          reindexer.mgr.release(r);
        }
        // Back off refresh frequency as the index grows:
        refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs);
      }

      if (i > 0 && random().nextInt(10) == 7) {
        // Random delete:
        reindexer.w.deleteDocuments(new Term("id", "" + random().nextInt(i)));
      }

      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG) System.out.println("TEST: commit @ " + (i + 1) + " docs");
        reindexer.commit();
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }

      // Sometimes close & reopen writer/manager, to confirm the parallel segments persist:
      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG) System.out.println("TEST: close writer @ " + (i + 1) + " docs");
        reindexer.close();
        reindexer = null;
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }
    }
    if (reindexer != null) {
      reindexer.close();
    }
  }
| |
  /** Checks the default "number" field with multiplier 1, throwing on mismatch. */
  private static void checkAllNumberDVs(IndexReader r) throws IOException {
    checkAllNumberDVs(r, "number", true, 1);
  }
| |
| private static void checkAllNumberDVs( |
| IndexReader r, String fieldName, boolean doThrow, int multiplier) throws IOException { |
| NumericDocValues numbers = MultiDocValues.getNumericValues(r, fieldName); |
| int maxDoc = r.maxDoc(); |
| boolean failed = false; |
| long t0 = System.currentTimeMillis(); |
| for (int i = 0; i < maxDoc; i++) { |
| Document oldDoc = r.document(i); |
| long value = multiplier * Long.parseLong(oldDoc.get("text").split(" ")[1]); |
| assertEquals(i, numbers.nextDoc()); |
| if (value != numbers.longValue()) { |
| System.out.println( |
| "FAIL: docID=" |
| + i |
| + " " |
| + oldDoc |
| + " value=" |
| + value |
| + " number=" |
| + numbers.longValue() |
| + " numbers=" |
| + numbers); |
| failed = true; |
| } else if (failed) { |
| System.out.println( |
| "OK: docID=" + i + " " + oldDoc + " value=" + value + " number=" + numbers.longValue()); |
| } |
| } |
| if (failed) { |
| if (r instanceof LeafReader == false) { |
| System.out.println("TEST FAILED; check leaves"); |
| for (LeafReaderContext ctx : r.leaves()) { |
| System.out.println("CHECK LEAF=" + ctx.reader()); |
| checkAllNumberDVs(ctx.reader(), fieldName, false, 1); |
| } |
| } |
| if (doThrow) { |
| assertFalse("FAILED field=" + fieldName + " r=" + r, failed); |
| } else { |
| System.out.println("FAILED field=" + fieldName + " r=" + r); |
| } |
| } |
| } |
| |
| private static void testNumericDVSort(IndexSearcher s) throws IOException { |
| // Confirm we can sort by the new DV field: |
| TopDocs hits = |
| s.search( |
| new MatchAllDocsQuery(), 100, new Sort(new SortField("number", SortField.Type.LONG))); |
| long last = Long.MIN_VALUE; |
| for (ScoreDoc scoreDoc : hits.scoreDocs) { |
| long value = Long.parseLong(s.doc(scoreDoc.doc).get("text").split(" ")[1]); |
| assertTrue(value >= last); |
| assertEquals(value, ((Long) ((FieldDoc) scoreDoc).fields[0]).longValue()); |
| last = value; |
| } |
| } |
| |
| private static void testPointRangeQuery(IndexSearcher s) throws IOException { |
| for (int i = 0; i < 100; i++) { |
| // Confirm we can range search by the new indexed (numeric) field: |
| long min = random().nextLong(); |
| long max = random().nextLong(); |
| if (min > max) { |
| long x = min; |
| min = max; |
| max = x; |
| } |
| |
| TopDocs hits = s.search(LongPoint.newRangeQuery("number", min, max), 100); |
| for (ScoreDoc scoreDoc : hits.scoreDocs) { |
| long value = Long.parseLong(s.doc(scoreDoc.doc).get("text").split(" ")[1]); |
| assertTrue(value >= min); |
| assertTrue(value <= max); |
| } |
| |
| Arrays.sort( |
| hits.scoreDocs, |
| new Comparator<ScoreDoc>() { |
| @Override |
| public int compare(ScoreDoc a, ScoreDoc b) { |
| return a.doc - b.doc; |
| } |
| }); |
| |
| NumericDocValues numbers = MultiDocValues.getNumericValues(s.getIndexReader(), "number"); |
| for (ScoreDoc hit : hits.scoreDocs) { |
| if (numbers.docID() < hit.doc) { |
| numbers.advance(hit.doc); |
| } |
| assertEquals(hit.doc, numbers.docID()); |
| long value = Long.parseLong(s.doc(hit.doc).get("text").split(" ")[1]); |
| assertEquals(value, numbers.longValue()); |
| } |
| } |
| } |
| |
  // Matches parallel-segment sub-directory names of the form <segIDString>_<longGen>.
  // TODO: maybe the leading id could be further restricted? It's from StringHelper.idToString:
  static final Pattern SEG_GEN_SUB_DIR_PATTERN = Pattern.compile("^[a-z0-9]+_([0-9]+)$");
| |
| private static List<Path> segSubDirs(Path segsPath) throws IOException { |
| List<Path> result = new ArrayList<>(); |
| try (DirectoryStream<Path> stream = Files.newDirectoryStream(segsPath)) { |
| for (Path path : stream) { |
| // Must be form <segIDString>_<longGen> |
| if (Files.isDirectory(path) |
| && SEG_GEN_SUB_DIR_PATTERN.matcher(path.getFileName().toString()).matches()) { |
| result.add(path); |
| } |
| } |
| } |
| |
| return result; |
| } |
| |
| // TODO: test exceptions |
| } |