| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.lucene.index; |
| |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.nio.file.DirectoryStream; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
import java.util.Map;
import java.util.Objects;
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.regex.Pattern; |
| |
| import org.apache.lucene.document.Document; |
| import org.apache.lucene.document.Field; |
| import org.apache.lucene.document.LongPoint; |
| import org.apache.lucene.document.NumericDocValuesField; |
| import org.apache.lucene.search.FieldDoc; |
| import org.apache.lucene.search.IndexSearcher; |
| import org.apache.lucene.search.MatchAllDocsQuery; |
| import org.apache.lucene.search.ScoreDoc; |
| import org.apache.lucene.search.Sort; |
| import org.apache.lucene.search.SortField; |
| import org.apache.lucene.search.TopDocs; |
| import org.apache.lucene.store.Directory; |
| import org.apache.lucene.store.FSDirectory; |
| import org.apache.lucene.store.IOContext; |
| import org.apache.lucene.store.MockDirectoryWrapper.Throttling; |
| import org.apache.lucene.store.MockDirectoryWrapper; |
| import org.apache.lucene.util.Bits; |
| import org.apache.lucene.util.IOUtils; |
| import org.apache.lucene.util.LuceneTestCase; |
| import org.apache.lucene.util.StringHelper; |
| import org.apache.lucene.util.TestUtil; |
| import org.apache.lucene.util.Version; |
| |
| // TODO: |
| // - old parallel indices are only pruned on commit/close; can we do it on refresh? |
| |
| /** Simple example showing how to use ParallelLeafReader to index new |
| * stuff (postings, DVs, etc.) from previously stored fields, on the |
| * fly (during NRT reader reopen), after the initial indexing. The |
 * test indexes just a single stored field with text "number X" (X is
 * a number embedded in the text).
| * |
| * Then, on reopen, for any newly created segments (flush or merge), it |
| * builds a new parallel segment by loading all stored docs, parsing |
 * out that X, and adding it as a doc values field and a numeric point field.
| * |
| * Finally, for searching, it builds a top-level MultiReader, with |
| * ParallelLeafReader for each segment, and then tests that random |
| * numeric range queries, and sorting by the new DV field, work |
| * correctly. |
| * |
 * Each per-segment parallel index lives in a private directory next to the main
 * index, and is deleted once its segment is removed from the main
 * index. The parallel indices are "volatile", meaning if e.g. the index is
 * replicated to another machine, it's OK not to copy them,
 * since they will just be regenerated (at a cost though). */
| |
| // @SuppressSysoutChecks(bugUrl="we print stuff") |
| // See: https://issues.apache.org/jira/browse/SOLR-12028 Tests cannot remove files on Windows machines occasionally |
| public class TestDemoParallelLeafReader extends LuceneTestCase { |
| |
| static final boolean DEBUG = false; |
| |
| static abstract class ReindexingReader implements Closeable { |
| |
| /** Key used to store the current schema gen in the SegmentInfo diagnostics */ |
| public final static String SCHEMA_GEN_KEY = "schema_gen"; |
| |
| public final IndexWriter w; |
| public final ReaderManager mgr; |
| |
| private final Directory indexDir; |
| private final Path root; |
| private final Path segsPath; |
| |
    /** Which segments have been closed, but whose parallel index has not yet been removed. */
    private final Set<SegmentIDAndGen> closedSegments = Collections.newSetFromMap(new ConcurrentHashMap<>());
| |
| /** Holds currently open parallel readers for each segment. */ |
| private final Map<SegmentIDAndGen,LeafReader> parallelReaders = new ConcurrentHashMap<>(); |
| |
| void printRefCounts() { |
| System.out.println("All refCounts:"); |
| for(Map.Entry<SegmentIDAndGen,LeafReader> ent : parallelReaders.entrySet()) { |
| System.out.println(" " + ent.getKey() + " " + ent.getValue() + " refCount=" + ent.getValue().getRefCount()); |
| } |
| } |
| |
| public ReindexingReader(Path root) throws IOException { |
| this.root = root; |
| |
| // Normal index is stored under "index": |
| indexDir = openDirectory(root.resolve("index")); |
| |
| // Per-segment parallel indices are stored under subdirs "segs": |
| segsPath = root.resolve("segs"); |
| Files.createDirectories(segsPath); |
| |
| IndexWriterConfig iwc = getIndexWriterConfig(); |
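      // Wrap the configured merge policy so merges fold the parallel (reindexed) fields into
      // the newly merged segments: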
| iwc.setMergePolicy(new ReindexingMergePolicy(iwc.getMergePolicy())); |
| if (DEBUG) { |
| System.out.println("TEST: use IWC:\n" + iwc); |
| } |
| w = new IndexWriter(indexDir, iwc); |
| |
| w.getConfig().setMergedSegmentWarmer((reader) -> { |
| // This will build the parallel index for the merged segment before the merge becomes visible, so reopen delay is only due to |
| // newly flushed segments: |
| if (DEBUG) System.out.println(Thread.currentThread().getName() +": TEST: now warm " + reader); |
| // TODO: it's not great that we pass false here; it means we close the reader & reopen again for NRT reader; still we did "warm" by |
| // building the parallel index, if necessary |
| getParallelLeafReader(reader, false, getCurrentSchemaGen()); |
| }); |
| |
| // start with empty commit: |
| w.commit(); |
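      // Every reader handed out by this manager has each leaf wrapped together with its parallel index: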
| mgr = new ReaderManager(new ParallelLeafDirectoryReader(DirectoryReader.open(w))); |
| } |
| |
| protected abstract IndexWriterConfig getIndexWriterConfig() throws IOException; |
| |
    /** Optional method to validate that the provided parallel reader in fact reflects the changes in schemaGen. */
| protected void checkParallelReader(LeafReader reader, LeafReader parallelReader, long schemaGen) throws IOException { |
| } |
| |
| /** Override to customize Directory impl. */ |
| protected Directory openDirectory(Path path) throws IOException { |
| return FSDirectory.open(path); |
| } |
| |
| public void commit() throws IOException { |
| w.commit(); |
| } |
| |
| LeafReader getCurrentReader(LeafReader reader, long schemaGen) throws IOException { |
| LeafReader parallelReader = getParallelLeafReader(reader, true, schemaGen); |
| if (parallelReader != null) { |
| |
| // We should not be embedding one ParallelLeafReader inside another: |
| assertFalse(parallelReader instanceof ParallelLeafReader); |
| assertFalse(reader instanceof ParallelLeafReader); |
| |
        // NOTE: it's important that parallelReader comes first: if a schema change reused an
        // existing field name, the parallel (reindexed) version wins:
| LeafReader newReader = new ParallelLeafReader(false, parallelReader, reader) { |
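          // The parallel index carries no deletions, so liveDocs/numDocs must come from the
          // original (second) reader: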
| @Override |
| public Bits getLiveDocs() { |
| return getParallelReaders()[1].getLiveDocs(); |
| } |
| @Override |
| public int numDocs() { |
| return getParallelReaders()[1].numDocs(); |
| } |
| }; |
| |
| // Because ParallelLeafReader does its own (extra) incRef: |
| parallelReader.decRef(); |
| |
| return newReader; |
| |
| } else { |
| // This segment was already current as of currentSchemaGen: |
| return reader; |
| } |
| } |
| |
| private class ParallelLeafDirectoryReader extends FilterDirectoryReader { |
| public ParallelLeafDirectoryReader(DirectoryReader in) throws IOException { |
| super(in, new FilterDirectoryReader.SubReaderWrapper() { |
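          // Capture the schema gen once per reopen, so all leaves of this reader agree on the schema: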
| final long currentSchemaGen = getCurrentSchemaGen(); |
| @Override |
| public LeafReader wrap(LeafReader reader) { |
| try { |
| return getCurrentReader(reader, currentSchemaGen); |
| } catch (IOException ioe) { |
| // TODO: must close on exc here: |
| throw new RuntimeException(ioe); |
| } |
| } |
| }); |
| } |
| |
| @Override |
| protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { |
| return new ParallelLeafDirectoryReader(in); |
| } |
| |
| @Override |
| protected void doClose() throws IOException { |
| Throwable firstExc = null; |
| for (final LeafReader r : getSequentialSubReaders()) { |
| if (r instanceof ParallelLeafReader) { |
| // try to close each reader, even if an exception is thrown |
| try { |
| r.decRef(); |
| } catch (Throwable t) { |
| if (firstExc == null) { |
| firstExc = t; |
| } |
| } |
| } |
| } |
| // Also close in, so it decRef's the SegmentInfos |
| try { |
| in.doClose(); |
| } catch (Throwable t) { |
| if (firstExc == null) { |
| firstExc = t; |
| } |
| } |
| |
| // throw the first exception |
| if (firstExc != null) { |
| throw IOUtils.rethrowAlways(firstExc); |
| } |
| } |
| |
| @Override |
| public CacheHelper getReaderCacheHelper() { |
| return null; |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| w.close(); |
| if (DEBUG) System.out.println("TEST: after close writer index=" + SegmentInfos.readLatestCommit(indexDir)); |
| |
| /* |
| DirectoryReader r = mgr.acquire(); |
| try { |
| TestUtil.checkReader(r); |
| } finally { |
| mgr.release(r); |
| } |
| */ |
| mgr.close(); |
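      // On close we can also prune parallel indices left over from older schema gens, since
      // nothing can reference them anymore: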
| pruneOldSegments(true); |
| assertNoExtraSegments(); |
| indexDir.close(); |
| } |
| |
| // Make sure we deleted all parallel indices for segments that are no longer in the main index: |
| private void assertNoExtraSegments() throws IOException { |
| Set<String> liveIDs = new HashSet<String>(); |
| for(SegmentCommitInfo info : SegmentInfos.readLatestCommit(indexDir)) { |
| String idString = StringHelper.idToString(info.info.getId()); |
| liveIDs.add(idString); |
| } |
| |
| // At this point (closing) the only segments in closedSegments should be the still-live ones: |
| for(SegmentIDAndGen segIDGen : closedSegments) { |
| assertTrue(liveIDs.contains(segIDGen.segID)); |
| } |
| |
| boolean fail = false; |
| for(Path path : segSubDirs(segsPath)) { |
| SegmentIDAndGen segIDGen = new SegmentIDAndGen(path.getFileName().toString()); |
| if (liveIDs.contains(segIDGen.segID) == false) { |
| if (DEBUG) System.out.println("TEST: fail seg=" + path.getFileName() + " is not live but still has a parallel index"); |
| fail = true; |
| } |
| } |
| assertFalse(fail); |
| } |
| |
| private static class SegmentIDAndGen { |
| public final String segID; |
| public final long schemaGen; |
| |
| public SegmentIDAndGen(String segID, long schemaGen) { |
| this.segID = segID; |
| this.schemaGen = schemaGen; |
| } |
| |
| public SegmentIDAndGen(String s) { |
| String[] parts = s.split("_"); |
| if (parts.length != 2) { |
| throw new IllegalArgumentException("invalid SegmentIDAndGen \"" + s + "\""); |
| } |
| // TODO: better checking of segID? |
| segID = parts[0]; |
| schemaGen = Long.parseLong(parts[1]); |
| } |
| |
      @Override
      public int hashCode() {
        return Objects.hash(segID, schemaGen);
      }
| |
| @Override |
| public boolean equals(Object _other) { |
| if (_other instanceof SegmentIDAndGen) { |
| SegmentIDAndGen other = (SegmentIDAndGen) _other; |
| return segID.equals(other.segID) && schemaGen == other.schemaGen; |
| } else { |
| return false; |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return segID + "_" + schemaGen; |
| } |
| } |
| |
| private class ParallelReaderClosed implements IndexReader.ClosedListener { |
| private final SegmentIDAndGen segIDGen; |
| private final Directory dir; |
| |
| public ParallelReaderClosed(SegmentIDAndGen segIDGen, Directory dir) { |
| this.segIDGen = segIDGen; |
| this.dir = dir; |
| } |
| |
| @Override |
| public void onClose(IndexReader.CacheKey ignored) { |
| try { |
| // TODO: make this sync finer, i.e. just the segment + schemaGen |
| synchronized(ReindexingReader.this) { |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now close parallel parLeafReader dir=" + dir + " segIDGen=" + segIDGen); |
| parallelReaders.remove(segIDGen); |
| dir.close(); |
| closedSegments.add(segIDGen); |
| } |
| } catch (IOException ioe) { |
| System.out.println("TEST: hit IOExc closing dir=" + dir); |
| ioe.printStackTrace(System.out); |
| throw new RuntimeException(ioe); |
| } |
| } |
| } |
| |
    // Returns a ref; null means the segment is already current as of schemaGen:
| LeafReader getParallelLeafReader(final LeafReader leaf, boolean doCache, long schemaGen) throws IOException { |
| assert leaf instanceof SegmentReader; |
| SegmentInfo info = ((SegmentReader) leaf).getSegmentInfo().info; |
| |
| long infoSchemaGen = getSchemaGen(info); |
| |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: getParallelLeafReader: " + leaf + " infoSchemaGen=" + infoSchemaGen + " vs schemaGen=" + schemaGen + " doCache=" + doCache); |
| |
| if (infoSchemaGen == schemaGen) { |
| if (DEBUG) System.out.println(Thread.currentThread().getName()+ ": TEST: segment is already current schemaGen=" + schemaGen + "; skipping"); |
| return null; |
| } |
| |
| if (infoSchemaGen > schemaGen) { |
| throw new IllegalStateException("segment infoSchemaGen (" + infoSchemaGen + ") cannot be greater than requested schemaGen (" + schemaGen + ")"); |
| } |
| |
| final SegmentIDAndGen segIDGen = new SegmentIDAndGen(StringHelper.idToString(info.getId()), schemaGen); |
| |
| // While loop because the parallel reader may be closed out from under us, so we must retry: |
| while (true) { |
| |
| // TODO: make this sync finer, i.e. just the segment + schemaGen |
| synchronized (this) { |
| LeafReader parReader = parallelReaders.get(segIDGen); |
| |
        // Non-cached (merge-warming) readers are never put into the map, so a cache hit implies doCache=true:
        assert doCache || parReader == null;
| |
| if (parReader == null) { |
| |
| Path leafIndex = segsPath.resolve(segIDGen.toString()); |
| |
| final Directory dir = openDirectory(leafIndex); |
| |
| if (slowFileExists(dir, "done") == false) { |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: build segment index for " + leaf + " " + segIDGen + " (source: " + info.getDiagnostics().get("source") + ") dir=" + leafIndex); |
| |
| if (dir.listAll().length != 0) { |
| // It crashed before finishing last time: |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: remove old incomplete index files: " + leafIndex); |
| IOUtils.rm(leafIndex); |
| } |
| |
| reindex(infoSchemaGen, schemaGen, leaf, dir); |
| |
| // Marker file, telling us this index is in fact done. This way if we crash while doing the reindexing for a given segment, we will |
| // later try again: |
| dir.createOutput("done", IOContext.DEFAULT).close(); |
| } else { |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: segment index already exists for " + leaf + " " + segIDGen + " (source: " + info.getDiagnostics().get("source") + ") dir=" + leafIndex); |
| } |
| |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now check index " + dir); |
| //TestUtil.checkIndex(dir); |
| |
| SegmentInfos infos = SegmentInfos.readLatestCommit(dir); |
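          // The reindex() implementations in this test forceMerge(1), so the parallel index is a single segment: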
| assert infos.size() == 1; |
| final LeafReader parLeafReader = new SegmentReader(infos.info(0), Version.LATEST.major, IOContext.DEFAULT); |
| |
| //checkParallelReader(leaf, parLeafReader, schemaGen); |
| |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: opened parallel reader: " + parLeafReader); |
| if (doCache) { |
| parallelReaders.put(segIDGen, parLeafReader); |
| |
| // Our id+gen could have been previously closed, e.g. if it was a merged segment that was warmed, so we must clear this else |
| // the pruning may remove our directory: |
| closedSegments.remove(segIDGen); |
| |
| parLeafReader.getReaderCacheHelper().addClosedListener(new ParallelReaderClosed(segIDGen, dir)); |
| |
| } else { |
| // Used only for merged segment warming: |
| // Messy: we close this reader now, instead of leaving open for reuse: |
| if (DEBUG) System.out.println("TEST: now decRef non cached refCount=" + parLeafReader.getRefCount()); |
| parLeafReader.decRef(); |
| dir.close(); |
| |
| // Must do this after dir is closed, else another thread could "rm -rf" while we are closing (which makes MDW.close's |
| // checkIndex angry): |
| closedSegments.add(segIDGen); |
              }
              parReader = parLeafReader;
| |
| } else { |
| if (parReader.tryIncRef() == false) { |
| // We failed: this reader just got closed by another thread, e.g. refresh thread opening a new reader, so this reader is now |
| // closed and we must try again. |
| if (DEBUG) System.out.println(Thread.currentThread().getName()+ ": TEST: tryIncRef failed for " + parReader + "; retry"); |
| parReader = null; |
| continue; |
| } |
| if (DEBUG) System.out.println(Thread.currentThread().getName()+ ": TEST: use existing already opened parReader=" + parReader + " refCount=" + parReader.getRefCount()); |
| //checkParallelReader(leaf, parReader, schemaGen); |
| } |
| |
| // We return the new reference to caller |
| return parReader; |
| } |
| } |
| } |
| |
| // TODO: we could pass a writer already opened...? |
| protected abstract void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException; |
| |
| /** Returns the gen for the current schema. */ |
| protected abstract long getCurrentSchemaGen(); |
| |
    /** Returns the schema gen that merges should reindex to; those changes are folded back into the main index. */
| protected long getMergingSchemaGen() { |
| return getCurrentSchemaGen(); |
| } |
| |
    /** Removes parallel indices whose segments are no longer in the last commit point. We can't
     *  remove them when the parallel reader is closed because they may still be referenced by
     *  the last commit. */
| private void pruneOldSegments(boolean removeOldGens) throws IOException { |
| SegmentInfos lastCommit = SegmentInfos.readLatestCommit(indexDir); |
| if (DEBUG) System.out.println("TEST: prune"); |
| |
| Set<String> liveIDs = new HashSet<String>(); |
| for(SegmentCommitInfo info : lastCommit) { |
| String idString = StringHelper.idToString(info.info.getId()); |
| liveIDs.add(idString); |
| } |
| |
| long currentSchemaGen = getCurrentSchemaGen(); |
| |
| if (Files.exists(segsPath)) { |
| for (Path path : segSubDirs(segsPath)) { |
| if (Files.isDirectory(path)) { |
| SegmentIDAndGen segIDGen = new SegmentIDAndGen(path.getFileName().toString()); |
| assert segIDGen.schemaGen <= currentSchemaGen; |
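          // Remove the parallel dir only if its segment is gone from the last commit, and either its
          // reader was already closed or (when removeOldGens) it belongs to a stale schema gen: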
| if (liveIDs.contains(segIDGen.segID) == false && (closedSegments.contains(segIDGen) || (removeOldGens && segIDGen.schemaGen < currentSchemaGen))) { |
| if (DEBUG) System.out.println("TEST: remove " + segIDGen); |
| try { |
| IOUtils.rm(path); |
| closedSegments.remove(segIDGen); |
| } catch (IOException ioe) { |
| // OK, we'll retry later |
| if (DEBUG) System.out.println("TEST: ignore ioe during delete " + path + ":" + ioe); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /** Just replaces the sub-readers with parallel readers, so reindexed fields are merged into new segments. */ |
| private class ReindexingMergePolicy extends FilterMergePolicy { |
| |
| class ReindexingOneMerge extends OneMerge { |
| |
      // Parallel readers opened in wrapForMerge; mergeFinished() must decRef each one:
      final List<ParallelLeafReader> parallelReaders = new ArrayList<>();
| final long schemaGen; |
| |
| ReindexingOneMerge(List<SegmentCommitInfo> segments) { |
| super(segments); |
| // Commit up front to which schemaGen we will merge; we don't want a schema change sneaking in for some of our leaf readers but not others: |
| schemaGen = getMergingSchemaGen(); |
| long currentSchemaGen = getCurrentSchemaGen(); |
| |
| // Defensive sanity check: |
| if (schemaGen > currentSchemaGen) { |
| throw new IllegalStateException("currentSchemaGen (" + currentSchemaGen + ") must always be >= mergingSchemaGen (" + schemaGen + ")"); |
| } |
| } |
| |
| @Override |
| public CodecReader wrapForMerge(CodecReader reader) throws IOException { |
| LeafReader wrapped = getCurrentReader(reader, schemaGen); |
| if (wrapped instanceof ParallelLeafReader) { |
| parallelReaders.add((ParallelLeafReader) wrapped); |
| } |
| return SlowCodecReaderWrapper.wrap(wrapped); |
| } |
| |
| @Override |
| public void mergeFinished(boolean success, boolean segmentDropped) throws IOException { |
| super.mergeFinished(success, segmentDropped); |
| Throwable th = null; |
| for (ParallelLeafReader r : parallelReaders) { |
| try { |
| r.decRef(); |
| } catch (Throwable t) { |
| if (th == null) { |
| th = t; |
| } |
| } |
| } |
| |
| if (th != null) { |
| throw IOUtils.rethrowAlways(th); |
| } |
| } |
| |
| @Override |
| public void setMergeInfo(SegmentCommitInfo info) { |
| // Record that this merged segment is current as of this schemaGen: |
| Map<String, String> copy = new HashMap<>(info.info.getDiagnostics()); |
| copy.put(SCHEMA_GEN_KEY, Long.toString(schemaGen)); |
| info.info.setDiagnostics(copy); |
| super.setMergeInfo(info); |
| } |
| |
| } |
| |
| class ReindexingMergeSpecification extends MergeSpecification { |
| @Override |
| public void add(OneMerge merge) { |
| super.add(new ReindexingOneMerge(merge.segments)); |
| } |
| |
| @Override |
| public String segString(Directory dir) { |
| return "ReindexingMergeSpec(" + super.segString(dir) + ")"; |
| } |
| } |
| |
| MergeSpecification wrap(MergeSpecification spec) { |
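      // Re-create each OneMerge as a ReindexingOneMerge, so the readers being merged are wrapped
      // with their parallel index: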
| MergeSpecification wrapped = null; |
| if (spec != null) { |
| wrapped = new ReindexingMergeSpecification(); |
| for (OneMerge merge : spec.merges) { |
| wrapped.add(merge); |
| } |
| } |
| return wrapped; |
| } |
| |
    /** Creates a new {@code ReindexingMergePolicy} wrapping the given merge policy. */
| public ReindexingMergePolicy(MergePolicy in) { |
| super(in); |
| } |
| |
| @Override |
| public MergeSpecification findMerges(MergeTrigger mergeTrigger, |
| SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException { |
| return wrap(in.findMerges(mergeTrigger, segmentInfos, mergeContext)); |
| } |
| |
| @Override |
| public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, |
| int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext) |
| throws IOException { |
| // TODO: do we need to force-force this? Ie, wrapped MP may think index is already optimized, yet maybe its schemaGen is old? need test! |
| return wrap(in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext)); |
| } |
| |
| @Override |
| public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) |
| throws IOException { |
| return wrap(in.findForcedDeletesMerges(segmentInfos, mergeContext)); |
| } |
| |
| @Override |
| public boolean useCompoundFile(SegmentInfos segments, |
| SegmentCommitInfo newSegment, MergeContext mergeContext) throws IOException { |
| return in.useCompoundFile(segments, newSegment, mergeContext); |
| } |
| |
| @Override |
| public String toString() { |
| return "ReindexingMergePolicy(" + in + ")"; |
| } |
| } |
| |
  /** Returns the schema gen recorded in the segment's diagnostics, or -1 if the segment predates any reindexing. */
  static long getSchemaGen(SegmentInfo info) {
| String s = info.getDiagnostics().get(SCHEMA_GEN_KEY); |
| if (s == null) { |
| return -1; |
| } else { |
| return Long.parseLong(s); |
| } |
| } |
| } |
| |
| private ReindexingReader getReindexer(Path root) throws IOException { |
| return new ReindexingReader(root) { |
| @Override |
| protected IndexWriterConfig getIndexWriterConfig() throws IOException { |
| IndexWriterConfig iwc = newIndexWriterConfig(); |
| TieredMergePolicy tmp = new TieredMergePolicy(); |
| // We write tiny docs, so we need tiny floor to avoid O(N^2) merging: |
| tmp.setFloorSegmentMB(.01); |
| iwc.setMergePolicy(tmp); |
| return iwc; |
| } |
| |
| @Override |
| protected Directory openDirectory(Path path) throws IOException { |
| MockDirectoryWrapper dir = newMockFSDirectory(path); |
| dir.setUseSlowOpenClosers(false); |
| dir.setThrottling(Throttling.NEVER); |
| return dir; |
| } |
| |
| @Override |
| protected void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException { |
| IndexWriterConfig iwc = newIndexWriterConfig(); |
| |
        // The order of our docIDs must precisely match the incoming reader:
| iwc.setMergePolicy(new LogByteSizeMergePolicy()); |
| IndexWriter w = new IndexWriter(parallelDir, iwc); |
| int maxDoc = reader.maxDoc(); |
| |
| // Slowly parse the stored field into a new doc values field: |
| for(int i=0;i<maxDoc;i++) { |
| // TODO: is this still O(blockSize^2)? |
| Document oldDoc = reader.document(i); |
| Document newDoc = new Document(); |
| long value = Long.parseLong(oldDoc.get("text").split(" ")[1]); |
| newDoc.add(new NumericDocValuesField("number", value)); |
| newDoc.add(new LongPoint("number", value)); |
| w.addDocument(newDoc); |
| } |
| |
| w.forceMerge(1); |
| |
| w.close(); |
| } |
| |
| @Override |
| protected long getCurrentSchemaGen() { |
| return 0; |
| } |
| }; |
| } |
| |
  /** Schema change by adding a new {@code number_<schemaGen>} DV field each time. */
| private ReindexingReader getReindexerNewDVFields(Path root, final AtomicLong currentSchemaGen) throws IOException { |
| return new ReindexingReader(root) { |
| @Override |
| protected IndexWriterConfig getIndexWriterConfig() throws IOException { |
| IndexWriterConfig iwc = newIndexWriterConfig(); |
| TieredMergePolicy tmp = new TieredMergePolicy(); |
| // We write tiny docs, so we need tiny floor to avoid O(N^2) merging: |
| tmp.setFloorSegmentMB(.01); |
| iwc.setMergePolicy(tmp); |
| return iwc; |
| } |
| |
| @Override |
| protected Directory openDirectory(Path path) throws IOException { |
| MockDirectoryWrapper dir = newMockFSDirectory(path); |
| dir.setUseSlowOpenClosers(false); |
| dir.setThrottling(Throttling.NEVER); |
| return dir; |
| } |
| |
| @Override |
| protected void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException { |
| IndexWriterConfig iwc = newIndexWriterConfig(); |
| |
        // The order of our docIDs must precisely match the incoming reader:
| iwc.setMergePolicy(new LogByteSizeMergePolicy()); |
| IndexWriter w = new IndexWriter(parallelDir, iwc); |
| int maxDoc = reader.maxDoc(); |
| |
| if (oldSchemaGen <= 0) { |
| // Must slowly parse the stored field into a new doc values field: |
| for(int i=0;i<maxDoc;i++) { |
| // TODO: is this still O(blockSize^2)? |
| Document oldDoc = reader.document(i); |
| Document newDoc = new Document(); |
| long value = Long.parseLong(oldDoc.get("text").split(" ")[1]); |
| newDoc.add(new NumericDocValuesField("number_" + newSchemaGen, value)); |
| newDoc.add(new LongPoint("number", value)); |
| w.addDocument(newDoc); |
| } |
| } else { |
| // Just carry over doc values from previous field: |
| NumericDocValues oldValues = reader.getNumericDocValues("number_" + oldSchemaGen); |
| assertNotNull("oldSchemaGen=" + oldSchemaGen, oldValues); |
| for(int i=0;i<maxDoc;i++) { |
| // TODO: is this still O(blockSize^2)? |
| assertEquals(i, oldValues.nextDoc()); |
| Document oldDoc = reader.document(i); |
| Document newDoc = new Document(); |
| newDoc.add(new NumericDocValuesField("number_" + newSchemaGen, oldValues.longValue())); |
| w.addDocument(newDoc); |
| } |
| } |
| |
| w.forceMerge(1); |
| |
| w.close(); |
| } |
| |
| @Override |
| protected long getCurrentSchemaGen() { |
| return currentSchemaGen.get(); |
| } |
| |
| @Override |
| protected void checkParallelReader(LeafReader r, LeafReader parR, long schemaGen) throws IOException { |
| String fieldName = "number_" + schemaGen; |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now check parallel number DVs field=" + fieldName + " r=" + r + " parR=" + parR); |
| NumericDocValues numbers = parR.getNumericDocValues(fieldName); |
| if (numbers == null) { |
| return; |
| } |
| int maxDoc = r.maxDoc(); |
| boolean failed = false; |
| for(int i=0;i<maxDoc;i++) { |
| Document oldDoc = r.document(i); |
| long value = Long.parseLong(oldDoc.get("text").split(" ")[1]); |
| assertEquals(i, numbers.nextDoc()); |
| if (value != numbers.longValue()) { |
| if (DEBUG) System.out.println("FAIL: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.longValue() + " numbers=" + numbers); |
| failed = true; |
| } else if (failed) { |
| if (DEBUG) System.out.println("OK: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.longValue()); |
| } |
| } |
| assertFalse("FAILED field=" + fieldName + " r=" + r, failed); |
| } |
| }; |
| } |
| |
  /** Schema change by changing how the same "number" DV field is computed from the stored field. */
| private ReindexingReader getReindexerSameDVField(Path root, final AtomicLong currentSchemaGen, final AtomicLong mergingSchemaGen) throws IOException { |
| return new ReindexingReader(root) { |
| @Override |
| protected IndexWriterConfig getIndexWriterConfig() throws IOException { |
| IndexWriterConfig iwc = newIndexWriterConfig(); |
| TieredMergePolicy tmp = new TieredMergePolicy(); |
| // We write tiny docs, so we need tiny floor to avoid O(N^2) merging: |
| tmp.setFloorSegmentMB(.01); |
| iwc.setMergePolicy(tmp); |
| if (TEST_NIGHTLY) { |
          // during nightly tests, we might use too many files if we aren't careful
| iwc.setUseCompoundFile(true); |
| } |
| return iwc; |
| } |
| |
| @Override |
| protected Directory openDirectory(Path path) throws IOException { |
| MockDirectoryWrapper dir = newMockFSDirectory(path); |
| dir.setUseSlowOpenClosers(false); |
| dir.setThrottling(Throttling.NEVER); |
| return dir; |
| } |
| |
| @Override |
| protected void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException { |
| IndexWriterConfig iwc = newIndexWriterConfig(); |
| |
        // The order of our docIDs must precisely match the incoming reader:
| iwc.setMergePolicy(new LogByteSizeMergePolicy()); |
| IndexWriter w = new IndexWriter(parallelDir, iwc); |
| int maxDoc = reader.maxDoc(); |
| |
| if (oldSchemaGen <= 0) { |
| // Must slowly parse the stored field into a new doc values field: |
| for(int i=0;i<maxDoc;i++) { |
| // TODO: is this still O(blockSize^2)? |
| Document oldDoc = reader.document(i); |
| Document newDoc = new Document(); |
| long value = Long.parseLong(oldDoc.get("text").split(" ")[1]); |
| newDoc.add(new NumericDocValuesField("number", newSchemaGen*value)); |
| newDoc.add(new LongPoint("number", value)); |
| w.addDocument(newDoc); |
| } |
| } else { |
| // Just carry over doc values from previous field: |
| NumericDocValues oldValues = reader.getNumericDocValues("number"); |
| assertNotNull("oldSchemaGen=" + oldSchemaGen, oldValues); |
| for(int i=0;i<maxDoc;i++) { |
| // TODO: is this still O(blockSize^2)? |
| Document oldDoc = reader.document(i); |
| Document newDoc = new Document(); |
| assertEquals(i, oldValues.nextDoc()); |
| newDoc.add(new NumericDocValuesField("number", newSchemaGen*(oldValues.longValue()/oldSchemaGen))); |
| w.addDocument(newDoc); |
| } |
| } |
| |
| w.forceMerge(1); |
| |
| w.close(); |
| } |
| |
| @Override |
| protected long getCurrentSchemaGen() { |
| return currentSchemaGen.get(); |
| } |
| |
| @Override |
| protected long getMergingSchemaGen() { |
| return mergingSchemaGen.get(); |
| } |
| |
| @Override |
| protected void checkParallelReader(LeafReader r, LeafReader parR, long schemaGen) throws IOException { |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now check parallel number DVs r=" + r + " parR=" + parR); |
| NumericDocValues numbers = parR.getNumericDocValues("numbers"); |
| if (numbers == null) { |
| return; |
| } |
| int maxDoc = r.maxDoc(); |
| boolean failed = false; |
| for(int i=0;i<maxDoc;i++) { |
| Document oldDoc = r.document(i); |
| long value = Long.parseLong(oldDoc.get("text").split(" ")[1]); |
| value *= schemaGen; |
| assertEquals(i, numbers.nextDoc()); |
| if (value != numbers.longValue()) { |
| System.out.println("FAIL: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.longValue() + " numbers=" + numbers); |
| failed = true; |
| } else if (failed) { |
| System.out.println("OK: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.longValue()); |
| } |
| } |
| assertFalse("FAILED r=" + r, failed); |
| } |
| }; |
| } |
| |
| public void testBasicMultipleSchemaGens() throws Exception { |
| |
| AtomicLong currentSchemaGen = new AtomicLong(); |
| |
| // TODO: separate refresh thread, search threads, indexing threads |
| Path root = createTempDir(); |
| ReindexingReader reindexer = getReindexerNewDVFields(root, currentSchemaGen); |
| reindexer.commit(); |
| |
| Document doc = new Document(); |
| doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES)); |
| reindexer.w.addDocument(doc); |
| |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: refresh @ 1 doc"); |
| reindexer.mgr.maybeRefresh(); |
| DirectoryReader r = reindexer.mgr.acquire(); |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: got reader=" + r); |
| try { |
| checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| //reindexer.printRefCounts(); |
| |
| currentSchemaGen.incrementAndGet(); |
| |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: increment schemaGen"); |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: commit"); |
| reindexer.commit(); |
| |
| doc = new Document(); |
| doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES)); |
| reindexer.w.addDocument(doc); |
| |
| if (DEBUG) System.out.println("TEST: refresh @ 2 docs"); |
| reindexer.mgr.maybeRefresh(); |
| //reindexer.printRefCounts(); |
| r = reindexer.mgr.acquire(); |
| if (DEBUG) System.out.println("TEST: got reader=" + r); |
| try { |
| checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| |
| if (DEBUG) System.out.println("TEST: forceMerge"); |
| reindexer.w.forceMerge(1); |
| |
| currentSchemaGen.incrementAndGet(); |
| |
| if (DEBUG) System.out.println("TEST: commit"); |
| reindexer.commit(); |
| |
| if (DEBUG) System.out.println("TEST: refresh after forceMerge"); |
| reindexer.mgr.maybeRefresh(); |
| r = reindexer.mgr.acquire(); |
| if (DEBUG) System.out.println("TEST: got reader=" + r); |
| try { |
| checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| |
| if (DEBUG) System.out.println("TEST: close writer"); |
| reindexer.close(); |
| } |
| |
| public void testRandomMultipleSchemaGens() throws Exception { |
| |
| AtomicLong currentSchemaGen = new AtomicLong(); |
| ReindexingReader reindexer = null; |
| |
| // TODO: separate refresh thread, search threads, indexing threads |
| int numDocs = atLeast(TEST_NIGHTLY ? 20000 : 200); |
| int maxID = 0; |
| Path root = createTempDir(); |
| int refreshEveryNumDocs = 100; |
| int commitCloseNumDocs = 1000; |
| for(int i=0;i<numDocs;i++) { |
| if (reindexer == null) { |
| reindexer = getReindexerNewDVFields(root, currentSchemaGen); |
| } |
| |
| Document doc = new Document(); |
| String id; |
| String updateID; |
| if (maxID > 0 && random().nextInt(10) == 7) { |
| // Replace a doc |
| id = "" + random().nextInt(maxID); |
| updateID = id; |
| } else { |
| id = "" + (maxID++); |
| updateID = null; |
| } |
| |
| doc.add(newStringField("id", id, Field.Store.NO)); |
| doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES)); |
| if (updateID == null) { |
| reindexer.w.addDocument(doc); |
| } else { |
| reindexer.w.updateDocument(new Term("id", updateID), doc); |
| } |
| if (random().nextInt(refreshEveryNumDocs) == 17) { |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: refresh @ " + (i+1) + " docs"); |
| reindexer.mgr.maybeRefresh(); |
| |
| DirectoryReader r = reindexer.mgr.acquire(); |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: got reader=" + r); |
| try { |
| checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| if (DEBUG) reindexer.printRefCounts(); |
| refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs); |
| } |
| |
| if (random().nextInt(500) == 17) { |
| currentSchemaGen.incrementAndGet(); |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: advance schemaGen to " + currentSchemaGen); |
| } |
| |
| if (i > 0 && random().nextInt(10) == 7) { |
| // Random delete: |
| reindexer.w.deleteDocuments(new Term("id", ""+random().nextInt(i))); |
| } |
| |
| if (random().nextInt(commitCloseNumDocs) == 17) { |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: commit @ " + (i+1) + " docs"); |
| reindexer.commit(); |
| //reindexer.printRefCounts(); |
| commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs); |
| } |
| |
| // Sometimes close & reopen writer/manager, to confirm the parallel segments persist: |
| if (random().nextInt(commitCloseNumDocs) == 17) { |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: close writer @ " + (i+1) + " docs"); |
| reindexer.close(); |
| reindexer = null; |
| commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs); |
| } |
| } |
| |
| if (reindexer != null) { |
| reindexer.close(); |
| } |
| } |
| |
| /** First schema change creates a new "number" DV field off the stored field; subsequent changes just change the value of that number |
| * field for all docs. */ |
| public void testRandomMultipleSchemaGensSameField() throws Exception { |
| |
| AtomicLong currentSchemaGen = new AtomicLong(); |
| AtomicLong mergingSchemaGen = new AtomicLong(); |
| |
| ReindexingReader reindexer = null; |
| |
| // TODO: separate refresh thread, search threads, indexing threads |
| int numDocs = atLeast(TEST_NIGHTLY ? 20000 : 200); |
| int maxID = 0; |
| Path root = createTempDir(); |
| int refreshEveryNumDocs = 100; |
| int commitCloseNumDocs = 1000; |
| |
| for(int i=0;i<numDocs;i++) { |
| if (reindexer == null) { |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: open new reader/writer"); |
| reindexer = getReindexerSameDVField(root, currentSchemaGen, mergingSchemaGen); |
| } |
| |
| Document doc = new Document(); |
| String id; |
| String updateID; |
| if (maxID > 0 && random().nextInt(10) == 7) { |
| // Replace a doc |
| id = "" + random().nextInt(maxID); |
| updateID = id; |
| } else { |
| id = "" + (maxID++); |
| updateID = null; |
| } |
| |
| doc.add(newStringField("id", id, Field.Store.NO)); |
| doc.add(newTextField("text", "number " + TestUtil.nextInt(random(), -10000, 10000), Field.Store.YES)); |
| if (updateID == null) { |
| reindexer.w.addDocument(doc); |
| } else { |
| reindexer.w.updateDocument(new Term("id", updateID), doc); |
| } |
| if (random().nextInt(refreshEveryNumDocs) == 17) { |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: refresh @ " + (i+1) + " docs"); |
| reindexer.mgr.maybeRefresh(); |
| DirectoryReader r = reindexer.mgr.acquire(); |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: got reader=" + r); |
| try { |
| checkAllNumberDVs(r, "number", true, (int) currentSchemaGen.get()); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| if (DEBUG) reindexer.printRefCounts(); |
| refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs); |
| } |
| |
| if (random().nextInt(500) == 17) { |
| currentSchemaGen.incrementAndGet(); |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: advance schemaGen to " + currentSchemaGen); |
| if (random().nextBoolean()) { |
| mergingSchemaGen.incrementAndGet(); |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: advance mergingSchemaGen to " + mergingSchemaGen); |
| } |
| } |
| |
| if (i > 0 && random().nextInt(10) == 7) { |
| // Random delete: |
| reindexer.w.deleteDocuments(new Term("id", ""+random().nextInt(i))); |
| } |
| |
| if (random().nextInt(commitCloseNumDocs) == 17) { |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: commit @ " + (i+1) + " docs"); |
| reindexer.commit(); |
| //reindexer.printRefCounts(); |
| commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs); |
| } |
| |
| // Sometimes close & reopen writer/manager, to confirm the parallel segments persist: |
| if (random().nextInt(commitCloseNumDocs) == 17) { |
| if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: close writer @ " + (i+1) + " docs"); |
| reindexer.close(); |
| reindexer = null; |
| commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs); |
| } |
| } |
| |
| if (reindexer != null) { |
| reindexer.close(); |
| } |
| |
| // Verify main index never reflects schema changes beyond mergingSchemaGen: |
| try (Directory dir = newFSDirectory(root.resolve("index")); |
| IndexReader r = DirectoryReader.open(dir)) { |
| for (LeafReaderContext ctx : r.leaves()) { |
| LeafReader leaf = ctx.reader(); |
| NumericDocValues numbers = leaf.getNumericDocValues("number"); |
| if (numbers != null) { |
| int maxDoc = leaf.maxDoc(); |
| for(int i=0;i<maxDoc;i++) { |
| Document doc = leaf.document(i); |
| long value = Long.parseLong(doc.get("text").split(" ")[1]); |
| assertEquals(i, numbers.nextDoc()); |
| long dvValue = numbers.longValue(); |
| if (value == 0) { |
| assertEquals(0, dvValue); |
| } else { |
| assertTrue(dvValue % value == 0); |
| assertTrue(dvValue / value <= mergingSchemaGen.get()); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| public void testBasic() throws Exception { |
| Path tempPath = createTempDir(); |
| ReindexingReader reindexer = getReindexer(tempPath); |
| |
| // Start with initial empty commit: |
| reindexer.commit(); |
| |
| Document doc = new Document(); |
| doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES)); |
| reindexer.w.addDocument(doc); |
| |
| if (DEBUG) System.out.println("TEST: refresh @ 1 doc"); |
| reindexer.mgr.maybeRefresh(); |
| DirectoryReader r = reindexer.mgr.acquire(); |
| if (DEBUG) System.out.println("TEST: got reader=" + r); |
| try { |
| checkAllNumberDVs(r); |
| IndexSearcher s = newSearcher(r); |
| testNumericDVSort(s); |
| testPointRangeQuery(s); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| //reindexer.printRefCounts(); |
| |
| if (DEBUG) System.out.println("TEST: commit"); |
| reindexer.commit(); |
| |
| doc = new Document(); |
| doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES)); |
| reindexer.w.addDocument(doc); |
| |
| if (DEBUG) System.out.println("TEST: refresh @ 2 docs"); |
| reindexer.mgr.maybeRefresh(); |
| //reindexer.printRefCounts(); |
| r = reindexer.mgr.acquire(); |
| if (DEBUG) System.out.println("TEST: got reader=" + r); |
| try { |
| checkAllNumberDVs(r); |
| IndexSearcher s = newSearcher(r); |
| testNumericDVSort(s); |
| testPointRangeQuery(s); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| |
| if (DEBUG) System.out.println("TEST: forceMerge"); |
| reindexer.w.forceMerge(1); |
| |
| if (DEBUG) System.out.println("TEST: commit"); |
| reindexer.commit(); |
| |
| if (DEBUG) System.out.println("TEST: refresh after forceMerge"); |
| reindexer.mgr.maybeRefresh(); |
| r = reindexer.mgr.acquire(); |
| if (DEBUG) System.out.println("TEST: got reader=" + r); |
| try { |
| checkAllNumberDVs(r); |
| IndexSearcher s = newSearcher(r); |
| testNumericDVSort(s); |
| testPointRangeQuery(s); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| |
| if (DEBUG) System.out.println("TEST: close writer"); |
| reindexer.close(); |
| } |
| |
| public void testRandom() throws Exception { |
| Path root = createTempDir(); |
| ReindexingReader reindexer = null; |
| |
| // TODO: separate refresh thread, search threads, indexing threads |
| int numDocs = atLeast(TEST_NIGHTLY ? 20000 : 200); |
| int maxID = 0; |
| int refreshEveryNumDocs = 100; |
| int commitCloseNumDocs = 1000; |
| for(int i=0;i<numDocs;i++) { |
| if (reindexer == null) { |
| reindexer = getReindexer(root); |
| } |
| |
| Document doc = new Document(); |
| String id; |
| String updateID; |
| if (maxID > 0 && random().nextInt(10) == 7) { |
| // Replace a doc |
| id = "" + random().nextInt(maxID); |
| updateID = id; |
| } else { |
| id = "" + (maxID++); |
| updateID = null; |
| } |
| |
| doc.add(newStringField("id", id, Field.Store.NO)); |
| doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES)); |
| if (updateID == null) { |
| reindexer.w.addDocument(doc); |
| } else { |
| reindexer.w.updateDocument(new Term("id", updateID), doc); |
| } |
| |
| if (random().nextInt(refreshEveryNumDocs) == 17) { |
| if (DEBUG) System.out.println("TEST: refresh @ " + (i+1) + " docs"); |
| reindexer.mgr.maybeRefresh(); |
| DirectoryReader r = reindexer.mgr.acquire(); |
| if (DEBUG) System.out.println("TEST: got reader=" + r); |
| try { |
| checkAllNumberDVs(r); |
| IndexSearcher s = newSearcher(r); |
| testNumericDVSort(s); |
| testPointRangeQuery(s); |
| } finally { |
| reindexer.mgr.release(r); |
| } |
| refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs); |
| } |
| |
| if (i > 0 && random().nextInt(10) == 7) { |
| // Random delete: |
| reindexer.w.deleteDocuments(new Term("id", ""+random().nextInt(i))); |
| } |
| |
| if (random().nextInt(commitCloseNumDocs) == 17) { |
| if (DEBUG) System.out.println("TEST: commit @ " + (i+1) + " docs"); |
| reindexer.commit(); |
| commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs); |
| } |
| |
| // Sometimes close & reopen writer/manager, to confirm the parallel segments persist: |
| if (random().nextInt(commitCloseNumDocs) == 17) { |
| if (DEBUG) System.out.println("TEST: close writer @ " + (i+1) + " docs"); |
| reindexer.close(); |
| reindexer = null; |
| commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs); |
| } |
| } |
| if (reindexer != null) { |
| reindexer.close(); |
| } |
| } |
| |
| private static void checkAllNumberDVs(IndexReader r) throws IOException { |
| checkAllNumberDVs(r, "number", true, 1); |
| } |
| |
  /** Verifies each doc's doc values for fieldName equal multiplier times the number embedded in its stored "text" field. */
  private static void checkAllNumberDVs(IndexReader r, String fieldName, boolean doThrow, int multiplier) throws IOException {
| NumericDocValues numbers = MultiDocValues.getNumericValues(r, fieldName); |
| int maxDoc = r.maxDoc(); |
| boolean failed = false; |
| long t0 = System.currentTimeMillis(); |
| for(int i=0;i<maxDoc;i++) { |
| Document oldDoc = r.document(i); |
| long value = multiplier * Long.parseLong(oldDoc.get("text").split(" ")[1]); |
| assertEquals(i, numbers.nextDoc()); |
| if (value != numbers.longValue()) { |
| System.out.println("FAIL: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.longValue() + " numbers=" + numbers); |
| failed = true; |
| } else if (failed) { |
| System.out.println("OK: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.longValue()); |
| } |
| } |
| if (failed) { |
| if (r instanceof LeafReader == false) { |
| System.out.println("TEST FAILED; check leaves"); |
| for(LeafReaderContext ctx : r.leaves()) { |
| System.out.println("CHECK LEAF=" + ctx.reader()); |
| checkAllNumberDVs(ctx.reader(), fieldName, false, 1); |
| } |
| } |
| if (doThrow) { |
| assertFalse("FAILED field=" + fieldName + " r=" + r, failed); |
| } else { |
| System.out.println("FAILED field=" + fieldName + " r=" + r); |
| } |
| } |
| } |
| |
| private static void testNumericDVSort(IndexSearcher s) throws IOException { |
| // Confirm we can sort by the new DV field: |
| TopDocs hits = s.search(new MatchAllDocsQuery(), 100, new Sort(new SortField("number", SortField.Type.LONG))); |
| long last = Long.MIN_VALUE; |
| for(ScoreDoc scoreDoc : hits.scoreDocs) { |
| long value = Long.parseLong(s.doc(scoreDoc.doc).get("text").split(" ")[1]); |
| assertTrue(value >= last); |
| assertEquals(value, ((Long) ((FieldDoc) scoreDoc).fields[0]).longValue()); |
| last = value; |
| } |
| } |
| |
| private static void testPointRangeQuery(IndexSearcher s) throws IOException { |
| for(int i=0;i<100;i++) { |
| // Confirm we can range search by the new indexed (numeric) field: |
| long min = random().nextLong(); |
| long max = random().nextLong(); |
| if (min > max) { |
| long x = min; |
| min = max; |
| max = x; |
| } |
| |
| TopDocs hits = s.search(LongPoint.newRangeQuery("number", min, max), 100); |
| for(ScoreDoc scoreDoc : hits.scoreDocs) { |
| long value = Long.parseLong(s.doc(scoreDoc.doc).get("text").split(" ")[1]); |
| assertTrue(value >= min); |
| assertTrue(value <= max); |
| } |
| |
      // DV iterators are forward-only, so visit hits in increasing docID order:
      Arrays.sort(hits.scoreDocs, Comparator.comparingInt(sd -> sd.doc));
| |
| NumericDocValues numbers = MultiDocValues.getNumericValues(s.getIndexReader(), "number"); |
| for(ScoreDoc hit : hits.scoreDocs) { |
| if (numbers.docID() < hit.doc) { |
| numbers.advance(hit.doc); |
| } |
| assertEquals(hit.doc, numbers.docID()); |
| long value = Long.parseLong(s.doc(hit.doc).get("text").split(" ")[1]); |
| assertEquals(value, numbers.longValue()); |
| } |
| } |
| } |
| |
| // TODO: maybe the leading id could be further restricted? It's from StringHelper.idToString: |
| static final Pattern SEG_GEN_SUB_DIR_PATTERN = Pattern.compile("^[a-z0-9]+_([0-9]+)$"); |
| |
| private static List<Path> segSubDirs(Path segsPath) throws IOException { |
| List<Path> result = new ArrayList<>(); |
| try (DirectoryStream<Path> stream = Files.newDirectoryStream(segsPath)) { |
| for (Path path : stream) { |
        // Must be of the form <segIDString>_<longGen>:
| if (Files.isDirectory(path) && SEG_GEN_SUB_DIR_PATTERN.matcher(path.getFileName().toString()).matches()) { |
| result.add(path); |
| } |
| } |
| } |
| |
| return result; |
| } |
| |
| // TODO: test exceptions |
| } |